5.kafka API consumer

来源:https://www.cnblogs.com/xiguage119/archive/2019/07/24/11241417.html
-Advertisement-
Play Games

1.kafka consumer流程1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequest給bootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信 ...


1.kafka consumer流程
1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequestbootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信息。

1.2.消費者連接協調節點,併發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前消費組的新的generation編號。這個時候消費組管理已經完成,消費者就可以開始抓取數據,併為它擁有的partitions提交offsets

1.3.如果HeartbeatResponse沒有錯誤返回,消費者會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。

Coordinator協調節點的工作過程:
1.在穩定狀態下,協調節點通過故障檢測協議跟蹤每個消費組中每個消費者的健康狀況。

2.在選舉和啟動時,協調節點讀取它管理的消費組列表,以及從ZK中讀取每個消費組的成員信息。如果之前沒有成員信息,它不會做任何動作。只有在同一個消費組的第一個消費者註冊進來時,協調節點才開始工作(即開始載入消費組的消費者成員信息)

3.當協調節點完全載入完它所負責的消費組列表的所有組成員之前,它會在以下幾種請求的響應中返回CoordinatorStartupNotComplete錯誤碼:HeartbeatRequestOffsetCommitRequestJoinGroupRequest。這樣消費者就會過段時間重試(直到完全載入,沒有錯誤碼返回為止)

4.在選舉或啟動時,協調節點會對消費組中的所有消費者進行故障檢測。根據故障檢測協議被協調節點標記為Dead的消費者會從消費組中移除,這個時候協調節點會為Dead的消費者所屬的消費組觸發一個平衡操作(消費者Dead之後,這個消費者擁有的partition需要平衡給其他消費者)

5.HeartbeatResponse返回IllegalGeneration錯誤碼,就會觸發平衡操作。一旦所有存活的消費者通過JoinGroupRequests重新註冊到協調節點,協調節點會將最新的partition所有權信息在JoinGroupResponse的每個消費者之間通信(同步),然後就完成了平衡操作。

6.協調節點會跟蹤任何一個消費者已經註冊的topicstopic-partition的變更。如果它檢測到某個topic新增的partition,就會觸發平衡操作。當創建一個新的topics也會觸發平衡操作,因為消費者可以在topic被創建之前就註冊它感興趣的topics

2.消費者組的使用場景

Kafka里的消費者組有兩個使用的場景:
2.1“隊列模式”:在同一組的消費者共同消費一個主題的所有消息,而且確保一條消息只被一個消費者處理。一個主題的所有的分區會和一個消費組的所有消費者做關聯:一個分區只會與一個消費者關聯,它的消息不會被其它的消費者接收。
最開始只有一個消費者時,所有的分區都分配給了它。當消息的規模增加時,我們就需要擴展消費者的數量,水平擴展處理能力,一直可以達到每個消費者只關聯一個分區。大於分區數的消費者是會處在空閑狀態,因為沒有分配任何的分區。

2.2“發佈/訂閱模式”: 創建不同的消費者組意味一個主題的消息會發送給所有訂閱它的消費者組,然後消費者組依照前面共同協作的場景進行分配。這往往是因為我們有不同的應用需求,比如一批交易數據,資金系統、ERP系統會消費它而風險監控也需要同時消費它。這就實現了數據的透明非同步共用。

在兩個場景中,消費者組有個重要的功能:rebalancing。當一個新的消費者加入一個組,如果還有有效的分區(消費者數<=主題分區數),會開始一個重新均衡分配的操作,會將一個已關聯的分區(它的原消費者仍保有至少一個分區)重新分配給新加入的消費者。同樣的,當一個消費者因為各種原因離開這個組,它的所有分區會被分配給剩下的消費者。

 Subscribe(自動) assign(手動)
前面所說的自動分配是指在 KafkaConsumer API中的subscribe()方法。這個方法強制要求你為消費者設置一個消費者組,group.id參數不能為空。而你不需要處理分區的分配問題而對應subscribe()方法你可以採用手動的方式,指定消費者讀取哪個主題分區,則:assign() 方法。當你需要精確地控制消息處理的負載,也能確定哪個分區有哪些消息時,這種手動的方式會很有用

3.自動提交方式api
[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic topic11

 

[hadoop@h201 kkk]$ vi cc.java
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.Arrays;
public class cc {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//設置kafka集群的地址
props.put("bootstrap.servers", "h201:9092,h202:9092,h203:9092");
//設置消費者組,組名字自定義,組名字相同的消費者在一個組
props.put("group.id", "g11");
//開啟offset自動提交
props.put("enable.auto.commit", "true");
//自動提交時間間隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//實例化一個消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消費者訂閱主題,可以訂閱多個主題
consumer.subscribe(Arrays.asList("topic11"));
//死迴圈不停的從broker中拿數據
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

 

[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/javac -classpath /home/hadoop/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar cc.java
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/java cc

 

解釋:
Poll方法用來獲取消息 ,poll(拉取)
consumer.poll(100) :100ms內拉取一次數據
Record :為存儲的消息,record.value 為消息的內容

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • nano是一個字元終端的文本編輯器,有點像DOS下的editor程式。它比vi/vim要簡單得多,比較適合Linux初學者使用。某些Linux發行版的預設編輯器就是nano。 nano命令可以打開指定文件進行編輯,預設情況下它會自動斷行,即在一行中輸入過長的內容時自動拆分成幾行,但用這種方式來處理某 ...
  • 1.網路設置 裝好CentOS7後,我們一開始是上不了網的 這時候,可以輸入命令dhclient,可以自動獲取一個IP地址,再用命令ip addr查看IP 不過這時候獲取的IP是動態的,下次重啟系統後,IP地址也會變化,還是建議設置成靜態IP 2.配置YUM本地源 參考 https://www.cn ...
  • 公司的Linux伺服器都是通過一臺JumpServer跳轉的。個人使用Jumpserver(開源跳板機系統)時,有時候由於需要上傳、下載文件很不方便。而由於配置關係,一般情況無法使用SecureCRT直接通過ssh連接到伺服器。所以個人設置了/etc/ssh/sshd_config。允許我的電腦(電... ...
  • 1. sysctl -w net.ipv4.tcp_syncookies=1 #啟用使用syncookiessysctl -w net.ipv4.tcp_synack_retries=1 #降低syn重試次數sysctl -w net.ipv4.tcp_syn_retries=1 #降低syn重試次 ...
  • linux啟動順序流程圖: 啟動第一步--載入 BIOS 當你打開電腦電源,電腦會首先載入 BIOS 信息,BIOS 信息是如此的重要,以至於電腦必須在最開始就找到它。這是因為 BIOS 中包含了 CPU 的相關信息、設備啟動順序信息、硬碟信息、記憶體信息、時鐘信息、PnP 特性等等。在此之後, ...
  • 安裝依賴 解壓 安裝nginx 配置Tomcat伺服器 upstream tomcats{ server localhost:8080 weight=3; # weigh表示權重,越大訪問的機率越多 server localhost:8880 weight=6; } location / { # 這 ...
  • 恢復內容開始 1.用JDBC設置aa的balance值為1500 2.用JDBC添加姓名cc,balance為3000 3.用JDBC刪除id為3的數據 4.用JDBC創建一個student表 5.用JDBC查詢account表中所有數據 6.用JDBC查詢account表中所有數據 7.JDBC工 ...
  • 使用數據處理函數 函數 與其他大多數電腦語言一樣,SQL支持利用函數來處理數據。函數一般是在數據上執行的,他給數據的轉換和處理提供了方便,在前一章中用來去掉尾空格的RTrim()就是一個函數的例子 文本處理函數 輸入: SELECT vend_name,Upper(vend_name) AS ve ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...