20,000+ 字,徹底搞懂 Kafka!

来源:https://www.cnblogs.com/javastack/archive/2023/08/25/17656918.html
-Advertisement-
Play Games

## **1、為什麼有消息系統** ##### 1、解耦合 ##### 2、非同步處理 例如電商平臺,秒殺活動。 一般流程會分為: 1. 風險控制 2. 庫存鎖定 3. 生成訂單 4. 簡訊通知 5. 更新數據 通過消息系統將秒殺活動業務拆分開,將不急需處理的業務放在後面慢慢處理; 流程改為: 1. ...


1、為什麼有消息系統

1、解耦合
2、非同步處理

例如電商平臺,秒殺活動。

一般流程會分為:

  1. 風險控制
  2. 庫存鎖定
  3. 生成訂單
  4. 簡訊通知
  5. 更新數據

通過消息系統將秒殺活動業務拆分開,將不急需處理的業務放在後面慢慢處理;

流程改為:

  1. 風險控制
  2. 庫存鎖定
  3. 消息系統
  4. 生成訂單
  5. 簡訊通知
  6. 更新數據
3、流量的控制

3.1 網關在接受到請求後,就把請求放入到消息隊列裡面

3.2 後端的服務從消息隊列裡面獲取到請求,完成後續的秒殺處理流程。然後再給用戶返回結果。

  • 優點:控制了流量
  • 缺點:會讓流程變慢

推薦一個開源免費的 Spring Boot 實戰項目:

https://github.com/javastacks/spring-boot-best-practice

2、Kafka核心概念

  • 生產者:Producer 往Kafka集群生成數據
  • 消費者:Consumer 往Kafka裡面去獲取數據,處理數據、消費數據

Kafka的數據是由消費者自己去拉去Kafka裡面的數據

  • 主題:topic
  • 分區:partition

預設一個topic有一個分區(partition),自己可設置多個分區(分區分散存儲在伺服器不同節點上)

解決了一個海量數據如何存儲的問題

例如:有2T的數據,一臺伺服器有1T,一個topic可以分多個區,分別存儲在多台伺服器上,解決海量數據存儲問題

3、Kafka的集群架構

Kafka集群中,一個kafka伺服器就是一個broker,Topic只是邏輯上的概念,partition在磁碟上就體現為一個目錄。

Consumer Group:消費組,消費數據的時候,都必須指定一個group id,指定一個組的id

假定程式A和程式B指定的group id號一樣,那麼兩個程式就屬於同一個消費組

特殊:

  • 比如,有一個主題topicA, 程式A去消費了這個topicA,那麼程式B就不能再去消費topicA(程式A和程式B屬於一個消費組)
  • 再比如程式A已經消費了topicA裡面的數據,現在還是重新再次消費topicA的數據,是不可以的,但是重新指定一個group id號以後,可以消費。

不同消費組之間沒有影響。消費組需自定義,消費者名稱程式自動生成(獨一無二)。

Controller:Kafka節點裡面的一個主節點。藉助zookeeper

4、Kafka磁碟順序寫保證寫數據性能

kafka寫數據:

順序寫,往磁碟上寫數據時,就是追加數據,沒有隨機寫的操作。

經驗:

如果一個伺服器磁碟達到一定的個數,磁碟也達到一定轉數,往磁碟裡面順序寫(追加寫)數據的速度和寫記憶體的速度差不多。

生產者生產消息,經過kafka服務先寫到os cache 記憶體中,然後經過sync順序寫到磁碟上

5、Kafka零拷貝機制保證讀數據高性能

消費者讀取數據流程:

  1. 消費者發送請求給kafka服務
  2. kafka服務去os cache緩存讀取數據(緩存沒有就去磁碟讀取數據)
  3. 從磁碟讀取了數據到os cache緩存中
  4. os cache複製數據到kafka應用程式中
  5. kafka將數據(複製)發送到socket cache中
  6. socket cache通過網卡傳輸給消費者

kafka linux sendfile技術 — 零拷貝

  1. 消費者發送請求給kafka服務
  2. kafka服務去os cache緩存讀取數據(緩存沒有就去磁碟讀取數據)
  3. 從磁碟讀取了數據到os cache緩存中
  4. os cache直接將數據發送給網卡
  5. 通過網卡將數據傳輸給消費者

6、Kafka日誌分段保存

Kafka中一個主題,一般會設置分區;比如創建了一個topic_a,然後創建的時候指定了這個主題有三個分區。

其實在三台伺服器上,會創建三個目錄。

伺服器1(kafka1):

  • 創建目錄topic_a-0:
  • 目錄下麵是我們文件(存儲數據),kafka數據就是message,數據存儲在log文件里
  • .log結尾的就是日誌文件,在kafka中把數據文件就叫做日誌文件。

一個分區下麵預設有n多個日誌文件(分段存儲),一個日誌文件預設1G

伺服器2(kafka2):

  • 創建目錄topic_a-1:

伺服器3(kafka3):

  • 創建目錄topic_a-2:

7、Kafka二分查找定位數據

Kafka裡面每一條消息,都有自己的offset(相對偏移量),存在物理磁碟上面,在position

Position:物理位置(磁碟上面那個地方)

也就是說一條消息就有兩個位置:

  • offset:相對偏移量(相對位置)
  • position:磁碟物理位置

稀疏索引:

  • Kafka中採用了稀疏索引的方式讀取索引,kafka每當寫入了4k大小的日誌(.log),就往index里寫入一個記錄索引。

其中會採用二分查找

8、高併發網路設計(先瞭解NIO)

網路設計部分是kafka中設計最好的一個部分,這也是保證Kafka高併發、高性能的原因

對kafka進行調優,就得對kafka原理比較瞭解,尤其是網路設計部分

Reactor網路設計模式1:

Reactor網路設計模式2:

Reactor網路設計模式3:

Kafka超高併發網路設計:

9、Kafka冗餘副本保證高可用

在kafka裡面分區是有副本的,註:0.8以前是沒有副本機制的。創建主題時,可以指定分區,也可以指定副本個數。副本是有角色的:

leader partition:

  • 寫數據、讀數據操作都是從leader partition去操作的。
  • 會維護一個ISR(in-sync- replica )列表,但是會根據一定的規則刪除ISR列表裡面的值

生產者發送來一個消息,消息首先要寫入到leader partition中

寫完了以後,還要把消息寫入到ISR列表裡面的其它分區,寫完後才算這個消息提交

follower partition:從leader partition同步數據。

10、優秀架構思考-總結

Kafka — 高併發、高可用、高性能

  • 高可用:多副本機制
  • 高併發:網路架構設計 三層架構:多selector -> 多線程 -> 隊列的設計(NIO)
  • 高性能:

寫數據:

  1. 把數據先寫入到OS Cache
  2. 寫到磁碟上面是順序寫,性能很高

讀數據:

  1. 根據稀疏索引,快速定位到要消費的數據

  2. 零拷貝機制

    • 減少數據的拷貝
    • 減少了應用程式與操作系統上下文切換

11、Kafka生產環境搭建

11.1 需求場景分析

電商平臺,需要每天10億請求都要發送到Kafka集群上面。二八反正,一般評估出來問題都不大。

10億請求 -> 24 過來的,一般情況下,每天的12:00 到早上8:00 這段時間其實是沒有多大的數據量的。80%的請求是用的另外16小時的處理的。16個小時處理 -> 8億的請求。16 * 0.2 = 3個小時 處理了8億請求的80%的數據

也就是說6億的數據是靠3個小時處理完的。我們簡單的算一下高峰期時候的qps

6億/3小時 =5.5萬/s qps=5.5萬

10億請求 * 50kb = 46T 每天需要存儲46T的數據

一般情況下,我們都會設置兩個副本 46T * 2 = 92T,Kafka裡面的數據是有保留的時間周期,保留最近3天的數據。

92T * 3天 = 276T

我這兒說的是50kb不是說一條消息就是50kb不是(把日誌合併了,多條日誌合併在一起),通常情況下,一條消息就幾b,也有可能就是幾百位元組。

11.2 物理機數量評估

1)首先分析一下是需要虛擬機還是物理機

像Kafka mysql hadoop這些集群搭建的時候,我們生產裡面都是使用物理機。

2)高峰期需要處理的請求總的請求每秒5.5萬個,其實一兩台物理機絕對是可以抗住的。一般情況下,我們評估機器的時候,是按照高峰期的4倍的去評估。

如果是4倍的話,大概我們集群的能力要準備到 20萬qps。這樣子的集群才是比較安全的集群。大概就需要5台物理機。每台承受4萬請求。

場景總結:

  • 搞定10億請求,高峰期5.5萬的qps,276T的數據,需要5台物理機。

11.3 磁碟選擇

搞定10億請求,高峰期5.5萬的qps,276T的數據,需要5台物理機。

1)SSD固態硬碟,還是需要普通的機械硬碟

  • SSD硬碟:性能比較好,但是價格貴
  • SAS盤:某方面性能不是很好,但是比較便宜。

SSD硬碟性能比較好,指的是它隨機讀寫的性能比較好。適合MySQL這樣集群。

但是其實他的順序寫的性能跟SAS盤差不多。

kafka的理解:就是用的順序寫。所以我們就用普通的【機械硬碟】就可以了。

2)需要我們評估每台伺服器需要多少塊磁碟

5台伺服器,一共需要276T ,大約每台伺服器 需要存儲60T的數據。我們公司裡面伺服器的配置用的是 11塊硬碟,每個硬碟 7T。11 * 7T = 77T

77T * 5 台伺服器 = 385T

場景總結:

  • 搞定10億請求,需要5台物理機,11(SAS) * 7T

11.4 記憶體評估 搞定10億請求,需要5台物理機,11(SAS) * 7T

我們發現kafka讀寫數據的流程 都是基於os cache,換句話說假設咱們的os cashe無限大那麼整個kafka是不是相當於就是基於記憶體去操作,如果是基於記憶體去操作,性能肯定很好。記憶體是有限的。

  • 儘可能多的記憶體資源要給 os cache
  • Kafka的代碼用 核心的代碼用的是scala寫的,客戶端的代碼java寫的。都是基於jvm。所以我們還要給一部分的記憶體給jvm。

Kafka的設計,沒有把很多數據結構都放在jvm裡面。所以我們的這個jvm不需要太大的記憶體。根據經驗,給個10G就可以了。

NameNode:jvm裡面還放了元數據(幾十G),JVM一定要給得很大。比如給個100G。

假設我們這個10請求的這個項目,一共會有100個topic。100 topic * 5 partition * 2 = 1000 partition

一個partition其實就是物理機上面的一個目錄,這個目錄下麵會有很多個.log的文件。

  • .log就是存儲數據文件,預設情況下一個.log文件的大小是1G。

我們如果要保證 1000個partition 的最新的.log 文件的數據 如果都在記憶體裡面,這個時候性能就是最好。1000 * 1G = 1000G記憶體.

我們只需要把當前最新的這個log 保證裡面的25%的最新的數據在記憶體裡面。250M * 1000 = 0.25 G* 1000 =250G的記憶體。

  • 250記憶體 / 5 = 50G記憶體
  • 50G+10G = 60G記憶體

64G的記憶體,另外的4G,操作系統本生是不是也需要記憶體。其實Kafka的jvm也可以不用給到10G這麼多。評估出來64G是可以的。當然如果能給到128G的記憶體的伺服器,那就最好。

我剛剛評估的時候用的都是一個topic是5個partition,但是如果是數據量比較大的topic,可能會有10個partition。

總結:

  • 搞定10億請求,需要5台物理機,11(SAS) * 7T ,需要64G的記憶體(128G更好)

11.5 CPU壓力評估

評估一下每台伺服器需要多少cpu core(資源很有限)

我們評估需要多少個cpu ,依據就是看我們的服務裡面有多少線程去跑。線程就是依托cpu 去運行的。如果我們的線程比較多,但是cpu core比較少,這樣的話,我們的機器負載就會很高,性能不就不好。

評估一下,kafka的一臺伺服器 啟動以後會有多少線程?

  • Acceptor線程 1
  • processor線程 3 6~9個線程
  • 處理請求線程 8個 32個線程
  • 定時清理的線程,拉取數據的線程,定時檢查ISR列表的機制 等等。

所以大概一個Kafka的服務啟動起來以後,會有一百多個線程。

  • cpu core = 4個,一遍來說,幾十個線程,就肯定把cpu 打滿了。
  • cpu core = 8個,應該很輕鬆的能支持幾十個線程。

如果我們的線程是100多個,或者差不多200個,那麼8 個 cpu core是搞不定的。

所以我們這兒建議:

  • CPU core = 16個。如果可以的話,能有32個cpu core 那就最好。

結論:

  • kafka集群,最低也要給16個cpu core,如果能給到32 cpu core那就更好。
  • 2cpu * 8 =16 cpu core
  • 4cpu * 8 = 32 cpu core

總結:

  • 搞定10億請求,需要5台物理機, 11(SAS) * 7T ,需要64G的記憶體(128G更好),需要16個cpu core(32個更好)

11.6 網路需求評估

評估我們需要什麼樣網卡?

一般要麼是千兆的網卡(1G/s),還有的就是萬兆的網卡(10G/s)

高峰期的時候 每秒會有5.5萬的請求涌入,5.5/5 = 大約是每台伺服器會有1萬個請求涌入。我們之前說的,10000 * 50kb = 488M 也就是每條伺服器,每秒要接受488M的數據。數據還要有副本,副本之間的同步,也是走的網路的請求。488 * 2 = 976m/s

說明一下:

  • 很多公司的數據,一個請求裡面是沒有50kb這麼大的,我們公司是因為主機在生產端封裝了數據,然後把多條數據合併在一起了,所以我們的一個請求才會有這麼大。
  • 一般情況下,網卡的帶寬是達不到極限的,如果是千兆的網卡,我們能用的一般就是700M左右。但是如果最好的情況,我們還是使用萬兆的網卡。
  • 如果使用的是萬兆的,那就是很輕鬆。

11.7 集群規劃

  • 請求量
  • 規劃物理機的個數
  • 分析磁碟的個數,選擇使用什麼樣的磁碟
  • 記憶體
  • cpu core
  • 網卡

就是告訴大家,以後要是公司裡面有什麼需求,進行資源的評估,伺服器的評估,大家按照我的思路去評估。

一條消息的大小 50kb -> 1kb 500byte 1M

ip 主機名

  • 192.168.0.100 hadoop1
  • 192.168.0.101 hadoop2
  • 192.168.0.102 hadoop3

主機的規劃:kafka集群架構的時候:主從式的架構:

  • controller -> 通過zk集群來管理整個集群的元數據。

zookeeper集群

  • hadoop1
  • hadoop2
  • hadoop3

kafka集群

  • 理論上來講,我們不應該把kafka的服務於zk的服務安裝在一起。
  • 但是我們這兒伺服器有限。所以我們kafka集群也是安裝在hadoop1 haadoop2 hadoop3

11.8 zookeeper集群搭建

11.9 核心參數詳解

11.10 集群壓力測試

12、kafka運維

12.1 常見運維工具介紹

KafkaManager — 頁面管理工具

12.2 常見運維命令

場景一:topic數據量太大,要增加topic數

一開始創建主題的時候,數據量不大,給的分區數不多。

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

broker id:

  • hadoop1:0
  • hadoop2:1
  • hadoop3:2

假設一個partition有三個副本:partition0:

a,b,c

  • a:leader partition
  • b,c:follower partition
ISR:{a,b,c}

如果一個follower分區 超過10秒 沒有向leader partition去拉取數據,那麼這個分區就從ISR列表裡面移除。

場景二:核心topic增加副本因數

如果對核心業務數據需要增加副本因數

vim test.json腳本,將下麵一行json腳本保存

{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

執行上面json腳本:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

場景三:負載不均衡的topic,手動遷移

vi topics-to-move.json

{“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1}
// 把你所有的topic都寫在這裡

kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate
// 把你所有的包括新加入的broker機器都寫在這裡,就會說是把所有的partition均勻的分散在各個broker上,包括新進來的broker

此時會生成一個遷移方案,可以保存到一個文件里去:expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

這種數據遷移操作一定要在晚上低峰的時候來做,因為他會在機器之間遷移數據,非常的占用帶寬資源

  • generate: 根據給予的Topic列表和Broker列表生成遷移計劃。generate並不會真正進行消息遷移,而是將消息遷移計劃計算出來,供execute命令使用。
  • execute: 根據給予的消息遷移計划進行遷移。
  • verify: 檢查消息是否已經遷移完成。

場景四:如果某個broker leader partition過多

正常情況下,我們的leader partition在伺服器之間是負載均衡。

  • hadoop1 4
  • hadoop2 1
  • hadoop3 1

現在各個業務方可以自行申請創建topic,分區數量都是自動分配和後續動態調整的,kafka本身會自動把leader partition均勻分散在各個機器上,這樣可以保證每台機器的讀寫吞吐量都是均勻的。

但是也有例外,那就是如果某些broker宕機,會導致leader partition過於集中在其他少部分幾台broker上,這會導致少數幾台broker的讀寫請求壓力過高,其他宕機的broker重啟之後都是folloer partition,讀寫請求很低。

造成集群負載不均衡有一個參數,auto.leader.rebalance.enable,預設是true,每隔300秒(leader.imbalance.check.interval.seconds)檢查leader負載是否平衡

如果一臺broker上的不均衡的leader超過了10%,leader.imbalance.per.broker.percentage,就會對這個broker進行選舉。

配置參數:

  • auto.leader.rebalance.enable 預設是true
  • leader.imbalance.per.broker.percentage: 每個broker允許的不平衡的leader的比率。如果每個broker超過了這個值,控制器會觸發leader的平衡。這個值表示百分比。10%
  • leader.imbalance.check.interval.seconds:預設值300秒

13、Kafka生產者

13.1 消費者發送消息原理

13.2 消費者發送消息原理—基礎案例演示

13.3 如何提升吞吐量

如何提升吞吐量:參數一:buffer.memory:

設置發送消息的緩衝區,預設值是33554432,就是32MB

參數二:compression.type:

預設是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯的,壓縮之後可以減小數據量,提升吞吐量,但是會加大producer端的cpu開銷

參數三:batch.size:

  • 設置batch的大小,如果batch太小,會導致頻繁網路請求,吞吐量下降;
  • 如果batch太大,會導致一條消息需要等待很久才能被髮送出去,而且會讓記憶體緩衝區有很大壓力,過多數據緩衝在記憶體里,預設值是:16384,就是16kb,也就是一個batch滿了16kb就發送出去,一般在實際生產環境,這個batch的值可以增大一些來提升吞吐量,如果一個批次設置大了,會有延遲。一般根據一條消息大小來設置。
  • 如果我們消息比較少。配合使用的參數linger.ms,這個值預設是0,意思就是消息必須立即被髮送,但是這是不對的,一般設置一個100毫秒之類的,這樣的話就是說,這個消息被髮送出去後進入一個batch,如果100毫秒內,這個batch滿了16kb,自然就會發送出去。

13.4 如何處理異常

1、LeaderNotAvailableException:

這個就是如果某台機器掛了,此時leader副本不可用,會導致你寫入失敗,要等待其他follower副本切換為leader副本之後,才能繼續寫入,此時可以重試發送即可;如果說你平時重啟kafka的broker進程,肯定會導致leader切換,一定會導致你寫入報錯,是LeaderNotAvailableException

2、NotControllerException:

這個也是同理,如果說Controller所在Broker掛了,那麼此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可。

3、NetworkException:網路異常 timeout

  • 配置retries參數,他會自動重試的
  • 但是如果重試幾次之後還是不行,就會提供Exception給我們來處理了,我們獲取到異常以後,再對這個消息進行單獨處理。我們會有備用的鏈路。發送不成功的消息發送到Redis或者寫到文件系統中,甚至是丟棄。

13.5 重試機制

重試會帶來一些問題:

消息重覆

有的時候一些leader切換之類的問題,需要進行重試,設置retries即可,但是消息重試會導致,重覆發送的問題,比如說網路抖動一下導致他以為沒成功,就重試了,其實人家都成功了.

消息亂序消息重試是可能導致消息的亂序的,因為可能排在你後面的消息都發送出去了。所以可以使用" max.in.flight.requests.per.connection"參數設置為1,這樣可以保證producer同一時間只能發送一條消息。

兩次重試的間隔預設是100毫秒,用"retry.backoff.ms"來進行設置,基本上在開發過程中,靠重試機制基本就可以搞定95%的異常問題。

13.6 ACK參數詳解

producer端

request.required.acks=0;
  • 只要請求已發送出去,就算是發送完了,不關心有沒有寫成功。
  • 性能很好,如果是對一些日誌進行分析,可以承受丟數據的情況,用這個參數,性能會很好。
request.required.acks=1;
  • 發送一條消息,當leader partition寫入成功以後,才算寫入成功。
  • 不過這種方式也有丟數據的可能。
request.required.acks=-1;
  • 需要ISR列表裡面,所有副本都寫完以後,這條消息才算寫入成功。
  • ISR:1個副本。1 leader partition 1 follower partition

kafka服務端:

min.insync.replicas:1

如果我們不設置的話,預設這個值是1,一個leader partition會維護一個ISR列表,這個值就是限制ISR列表裡面,至少得有幾個副本,比如這個值是2,那麼當ISR列表裡面只有一個副本的時候。往這個分區插入數據的時候會報錯。

設計一個不丟數據的方案:

  • 分區副本 >=2
  • acks = -1
  • min.insync.replicas >=2

還有可能就是發送有異常:對異常進行處理

13.7 自定義分區

分區:

  • 沒有設置key

我們的消息就會被輪訓的發送到不同的分區。

  • 設置了key

kafka自帶的分區器,會根據key計算出來一個hash值,這個hash值會對應某一個分區。

如果key相同的,那麼hash值必然相同,key相同的值,必然是會被髮送到同一個分區。

但是有些比較特殊的時候,我們就需要自定義分區

public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List partitionInfoList = cluster.availablePartitionsForTopic(topic);
//獲取到分區的個數 0,1,2
int partitionCount = partitionInfoList.size();
//最後一個分區
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}

如何使用:

配置上這個類即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);

13.8 綜合案例演示

需求分析:

電商背景 -》 二手的電商平臺

【歡樂送】的項目,用戶購買了東西以後會有【星星】,用星星去換物品。一塊錢一個星星。

訂單系統(消息的生產),發送一條消息(支付訂單,取消訂單) -> Kafka <- 會員系統,從kafak裡面去消費數據,找到對應用戶消費的金額,然後給該用戶更新星星的數量。

分析一下:

發送消息的時候,可以指定key,也可以不指定key。

1)如果不指定key

  • zhangsan ->下訂單 -> 100 -> +100
  • zhangsan -> 取消訂單 -> -100 -> -100
  • 會員系統消費數據的時候,有可能先消費到的是 取消訂單的數據。

2)如果指定key,key -> hash(數字) -> 對應分區號 -> 發送到對應的分區裡面。

  • 如果key相同的 -> 數據肯定會被髮送到同一個分區(有序的)

這個項目需要指定key,把用戶的id指定為key.

14、Kafka消費者

14.1 消費組概念

groupid相同就屬於同一個消費組

1)每個consumer都要屬於一個consumer.group,就是一個消費組,topic的一個分區只會分配給一個消費組下的一個consumer來處理,每個consumer可能會分配多個分區,也有可能某個consumer沒有分配到任何分區。

2)如果想要實現一個廣播的效果,那隻需要使用不同的group id去消費就可以。

topicA:

  • partition0、partition1

groupA:

  • consumer1:消費 partition0
  • consuemr2:消費 partition1
  • consuemr3:消費不到數據

groupB:

  • consuemr3:消費到partition0和partition1

3)如果consumer group中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啟了,那麼又會把一些分區重新交還給他

14.2 基礎案例演示

14.3 偏移量管理

每個consumer記憶體里數據結構保存對每個topic的每個分區的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高併發請求zk是不合理的架構設計,zk是做分散式系統的協調的,輕量級的元數據存儲,不能負責高併發讀寫,作為數據存儲。

現在新的版本提交offset發送給kafka內部topic:__consumer_offsets,提交過去的時候, key是group.id+topic+分區號,value就是當前offset的值,每隔一段時間,kafka內部會對這個topic進行compact(合併),也就是每個group.id+topic+分區號就保留最新數據。

__consumer_offsets可能會接收高併發的請求,所以預設分區50個(leader partitiron -> 50 kafka),這樣如果你的kafka部署了一個大的集群,比如有50台機器,就可以用50台機器來抗offset提交的請求壓力。

  • 消費者 -> broker端的數據
  • message -> 磁碟 -> offset 順序遞增
  • 從哪兒開始消費?-> offset
  • 消費者(offset)

14.4 偏移量監控工具介紹

web頁面管理的一個管理軟體(kafka Manager)

  • 修改bin/kafka-run-class.sh腳本,第一行增加JMX_PORT=9988
  • 重啟kafka進程

另一個軟體:主要監控的consumer的偏移量。

就是一個jar包java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar

com.quantifind.kafka.offsetapp.OffsetGetterWeb
  • offsetStorage kafka \(根據版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper)
  • zk hadoop1:2181
  • port 9004
  • refresh 15.seconds
  • retain 2.days

寫了一段程式 ,消費kafka裡面的數據(consumer,處理數據 -> 業務代碼) -> Kafka 如何去判斷你的這段代碼真的是實時的去消費的呢?

延遲幾億條數據 -> 閾值(20萬條的時候 發送一個告警。)

14.5 消費異常感知

heartbeat.interval.ms

  • consumer心跳時間間隔,必須得與coordinator保持心跳才能知道consumer是否故障了,
  • 然後如果故障之後,就會通過心跳下發rebalance的指令給其他的consumer通知他們進行rebalance的操作

session.timeout.ms

  • kafka多長時間感知不到一個consumer就認為他故障了,預設是10秒

max.poll.interval.ms

  • 如果在兩次poll操作之間,超過了這個時間,那麼就會認為這個consume處理能力太弱了,會被踢出消費組,分區分配給別人去消費,一般來說結合業務處理的性能來設置就可以了。

14.6 核心參數解釋

fetch.max.bytes

獲取一條消息最大的位元組數,一般建議設置大一些,預設是1M 其實我們在之前多個地方都見到過這個類似的參數,意思就是說一條信息最大能多大?

  1. Producer:發送的數據,一條消息最大多大, -> 10M
  2. Broker:存儲數據,一條消息最大能接受多大 -> 10M
  3. Consumer:

max.poll.records:

一次poll返回消息的最大條數,預設是500條

connection.max.idle.ms

consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收

enable.auto.commit:

開啟自動提交偏移量

auto.commit.interval.ms:

每隔多久提交一次偏移量,預設值5000毫秒

auto.offset.reset

  • earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
  • latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
  • none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

14.7 綜合案例演示

引入案例:二手電筒商平臺(歡樂送),根據用戶消費的金額,對用戶星星進行累計。

  • 訂單系統(生產者) -> Kafka集群裡面發送了消息。
  • 會員系統(消費者) -> Kafak集群裡面消費消息,對消息進行處理。

14.8 group coordinator原理

面試題:消費者是如何實現rebalance的?— 根據coordinator實現

什麼是coordinator

每個consumer group都會選擇一個broker作為自己的coordinator,他是負責監控這個消費組裡的各個消費者的心跳,以及判斷是否宕機,然後開啟rebalance的

如何選擇coordinator機器

首先對groupId進行hash(數字),接著對__consumer_offsets的分區數量取模,預設是50,_consumer_offsets的分區數可以通過offsets.topic.num.partitions來設置,找到分區以後,這個分區所在的broker機器就是coordinator機器。

比如說:groupId,“myconsumer_group” -> hash值(數字)-> 對50取模 -> 8__consumer_offsets 這個主題的8號分區在哪台broker上面,那一臺就是coordinator 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區去提交offset,

運行流程

  • 每個consumer都發送JoinGroup請求到Coordinator,
  • 然後Coordinator從一個consumer group中選擇一個consumer作為leader,
  • 把consumer group情況發送給這個leader,
  • 接著這個leader會負責制定消費方案,
  • 通過SyncGroup發給Coordinator
  • 接著Coordinator就把消費方案下發給各個consumer,他們會從指定的分區的

leader broker開始進行socket連接以及消費消息

14.9 rebalance策略

consumer group靠coordinator實現了Rebalance

這裡有三種rebalance的策略:range、round-robin、sticky

比如我們消費的一個主題有12個分區:

p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假設我們的消費者組裡面有三個消費者

range策略

  • range策略就是按照partiton的序號範圍
  • p0~3 consumer1
  • p4~7 consumer2
  • p8~11 consumer3
  • 預設就是這個策略;

round-robin策略

  • 就是輪詢分配
  • consumer1:0,3,6,9
  • consumer2:1,4,7,10
  • consumer3:2,5,8,11

但是前面的這兩個方案有個問題:12 -> 2 每個消費者會消費6個分區

假設consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3,這樣的話,原本在consumer2上的的p6,p7分區就被分配到了 consumer3上。

sticky策略

最新的一個sticky策略,就是說儘可能保證在rebalance的時候,讓原本屬於這個consumer的分區還是屬於他們,然後把多餘的分區再均勻分配過去,這樣儘可能維持原來的分區分配的策略

  • consumer1:0-3
  • consumer2: 4-7
  • consumer3: 8-11

假設consumer3掛了

  • consumer1:0-3,+8,9
  • consumer2: 4-7,+10,11

15、Broker管理

15.1 Leo、hw含義

  • Kafka的核心原理
  • 如何去評估一個集群資源
  • 搭建了一套kafka集群 -》 介紹了簡單的一些運維管理的操作。
  • 生產者(使用,核心的參數)
  • 消費者(原理,使用的,核心參數)
  • broker內部的一些原理,核心的概念:LEO,HW

LEO:是跟offset偏移量有關係。

LEO:

在kafka裡面,無論leader partition還是follower partition統一都稱作副本(replica)。

每次partition接收到一條消息,都會更新自己的LEO,也就是log end offset,LEO其實就是最新的offset + 1

HW:高水位

LEO有一個很重要的功能就是更新HW,如果follower和leader的LEO同步了,此時HW就可以更新

HW之前的數據對消費者是可見,消息屬於commit狀態。HW之後的消息消費者消費不到。

15.2 Leo更新

15.3 hw更新

15.4 controller如何管理整個集群

1: 競爭controller的

  • /controller/id

2:controller服務監聽的目錄:

  • /broker/ids/ 用來感知 broker上下線
  • /broker/topics/ 創建主題,我們當時創建主題命令,提供的參數,ZK地址。
  • /admin/reassign_partitions 分區重分配

15.5 延時任務

kafka的延遲調度機制(擴展知識)

我們先看一下kafka裡面哪些地方需要有任務要進行延遲調度。

第一類延時的任務:

比如說producer的acks=-1,必須等待leader和follower都寫完才能返迴響應。

有一個超時時間,預設是30秒(request.timeout.ms)。

所以需要在寫入一條數據到leader磁碟之後,就必須有一個延時任務,到期時間是30秒延時任務 放到DelayedOperationPurgatory(延時管理器)中。

假如在30秒之前如果所有follower都寫入副本到本地磁碟了,那麼這個任務就會被自動觸發蘇醒,就可以返迴響應結果給客戶端了,否則的話,這個延時任務自己指定了最多是30秒到期,如果到了超時時間都沒等到,就直接超時返回異常。

第二類延時的任務:

follower往leader拉取消息的時候,如果發現是空的,此時會創建一個延時拉取任務

延時時間到了之後(比如到了100ms),就給follower返回一個空的數據,然後follower再次發送請求讀取消息,但是如果延時的過程中(還沒到100ms),leader寫入了消息,這個任務就會自動蘇醒,自動執行拉取任務。

海量的延時任務,需要去調度。

15.6 時間輪機制

1.什麼會有要設計時間輪?

Kafka內部有很多延時任務,沒有基於JDK Timer來實現,那個插入和刪除任務的時間複雜度是O(nlogn),而是基於了自己寫的時間輪來實現的,時間複雜度是O(1),依靠時間輪機制,延時任務插入和刪除,O(1)

2.時間輪是什麼?

其實時間輪說白其實就是一個數組。

  • tickMs:時間輪間隔 1ms

  • wheelSize:時間輪大小 20

  • interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms

  • currentTime:當時時間的指針。

    • a:因為時間輪是一個數組,所以要獲取裡面數據的時候,靠的是index,時間複雜度是O(1)
    • b:數組某個位置上對應的任務,用的是雙向鏈表存儲的,往雙向鏈表裡面插入,刪除任務,時間複雜度也是O(1)

3.多層級的時間輪

比如:要插入一個110毫秒以後運行的任務。

  • tickMs:時間輪間隔 20ms

  • wheelSize:時間輪大小 20

  • interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms

  • currentTime:當時時間的指針。

    • 第一層時間輪:1ms * 20
    • 第二層時間輪:20ms * 20
    • 第三層時間輪:400ms * 20

作者:erainm
來源:blog.csdn.net/eraining/article/details/115860664

近期熱文推薦:

1.1,000+ 道 Java面試題及答案整理(2022最新版)

2.勁爆!Java 協程要來了。。。

3.Spring Boot 2.x 教程,太全了!

4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優雅的方式!!

5.《Java開發手冊(嵩山版)》最新發佈,速速下載!

覺得不錯,別忘了隨手點贊+轉發哦!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 環境: SpringBoot2.7.8 背景: 在增加出庫訂單時需要對物品表的的數量進行修改 因此我在OutboundController中創建了幾個公共方法,並將其註入到Spring中,結果給我報了這一串錯誤。 Description:The dependencies of some of the ...
  • [TOC] 本文主要介紹ImGui應用中的一些界面優化方法,如果是第一次使用ImGui推薦從上一篇文章開始:[使用C++界面框架ImGUI開發一個簡單程式](https://www.cnblogs.com/timefiles/p/17632348.html),最終的界面效果如下: ![image]( ...
  • ## 所有類的基類 Object Lua 沒有嚴格的 oo(Object-Oriented)定義,可以利用元表特性來實現 先定義所有類的基類,即`Object`類。代碼順序從上到下,自成一體。[完整代碼](#oo.lua) 定義一個空表 `Object` ,`__index` 指向其自身(繼承將直接 ...
  • 本節我們一起學習一下SpringBoot中的非同步調用,主要用於優化耗時較長的操作,提高系統性能和吞吐量。 # 一、新建項目,啟動非同步調用 首先給啟動類增加註解@EnableAsync,支持非同步調用 ``` @EnableAsync @SpringBootApplication public clas ...
  • 一、福祿平臺介紹 產品介紹 開放平臺提供專業的數字權益商品標準化介面和免費接入服務,數字權益商品涵蓋話費、流量、游戲、Q幣、視頻會員、加油卡、禮品卡等多種品類,可滿足使用者多方面的業務需求,豐富企業的產品內容、提升競爭優勢。 產品功能 商品管理:提供API商戶可以進行對接的商品類目和編號。 訂單管理 ...
  • 虛擬機是如何調用方法的內容已經講解完畢,從本節開始,我們來探討虛擬機是如何執行方法中的位元組碼指令的。上文中提到過,許多Java虛擬機的執行引擎在執行Java代碼的時候都有解釋執行(通過解釋器執行)和編譯執行(通過即時編譯器產生本地代碼執行)兩種選擇,在本章中,我們先來探討一下在解釋執行時,虛擬機執行 ...
  • 最近朋友需要一個人證比對軟體需要實現以下功能: 1. 通過攝像頭實時採集人臉圖像 2. 通過身份證讀卡器採集身份證信息 和 身份證照片 3. 使用實時人臉照片 和 身份證照片做相似度比對 4. 比對後返回相似度,或者返回同一人,非同一人 5. 實時採集照片 和 身份證信息照片存檔,方便以後查閱 ## ...
  • **註**:本文是根據官方網站翻譯得來,其中做了部分修改用於理解文章字義。 # mojo介紹 Mojo被設計為Python的超集,因此許多語言功能和你可能在Python中知道的概念可以直接翻譯成Mojo。例如一個 Mojo中的“Hello World”程式看起來和Python一模一樣: ``` pr ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...