1.kafka consumer流程1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequest給bootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信 ...
1.kafka consumer流程
1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequest給bootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信息。
1.2.消費者連接協調節點,併發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前消費組的新的generation編號。這個時候消費組管理已經完成,消費者就可以開始抓取數據,併為它擁有的partitions提交offsets。
1.3.如果HeartbeatResponse沒有錯誤返回,消費者會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。
Coordinator協調節點的工作過程:
1.在穩定狀態下,協調節點通過故障檢測協議跟蹤每個消費組中每個消費者的健康狀況。
2.在選舉和啟動時,協調節點讀取它管理的消費組列表,以及從ZK中讀取每個消費組的成員信息。如果之前沒有成員信息,它不會做任何動作。只有在同一個消費組的第一個消費者註冊進來時,協調節點才開始工作(即開始載入消費組的消費者成員信息)。
3.當協調節點完全載入完它所負責的消費組列表的所有組成員之前,它會在以下幾種請求的響應中返回CoordinatorStartupNotComplete錯誤碼:HeartbeatRequest,OffsetCommitRequest,JoinGroupRequest。這樣消費者就會過段時間重試(直到完全載入,沒有錯誤碼返回為止)。
4.在選舉或啟動時,協調節點會對消費組中的所有消費者進行故障檢測。根據故障檢測協議被協調節點標記為Dead的消費者會從消費組中移除,這個時候協調節點會為Dead的消費者所屬的消費組觸發一個平衡操作(消費者Dead之後,這個消費者擁有的partition需要平衡給其他消費者)。
5.當HeartbeatResponse返回IllegalGeneration錯誤碼,就會觸發平衡操作。一旦所有存活的消費者通過JoinGroupRequests重新註冊到協調節點,協調節點會將最新的partition所有權信息在JoinGroupResponse的每個消費者之間通信(同步),然後就完成了平衡操作。
6.協調節點會跟蹤任何一個消費者已經註冊的topics的topic-partition的變更。如果它檢測到某個topic新增的partition,就會觸發平衡操作。當創建一個新的topics也會觸發平衡操作,因為消費者可以在topic被創建之前就註冊它感興趣的topics。
2.消費者組的使用場景
Kafka里的消費者組有兩個使用的場景:
2.1“隊列模式”:在同一組的消費者共同消費一個主題的所有消息,而且確保一條消息只被一個消費者處理。一個主題的所有的分區會和一個消費組的所有消費者做關聯:一個分區只會與一個消費者關聯,它的消息不會被其它的消費者接收。
最開始只有一個消費者時,所有的分區都分配給了它。當消息的規模增加時,我們就需要擴展消費者的數量,水平擴展處理能力,一直可以達到每個消費者只關聯一個分區。大於分區數的消費者是會處在空閑狀態,因為沒有分配任何的分區。
2.2“發佈/訂閱模式”: 創建不同的消費者組意味一個主題的消息會發送給所有訂閱它的消費者組,然後消費者組依照前面共同協作的場景進行分配。這往往是因為我們有不同的應用需求,比如一批交易數據,資金系統、ERP系統會消費它而風險監控也需要同時消費它。這就實現了數據的透明非同步共用。
在兩個場景中,消費者組有個重要的功能:rebalancing。當一個新的消費者加入一個組,如果還有有效的分區(消費者數<=主題分區數),會開始一個重新均衡分配的操作,會將一個已關聯的分區(它的原消費者仍保有至少一個分區)重新分配給新加入的消費者。同樣的,當一個消費者因為各種原因離開這個組,它的所有分區會被分配給剩下的消費者。
Subscribe(自動) assign(手動)
前面所說的自動分配是指在 KafkaConsumer API中的subscribe()方法。這個方法強制要求你為消費者設置一個消費者組,group.id參數不能為空。而你不需要處理分區的分配問題。而對應subscribe()方法。你可以採用手動的方式,指定消費者讀取哪個主題分區,則:assign() 方法。當你需要精確地控制消息處理的負載,也能確定哪個分區有哪些消息時,這種手動的方式會很有用
3.自動提交方式api
[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic topic11
[hadoop@h201 kkk]$ vi cc.java
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.Arrays;
public class cc {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//設置kafka集群的地址
props.put("bootstrap.servers", "h201:9092,h202:9092,h203:9092");
//設置消費者組,組名字自定義,組名字相同的消費者在一個組
props.put("group.id", "g11");
//開啟offset自動提交
props.put("enable.auto.commit", "true");
//自動提交時間間隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//實例化一個消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消費者訂閱主題,可以訂閱多個主題
consumer.subscribe(Arrays.asList("topic11"));
//死迴圈不停的從broker中拿數據
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/javac -classpath /home/hadoop/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar cc.java
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/java cc
解釋:
Poll方法用來獲取消息 ,poll(拉取)
consumer.poll(100) :100ms內拉取一次數據
Record :為存儲的消息,record.value 為消息的內容