通過前面的架構簡述,知道了Producer是用來產生消息記錄,並將消息以非同步的方式發送給指定的topic的某個partition的。另外還知道,它保證了消息的有序的發送。那麼它是如何做到這些的呢?我們又該如何使用它的API來發送消息? Kafka Producer、Kafka Consumer相對於 ...
通過前面的架構簡述,知道了Producer是用來產生消息記錄,並將消息以非同步的方式發送給指定的topic的某個partition的。另外還知道,它保證了消息的有序的發送。那麼它是如何做到這些的呢?我們又該如何使用它的API來發送消息?
Kafka Producer、Kafka Consumer相對於 Kafka Broker,都屬於客戶端。Kafka支持多種語言的客戶端。下麵就根據Java 語言客戶端對Producer做個說明。
1、Producer API入門:
KafkaProducer是一個發送record到Kafka Cluster的客戶端API。這個類線程安全的。在應用程式中,通常的作法是:所有發往一個Kafka Cluster的線程使用同一個Producer對象.。如果你的程式要給多個Cluster發送消息,則需要使用多個Producer。
ProducerRecord說明
從上面代碼里可以看出,代表要發送的消息記錄類是ProducerRecord:
一條record通常包括5個欄位:
·topic:指定該record發往哪個topic下。[Required]
·partition:指定該record發到哪個partition中。[Optional]
·key:一個key。[Optional]
·value:記錄人內容。[Required]
·timestamp:時間戳。[Optional]
預設情況下:
如果用戶指定了partition,那麼就發往用戶指定的partition。如果用戶沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。
在Producer端,如果用戶指定了timestamp,則record使用用戶指定的時間,如果用戶沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log文件時會重寫該時間。
2、非同步發送流程
2.1、用戶線程調用send方法將record放到BufferPool中
可能在之前的kafka-client版本中,還支持同步方式發送消息記錄。不過在我看的版本(0.10.0.0)中,已經不再支持同步方式發送了。當用戶使用KafkaProducer#send()發送record時,執行流程是:
1、由interceptor chain對ProducerRecord做發送前的處理
攔截器介面是:ProducerInterceport,用戶可以自定義自己的攔截器實現。
該攔截器鏈,在Producer對象初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要註意的。
ProducerInterceptor有兩個方法:
·onSend: KafkaProducer#send 調用時就會執行此方法。
·onAcknowledgement:發送失敗,或者發送成功(broker 通知producer代表發送成功)時都會調用該方法。
此階段執行的就是onSend方法。
2、阻塞方式獲取到broker cluster 上broker cluster的信息
採用RPC方式獲取到的broker信息,由一個MetaData類封裝。它包括了broker cluster的必要信息,譬如有:所有的broker信息(id\host\port等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。
雖然該過程是阻塞的,但並不是每發送一個record都會通過RPC方式來獲取的。Metadata會在Producer端緩存,只有在record中指定的topic不存在時、或者MetaData輪詢周期到時才會執行。
3、對record中key、value進行序列化
這個沒有什麼可說的。內置了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。
4、為record設置partition屬性
前面說過,創建ProducerRecord時,partition是Optional的。所以如果用戶創建record時,沒有指定partition屬性。則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內置的實現:
·如果提供了Key值,會根據key序列化後的位元組數組的hashcode進行取模運算。
·如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。
5、校驗record的長度是否超出閾值
MAX_REQUEST_SIZE_CONFIG=”max.request.size”
BUFFER_MEMORY_CONFIG=”buffer.memory”
超出任何一項就會拋出異常。
6、為record設置timestamp
如果用戶創建ProducerRecord時沒有指定timestamp,此處為止設置為producer的當前時間。
其實在java client中,設計了一個Time介面,專門用於設置這個時間的。內置了一個實現SystemTime,這裡將record timestamp設置為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。
7、將該record壓縮後放到BufferPool中
這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端隊列Deque<RecordBatch>,隊列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。
關於record的壓縮方式,kafka producer在支持了幾種方式:
·NONE:就是不壓縮。
·GZIP:壓縮率為50%
·SNAPPY:壓縮率為50%
·LZ4:壓縮率為50%
在將record放到Deque中最後一個RecordBatch中的過程如下:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。
RecordBatch實際上是存儲於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在創建BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。
8、喚醒發送模塊
執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。
此外,在上述步驟2~8,不論哪一步出現問題,都會拋出異常。而拋出異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會調用第1步中提到的interceptor chain 執行onAcknowledgement告知用戶。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } /** * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { // first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs); byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); tp = new TopicPartition(record.topic(), partition); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) { this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) { this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } }View Code
2.2、發送調度
KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而發送調度,則是由另外一個線程(Sender)來完成的。
Sender的執行過程如下:
1、 取出就緒的record
這一步是檢查要發送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設置為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。
2、 取出RecordBatch,並過濾掉過期的RecordBatch
對於過期的RecordBatch,會通過Sensor通知Interceptor發送失敗。
3、為要發送的RecordBatch創建請求
一個RecordBatch一個ClientRequest。
4、保留請求併發送
把請求對象保留到一個inFlightRequest 集合中。這個集合中存放的是正在發送的請求,是一個topic到Deque的Map。當發送成功,或者失敗都會移除。
5、處理髮送結果
如果發送失敗,會嘗試retry。並由Sensor調度Interceptor。
如果發送成功,會由Sensor調度Interceptor。
3、Producer實現說明
從上述處理流程中,可以看到在java client中的一些設計:
1、Interceptor Chain:可以做為用於自定義插件的介面。
2、MetaData:producer 不按需以及定期的發送請求獲取最新的Cluster狀態信息。Producer根據這個信息可以直接將record batch發送到相關partition的Leader中。也就是在客戶端完成Load balance。
3、Partitioner:分區選擇工具,選擇發送到哪些分區,結合Metadata,完成Load balance。
4、RecordBatch:在客戶端對record壓縮進RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高性能。
5、非同步方式發送:提高用戶應用性能。
4、Producer Configuration
在文章開始的地方說明瞭,使用Kafka Producer Java Client時,只需要創建一個KafkaProducer就可以了。而它在運行過程中,會使用到很多配置項,這些配置項都是在KafkaProducer初始化時完成的。
下麵就來看看java client中要求的配置項:
·bootstrap.servers
用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有實例都配置上。因為它後自動發現所有的broker。
如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….
·key.serializer、value.serializer
配置序列化類名。指定 的這些類都要實現Serializer介面。
·acks
為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(發送RecordBatch的請求)完成情況。
對於message接收情況的確認,Kafka Broker支持了三種情形:1、不需要確認;2)leader接收到就確認;3)等所有可用的follower複製完畢進行確認。可以看出,這三種情況代表不同的確認粒度。在Java Producer Client中,對三種情形都做了支持,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。
Kafka Producer Java Client 是如何支持這三種確認呢?
1、 在為RecordBatch創建請求時,acks的值會被封裝為請求頭的一部分。
2、 發送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。
如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。
該配置項預設值是1,即leader接收後就響應。
·buffer.memory
BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。
配置項的單位是byte,範圍是:[0,….]
·compression.type
Kafka提供了多種壓縮類型,可選值有4個: none, gzip, snappy, lz4。預設值是none。
·retries
當一個RecordBatch發送失敗時,就會重新改善以確保數據完成交付。該配置設置了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。
如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設置成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender線程在一次輪詢中,如果有兩個recordbatch都要發送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是發送失敗,RecordBatch2緊接著RecordBatch1發送,它是發送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。
·ssl.key.password
Keystore 文件中私鑰的密碼。可選的。
·ssl.keystore.location
Keystore文件的位置。可選的。
·ssl.keystore.password
Keystore 文件的密碼。可選的。
·ssl.truststore.location
Trust store 文件的位置。可選的。
·ssl.truststore.password
Trust store文件的密碼。可選的。
·batch.size
RecordBatch的最大容量。預設值是16384(16KB)。
·client.id
邏輯名,client給broker發請求是會用到。預設值是:””。
·connections.max.idle.ms
Connection的最大空閑時間。預設值是540000 (9 min)
·linger.ms
Socket :solinger。延遲。預設值:0,即不延遲。
·max.block.ms
當需要的metadata未到達之前(例如要發送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設置這個阻塞時間是必要的。範圍:[0, Long.MAX]
·max.request.size
最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會拋出異常。
·partitioner.class
用於自定義partitioner演算法。預設值是:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
·receive.buffer.byte
TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。
如果設置為-1,則會採用操作系統的預設值。
·request.timeout.ms
最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。
·timeout.ms
這個時間配置的是follower到leader的ack超時時間。這個時間和 producer發送的請求的網路無關。
·block.on.buffer.full
當bufferPool用完後,如果client還在使用KafkaProducer發送record,要麼是BufferPool拒絕接收,要麼是拋出異常。
這個配置是預設值是flase,也就是當bufferpool滿時,不會拋出BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時拋出TimeoutExcpetion。
如果這個屬性值是true,則會把max.block.ms值設置為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。
·interceptor.class
自定義攔截器類。預設情況下沒有指定任何的interceptor。
·max.in.flight.requests.per.connection
每個連接中處於發送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]
·metric.reporters
MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。
·metrics.num.samples
計算metric時的採樣數。預設值是2。範圍:[1,Integer.MAX]
·metrics.sample.window.ms
採樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]