Kafka: Producer (0.10.0.0)

来源:http://www.cnblogs.com/f1194361820/archive/2016/11/10/6048429.html
-Advertisement-
Play Games

通過前面的架構簡述,知道了Producer是用來產生消息記錄,並將消息以非同步的方式發送給指定的topic的某個partition的。另外還知道,它保證了消息的有序的發送。那麼它是如何做到這些的呢?我們又該如何使用它的API來發送消息? Kafka Producer、Kafka Consumer相對於 ...


 

         通過前面的架構簡述,知道了Producer是用來產生消息記錄,並將消息以非同步的方式發送給指定的topic的某個partition的。另外還知道,它保證了消息的有序的發送。那麼它是如何做到這些的呢?我們又該如何使用它的API來發送消息?

         Kafka Producer、Kafka Consumer相對於 Kafka Broker,都屬於客戶端。Kafka支持多種語言的客戶端。下麵就根據Java 語言客戶端對Producer做個說明。

 

1、Producer API入門:

KafkaProducer是一個發送record到Kafka Cluster的客戶端API。這個類線程安全的。在應用程式中,通常的作法是:所有發往一個Kafka Cluster的線程使用同一個Producer對象.。如果你的程式要給多個Cluster發送消息,則需要使用多個Producer。

 

 

 

ProducerRecord說明

從上面代碼里可以看出,代表要發送的消息記錄類是ProducerRecord:

 

一條record通常包括5個欄位:

·topic:指定該record發往哪個topic下。[Required]

·partition:指定該record發到哪個partition中。[Optional]

·key:一個key。[Optional]

·value:記錄人內容。[Required]

·timestamp:時間戳。[Optional]

 

預設情況下:

如果用戶指定了partition,那麼就發往用戶指定的partition。如果用戶沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。

    在Producer端,如果用戶指定了timestamp,則record使用用戶指定的時間,如果用戶沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log文件時會重寫該時間。

 

 

 

2、非同步發送流程

2.1、用戶線程調用send方法將record放到BufferPool中

         可能在之前的kafka-client版本中,還支持同步方式發送消息記錄。不過在我看的版本(0.10.0.0)中,已經不再支持同步方式發送了。當用戶使用KafkaProducer#send()發送record時,執行流程是:

 

1、由interceptor chain對ProducerRecord做發送前的處理

攔截器介面是:ProducerInterceport,用戶可以自定義自己的攔截器實現。

 

該攔截器鏈,在Producer對象初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要註意的。

         ProducerInterceptor有兩個方法:

·onSend: KafkaProducer#send 調用時就會執行此方法。

·onAcknowledgement:發送失敗,或者發送成功(broker 通知producer代表發送成功)時都會調用該方法。

 

此階段執行的就是onSend方法。

 

2、阻塞方式獲取到broker cluster 上broker cluster的信息

採用RPC方式獲取到的broker信息,由一個MetaData類封裝。它包括了broker cluster的必要信息,譬如有:所有的broker信息(id\host\port等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。

         雖然該過程是阻塞的,但並不是每發送一個record都會通過RPC方式來獲取的。Metadata會在Producer端緩存,只有在record中指定的topic不存在時、或者MetaData輪詢周期到時才會執行。

 

3、對record中key、value進行序列化

這個沒有什麼可說的。內置了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。

 

4、為record設置partition屬性

前面說過,創建ProducerRecord時,partition是Optional的。所以如果用戶創建record時,沒有指定partition屬性。則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內置的實現:

·如果提供了Key值,會根據key序列化後的位元組數組的hashcode進行取模運算。

·如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。

 

5、校驗record的長度是否超出閾值

MAX_REQUEST_SIZE_CONFIG=”max.request.size”

BUFFER_MEMORY_CONFIG=”buffer.memory”

超出任何一項就會拋出異常。

 

6、為record設置timestamp

如果用戶創建ProducerRecord時沒有指定timestamp,此處為止設置為producer的當前時間。

其實在java client中,設計了一個Time介面,專門用於設置這個時間的。內置了一個實現SystemTime,這裡將record timestamp設置為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。

 

7、將該record壓縮後放到BufferPool

這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端隊列Deque<RecordBatch>,隊列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。

 

         關於record的壓縮方式,kafka producer在支持了幾種方式:

·NONE:就是不壓縮。

·GZIP:壓縮率為50%

·SNAPPY:壓縮率為50%

·LZ4:壓縮率為50%

 

         在將record放到Deque中最後一個RecordBatch中的過程如下:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。

         RecordBatch實際上是存儲於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在創建BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。

 

8、喚醒發送模塊

執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。

 

 

 

此外,在上述步驟2~8,不論哪一步出現問題,都會拋出異常。而拋出異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會調用第1步中提到的interceptor chain 執行onAcknowledgement告知用戶。

 

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {

        // intercept the record, which can be potentially modified; this method does not throw exceptions

        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

        return doSend(interceptedRecord, callback);

    }

 

    /**

     * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.

     * See {@link #send(ProducerRecord, Callback)} for details.

     */

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

        TopicPartition tp = null;

        try {

            // first make sure the metadata for the topic is available

            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

            byte[] serializedKey;

            try {

                serializedKey = keySerializer.serialize(record.topic(), record.key());

            } catch (ClassCastException cce) {

                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +

                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +

                        " specified in key.serializer");

            }

            byte[] serializedValue;

            try {

                serializedValue = valueSerializer.serialize(record.topic(), record.value());

            } catch (ClassCastException cce) {

                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +

                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +

                        " specified in value.serializer");

            }

            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());

            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

            ensureValidRecordSize(serializedSize);

            tp = new TopicPartition(record.topic(), partition);

            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

            // producer callback will make sure to call both 'callback' and interceptor callback

            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

            if (result.batchIsFull || result.newBatchCreated) {

                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

                this.sender.wakeup();

            }

            return result.future;

            // handling exceptions and record the errors;

            // for API exceptions return them in the future,

            // for other exceptions throw directly

        } catch (ApiException e) {

            log.debug("Exception occurred during message send:", e);

            if (callback != null)

                callback.onCompletion(null, e);

            this.errors.record();

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            return new FutureFailure(e);

        } catch (InterruptedException e) {

            this.errors.record();

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw new InterruptException(e);

        } catch (BufferExhaustedException e) {

            this.errors.record();

            this.metrics.sensor("buffer-exhausted-records").record();

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw e;

        } catch (KafkaException e) {

            this.errors.record();

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw e;

        } catch (Exception e) {

            // we notify interceptor about all exceptions, since onSend is called before anything else in this method

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw e;

        }

    }
View Code

 

  

2.2、發送調度

       KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而發送調度,則是由另外一個線程(Sender)來完成的。

Sender的執行過程如下:

1、 取出就緒的record

這一步是檢查要發送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設置為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。

 

2、 取出RecordBatch,並過濾掉過期的RecordBatch

對於過期的RecordBatch,會通過Sensor通知Interceptor發送失敗。

 

3、為要發送的RecordBatch創建請求

         一個RecordBatch一個ClientRequest。

4、保留請求併發送

把請求對象保留到一個inFlightRequest 集合中。這個集合中存放的是正在發送的請求,是一個topic到Deque的Map。當發送成功,或者失敗都會移除。

5、處理髮送結果

如果發送失敗,會嘗試retry。並由Sensor調度Interceptor。

如果發送成功,會由Sensor調度Interceptor。

 

3、Producer實現說明

從上述處理流程中,可以看到在java client中的一些設計:

1、Interceptor Chain:可以做為用於自定義插件的介面。

2、MetaData:producer 不按需以及定期的發送請求獲取最新的Cluster狀態信息。Producer根據這個信息可以直接將record batch發送到相關partition的Leader中。也就是在客戶端完成Load balance。

3、Partitioner:分區選擇工具,選擇發送到哪些分區,結合Metadata,完成Load balance。

4、RecordBatch:在客戶端對record壓縮進RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高性能。

5、非同步方式發送:提高用戶應用性能。

 

4、Producer Configuration

         在文章開始的地方說明瞭,使用Kafka Producer Java Client時,只需要創建一個KafkaProducer就可以了。而它在運行過程中,會使用到很多配置項,這些配置項都是在KafkaProducer初始化時完成的。

         下麵就來看看java client中要求的配置項:

 

·bootstrap.servers

用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有實例都配置上。因為它後自動發現所有的broker。

如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….

·key.serializervalue.serializer

配置序列化類名。指定 的這些類都要實現Serializer介面。

 

·acks

為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(發送RecordBatch的請求)完成情況。

對於message接收情況的確認,Kafka Broker支持了三種情形:1、不需要確認;2)leader接收到就確認;3)等所有可用的follower複製完畢進行確認。可以看出,這三種情況代表不同的確認粒度。在Java Producer Client中,對三種情形都做了支持,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。

 

         Kafka Producer Java Client 是如何支持這三種確認呢?

1、  在為RecordBatch創建請求時,acks的值會被封裝為請求頭的一部分。

2、  發送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。

如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。

 

該配置項預設值是1,即leader接收後就響應。

 

·buffer.memory

BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。

配置項的單位是byte,範圍是:[0,….]

 

·compression.type

Kafka提供了多種壓縮類型,可選值有4個: none, gzip, snappy, lz4。預設值是none。

 

·retries

當一個RecordBatch發送失敗時,就會重新改善以確保數據完成交付。該配置設置了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。

         如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設置成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender線程在一次輪詢中,如果有兩個recordbatch都要發送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是發送失敗,RecordBatch2緊接著RecordBatch1發送,它是發送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。

 

·ssl.key.password

Keystore 文件中私鑰的密碼。可選的。

·ssl.keystore.location

Keystore文件的位置。可選的。

·ssl.keystore.password

Keystore 文件的密碼。可選的。

·ssl.truststore.location

Trust store 文件的位置。可選的。

·ssl.truststore.password

Trust store文件的密碼。可選的。

·batch.size

RecordBatch的最大容量。預設值是16384(16KB)。

·client.id

邏輯名,client給broker發請求是會用到。預設值是:””。

·connections.max.idle.ms

Connection的最大空閑時間。預設值是540000 (9 min)

·linger.ms

Socket :solinger。延遲。預設值:0,即不延遲。

·max.block.ms

當需要的metadata未到達之前(例如要發送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設置這個阻塞時間是必要的。範圍:[0, Long.MAX]

 

·max.request.size

最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會拋出異常。

 

·partitioner.class

用於自定義partitioner演算法。預設值是:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

·receive.buffer.byte

TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。

如果設置為-1,則會採用操作系統的預設值。

 

·request.timeout.ms

最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。

·timeout.ms

這個時間配置的是follower到leader的ack超時時間。這個時間和 producer發送的請求的網路無關。

·block.on.buffer.full

當bufferPool用完後,如果client還在使用KafkaProducer發送record,要麼是BufferPool拒絕接收,要麼是拋出異常。

這個配置是預設值是flase,也就是當bufferpool滿時,不會拋出BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時拋出TimeoutExcpetion。

 

如果這個屬性值是true,則會把max.block.ms值設置為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。

·interceptor.class

自定義攔截器類。預設情況下沒有指定任何的interceptor。

·max.in.flight.requests.per.connection

每個連接中處於發送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]

·metric.reporters

MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。

 

·metrics.num.samples

計算metric時的採樣數。預設值是2。範圍:[1,Integer.MAX]

·metrics.sample.window.ms

採樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 回到目錄 空間與時間 空間換時間是在資料庫中經常出現的術語,簡單說就是把查詢需要的條件進行索引的存儲,然後查詢時為O(1)的時間複雜度來快速獲取數據,從而達到了使用空間存儲來換快速的時間響應!對於redis這個k/v存儲系統來說,複雜的查詢不是它所建議的,它的優勢在於通過key快速定位數據,它定位數 ...
  • 成功排除故障的十個步驟1.定義問題,建立一個清晰的問題陳述,目標是獲取技術問題和成功標準的一兩句摘要。2.確定問題的影響,企業利益相關者不會知道技術細節,你需要確定問題所造成的財物影響。3.占用正確的資源,可能是內部資源或者外部資源,以便問題得到有效的技術和人力支持。4.確定潛在的原因,會見所有必要 ...
  • 添加環境變數解決: 變數名:TNS_ADMIN 變數值:D:\Ocl\product\11.2.0\dbhome_1\NETWORK\ADMIN tnsnames.ora所在的路徑 ...
  • http://blog.csdn.net/lgb934/article/details/8662956 ...
  • InfluxDB提供類SQL語法,如果熟悉SQL的話會非常容易上手。本文就為大家介紹一下InfluxDB的基本操作。 InfluxDB提供類SQL語法,如果熟悉SQL的話會非常容易上手。 本文就為大家介紹一下InfluxDB的基本操作,更多InfluxDB詳細教程請看:InfluxDB系列學習教程目 ...
  • --查看資料庫版本-- select * from product_component_version; -- 查看dbf存放位置 select * from dba_data_files; -- 查看文件位置 SELECT * FROM dba_directories; -- 新建表空間 crea ...
  • 凌晨收到同事電話,反饋應用程式訪問Oracle資料庫時報錯,當時現場現象確認: 1. 應用程式訪問不了資料庫,使用SQL Developer測試發現訪問不了資料庫。報ORA-12570 TNS:packet reader failure 2. 使用lsnrctl status檢查監聽,一直沒有響應,... ...
  • 1.一個sqlserver資料庫實例上只能有一個tempdb資料庫,這個實例上所有的用戶都共用這個資料庫。2.tempdb資料庫在每次sqlserver重啟後都會重新創建,所以數據會丟失。3.因為tempdb資料庫上面的特性所以對tempdb資料庫的操作日誌不會保存重做信息,所以相比正常資料庫來說, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...