5.kafka API consumer

来源:https://www.cnblogs.com/xiguage119/archive/2019/07/24/11241417.html
-Advertisement-
Play Games

1.kafka consumer流程1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequest給bootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信 ...


1.kafka consumer流程
1.1.在啟動時或者協調節點故障轉移時,消費者發送ConsumerMetadataRequestbootstrap brokers列表中的任意一個brokers。在ConsumerMetadataResponse中,它接收消費者對應的消費組所屬的協調節點的位置信息。

1.2.消費者連接協調節點,併發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前消費組的新的generation編號。這個時候消費組管理已經完成,消費者就可以開始抓取數據,併為它擁有的partitions提交offsets

1.3.如果HeartbeatResponse沒有錯誤返回,消費者會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。

Coordinator協調節點的工作過程:
1.在穩定狀態下,協調節點通過故障檢測協議跟蹤每個消費組中每個消費者的健康狀況。

2.在選舉和啟動時,協調節點讀取它管理的消費組列表,以及從ZK中讀取每個消費組的成員信息。如果之前沒有成員信息,它不會做任何動作。只有在同一個消費組的第一個消費者註冊進來時,協調節點才開始工作(即開始載入消費組的消費者成員信息)

3.當協調節點完全載入完它所負責的消費組列表的所有組成員之前,它會在以下幾種請求的響應中返回CoordinatorStartupNotComplete錯誤碼:HeartbeatRequestOffsetCommitRequestJoinGroupRequest。這樣消費者就會過段時間重試(直到完全載入,沒有錯誤碼返回為止)

4.在選舉或啟動時,協調節點會對消費組中的所有消費者進行故障檢測。根據故障檢測協議被協調節點標記為Dead的消費者會從消費組中移除,這個時候協調節點會為Dead的消費者所屬的消費組觸發一個平衡操作(消費者Dead之後,這個消費者擁有的partition需要平衡給其他消費者)

5.HeartbeatResponse返回IllegalGeneration錯誤碼,就會觸發平衡操作。一旦所有存活的消費者通過JoinGroupRequests重新註冊到協調節點,協調節點會將最新的partition所有權信息在JoinGroupResponse的每個消費者之間通信(同步),然後就完成了平衡操作。

6.協調節點會跟蹤任何一個消費者已經註冊的topicstopic-partition的變更。如果它檢測到某個topic新增的partition,就會觸發平衡操作。當創建一個新的topics也會觸發平衡操作,因為消費者可以在topic被創建之前就註冊它感興趣的topics

2.消費者組的使用場景

Kafka里的消費者組有兩個使用的場景:
2.1“隊列模式”:在同一組的消費者共同消費一個主題的所有消息,而且確保一條消息只被一個消費者處理。一個主題的所有的分區會和一個消費組的所有消費者做關聯:一個分區只會與一個消費者關聯,它的消息不會被其它的消費者接收。
最開始只有一個消費者時,所有的分區都分配給了它。當消息的規模增加時,我們就需要擴展消費者的數量,水平擴展處理能力,一直可以達到每個消費者只關聯一個分區。大於分區數的消費者是會處在空閑狀態,因為沒有分配任何的分區。

2.2“發佈/訂閱模式”: 創建不同的消費者組意味一個主題的消息會發送給所有訂閱它的消費者組,然後消費者組依照前面共同協作的場景進行分配。這往往是因為我們有不同的應用需求,比如一批交易數據,資金系統、ERP系統會消費它而風險監控也需要同時消費它。這就實現了數據的透明非同步共用。

在兩個場景中,消費者組有個重要的功能:rebalancing。當一個新的消費者加入一個組,如果還有有效的分區(消費者數<=主題分區數),會開始一個重新均衡分配的操作,會將一個已關聯的分區(它的原消費者仍保有至少一個分區)重新分配給新加入的消費者。同樣的,當一個消費者因為各種原因離開這個組,它的所有分區會被分配給剩下的消費者。

 Subscribe(自動) assign(手動)
前面所說的自動分配是指在 KafkaConsumer API中的subscribe()方法。這個方法強制要求你為消費者設置一個消費者組,group.id參數不能為空。而你不需要處理分區的分配問題而對應subscribe()方法你可以採用手動的方式,指定消費者讀取哪個主題分區,則:assign() 方法。當你需要精確地控制消息處理的負載,也能確定哪個分區有哪些消息時,這種手動的方式會很有用

3.自動提交方式api
[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic topic11

 

[hadoop@h201 kkk]$ vi cc.java
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.Arrays;
public class cc {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//設置kafka集群的地址
props.put("bootstrap.servers", "h201:9092,h202:9092,h203:9092");
//設置消費者組,組名字自定義,組名字相同的消費者在一個組
props.put("group.id", "g11");
//開啟offset自動提交
props.put("enable.auto.commit", "true");
//自動提交時間間隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//實例化一個消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消費者訂閱主題,可以訂閱多個主題
consumer.subscribe(Arrays.asList("topic11"));
//死迴圈不停的從broker中拿數據
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

 

[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/javac -classpath /home/hadoop/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar cc.java
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/java cc

 

解釋:
Poll方法用來獲取消息 ,poll(拉取)
consumer.poll(100) :100ms內拉取一次數據
Record :為存儲的消息,record.value 為消息的內容

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • nano是一個字元終端的文本編輯器,有點像DOS下的editor程式。它比vi/vim要簡單得多,比較適合Linux初學者使用。某些Linux發行版的預設編輯器就是nano。 nano命令可以打開指定文件進行編輯,預設情況下它會自動斷行,即在一行中輸入過長的內容時自動拆分成幾行,但用這種方式來處理某 ...
  • 1.網路設置 裝好CentOS7後,我們一開始是上不了網的 這時候,可以輸入命令dhclient,可以自動獲取一個IP地址,再用命令ip addr查看IP 不過這時候獲取的IP是動態的,下次重啟系統後,IP地址也會變化,還是建議設置成靜態IP 2.配置YUM本地源 參考 https://www.cn ...
  • 公司的Linux伺服器都是通過一臺JumpServer跳轉的。個人使用Jumpserver(開源跳板機系統)時,有時候由於需要上傳、下載文件很不方便。而由於配置關係,一般情況無法使用SecureCRT直接通過ssh連接到伺服器。所以個人設置了/etc/ssh/sshd_config。允許我的電腦(電... ...
  • 1. sysctl -w net.ipv4.tcp_syncookies=1 #啟用使用syncookiessysctl -w net.ipv4.tcp_synack_retries=1 #降低syn重試次數sysctl -w net.ipv4.tcp_syn_retries=1 #降低syn重試次 ...
  • linux啟動順序流程圖: 啟動第一步--載入 BIOS 當你打開電腦電源,電腦會首先載入 BIOS 信息,BIOS 信息是如此的重要,以至於電腦必須在最開始就找到它。這是因為 BIOS 中包含了 CPU 的相關信息、設備啟動順序信息、硬碟信息、記憶體信息、時鐘信息、PnP 特性等等。在此之後, ...
  • 安裝依賴 解壓 安裝nginx 配置Tomcat伺服器 upstream tomcats{ server localhost:8080 weight=3; # weigh表示權重,越大訪問的機率越多 server localhost:8880 weight=6; } location / { # 這 ...
  • 恢復內容開始 1.用JDBC設置aa的balance值為1500 2.用JDBC添加姓名cc,balance為3000 3.用JDBC刪除id為3的數據 4.用JDBC創建一個student表 5.用JDBC查詢account表中所有數據 6.用JDBC查詢account表中所有數據 7.JDBC工 ...
  • 使用數據處理函數 函數 與其他大多數電腦語言一樣,SQL支持利用函數來處理數據。函數一般是在數據上執行的,他給數據的轉換和處理提供了方便,在前一章中用來去掉尾空格的RTrim()就是一個函數的例子 文本處理函數 輸入: SELECT vend_name,Upper(vend_name) AS ve ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...