生產者創建消息。在其他基於發佈與訂閱的消息系統中,生產者可能被稱為發佈者 或 寫入者。 一般情況下,一個消息會被髮布到一個特定的主題上。生產者在預設情況下把消息均衡地分佈到主題的所有分區上,而並不關心特定消息會被寫到哪個分區。不過,在某些情況下,生產者會把消息直接寫到指定的分區。這通常是通過消息鍵和 ...
生產者創建消息。在其他基於發佈與訂閱的消息系統中,生產者可能被稱為發佈者 或 寫入者。
一般情況下,一個消息會被髮布到一個特定的主題上。生產者在預設情況下把消息均衡地分佈到主題的所有分區上,而並不關心特定消息會被寫到哪個分區。不過,在某些情況下,生產者會把消息直接寫到指定的分區。這通常是通過消息鍵和分區器來實現的,分區器為鍵生成一個散列值,並將其映射到指定的分區上。這樣可以保證包含同一個鍵的消息會被寫到同一個分區上。生產者也可以使用自定義的分區器,根據不同的業務規則將消息映射到分區。
生產者發送消息的方式
生產者發送消息主要有 2 種方式:同步發送消息、非同步發送消息
同步發送消息
同步發送消息:我們調用 KafkaProducer 的 send() 方法發送消息,send() 方法會返回一個包含 RecordMetadata 的 Future 對象,然後調用 Future 的 get() 方法等待 Kafka 響應,通過 Kafka 的響應,我們就可以知道消息是否發送成功。
- 如果伺服器返回錯誤,Future 的 get() 方法會拋出異常。
- 如果沒有發生錯誤,我們會得到一個 RecordMetadata 對象,這個對象包含消息的目標主題、分區信息和消息的偏移量等信息。
我們調用 KafkaProducer 的 send() 方法發送 ProducerRecord 對象,消息先是被放進緩衝區,然後使用單獨的線程將消息發送到伺服器端。
異常處理
如果在發送數據之前或者在發送過程中發生了任何錯誤,比如 broker 返回了一個不允許重發消息的異常或者已經超過了重發的次數,那麼就會拋出異常。在發送消息之前,生產者也是有可能發生異常的。這些異常有可能是 SerializationException(說明序列化消息失敗)、BufferExhaustedException 或 TimeoutException(說明緩衝區已滿),又或者是 InterruptException(說明發送線程被中斷)。
KafkaProducer 一般會發生兩類錯誤。
- 其中一類是可重試錯誤,這類錯誤可以通過重發消息來解決。比如對於連接錯誤,可以通過再次建立連接來解決,“無主(no leader)”錯誤則可以通過重新為分區選舉首領來解決。KafkaProducer 可以被配置成自動重試,如果在多次重試後仍無法解決問題,應用程式會收到一個重試異常。
- 另一類錯誤無法通過重試解決,比如“消息太大”異常。對於這類錯誤,KafkaProducer 不會進行任何重試,直接拋出異常。
public void send(String topic, String key, String val) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);
try {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
SendResult<String, String> sendResult = future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
非同步發送消息
非同步發送消息:我們調用 KafkaProducer 的 send() 方法,並指定一個回調方法,在伺服器返迴響應時調用該方法。
大多數時候,我們並不需要等待響應。不過在遇到消息發送失敗時,我們需要拋出異常、記錄錯誤日誌,或者把消息寫入“錯誤消息”文件以便日後分析。為了在非同步發送消息的同時能夠對異常情況進行處理,生產者提供了回調支持。
為了使用回調,需要一個實現了 org.apache.kafka.clients.producer.Callback 介面的類,這個介面只有一個 onCompletion() 方法。如果 Kafka 返回一個錯誤,onCompletion() 方法會拋出一個非空異常。通過 onCompletion() 方法拋出的異常,我們可以對發送失敗的消息進行處理。一般情況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數超出上限的情況。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
分區器
介紹分區
ProducerRecord 對象包含目標主題、消息鍵和值(消息)。
- 如果消息鍵為 null,並且使用了預設的 DefaultPartitioner 分區器,那麼分區器使用粘性分區策略(UniformSticky),會隨機選擇一個分區,並儘可能一直使用該分區,等到該分區的 batch 已滿或者已完成,Kafka 再隨機一個分區進行使用(保證和上一次的分區不同)。
- 如果消息鍵不為 null,並且使用了預設的 DefaultPartitioner 分區器,那麼分區器會對消息鍵進行散列(使用 Kafka 自己的散列演算法,即使升級 Java 版本,散列值也不會發生變化),然後根據散列值把消息映射到特定的分區上(散列值 與 主題的分區數進行取餘得到 partition 值)。
這裡的關鍵之處在於,同一個鍵總是被映射到同一個分區上,所以在進行映射時,我們會使用主題的所有分區,而不僅僅是可用的分區。這也意味著,如果寫入數據的分區是不可用的,那麼就會發生錯誤。
只有在不改變主題分區數量的情況下,鍵與分區之間的映射才能保持不變。一旦主題增加了新的分區,那麼鍵與分區之間的映射關係就改變了。如果要使用鍵來映射分區,那麼最好在創建主題的時候就把分區規劃好,而且永遠不要增加新分區。
自定義分區策略
生產者可以使用自定義的分區器,根據不同的業務規則將消息映射到分區。
通過分區器實現自定義分區策略的步驟:
- 定義一個類,該類實現 Partitioner 介面(分區器)
- 配置生產者(KafkaProducer),讓生產者發送消息時使用自定義的分區器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {
/**
* 返回信息對應的分區
*
* @param topic 主題
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化後的位元組數組
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化後的位元組數組
* @param cluster 集群元數據可以查看分區信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
if ((keyBytes == null) || (!(key instanceof String))) {
throw new InvalidRecordException("We expect all messages to have String type as key");
}
// 實現自己的分區策略
// 返回數據寫入的分區號
return 0;
}
// 關閉資源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
參考資料
《Kafka 權威指南》第 3 章:Kafka 生產者——向 Kafka 寫入數據
本文來自博客園,作者:真正的飛魚,轉載請註明原文鏈接:https://www.cnblogs.com/feiyu2/p/17250979.html