解耦、削峰:傳統的方式上游發送數據下游需要實時接收,如果上游在某些業務場景:例如上午十點會流量激增至頂峰,那麼下游資源可能會扛不住壓力。但如果使用消息隊列,就可以將消息暫存在消息管道中,下游可以按照自己的速度逐步處理; ...
kafka的使用場景
為什麼要使用 Kafka 消息隊列?
解耦、削峰:傳統的方式上游發送數據下游需要實時接收,如果上游在某些業務場景:例如上午十點會流量激增至頂峰,那麼下游資源可能會扛不住壓力。但如果使用消息隊列,就可以將消息暫存在消息管道中,下游可以按照自己的速度逐步處理;
可擴展:通過橫向擴展生產者、消費者和broker, Kafka可以輕鬆處理巨大的消息流;
高吞吐、低延遲:在一臺普通的伺服器上既可以達到10W/s的吞吐速率;
容災性:kafka通過副本replication的設置和leader/follower的容災機制保障了消息的安全性。
kafka的高吞吐、低延遲是如何實現的?
1.順序讀寫
Kafka使用磁碟順序讀寫來提升性能
順序讀寫和隨機讀寫性能對比:
順序讀 | 隨機讀 | 順序寫 | 隨機寫 | |
---|---|---|---|---|
機械硬碟 | 84.0MB/s | 0.033MB/s (512位元組) | 79.0MB/s | 0.083MB/s (512位元組) |
固態硬碟 | 220.7MB/s | 5.296MB/s(512位元組) | 77.2MB/s | 10.203MB/s(512位元組) |
從數據可以看出磁碟的順序讀寫速度遠高於隨機讀寫的速度,這是因為傳統的磁頭探針結構,隨機讀寫時需要頻繁尋道,也就需要磁頭和探針頻繁的轉動,而機械結構的磁頭和探針的位置調整是十分費時的,這就嚴重影響到硬碟的定址速度,進而影響到隨機寫入速度。
Kafka的message是不斷追加到本地磁碟文件末尾的,而不是隨機的寫入,這使得Kafka寫入吞吐量得到了顯著提升 。每一個Partition其實都是一個文件 ,收到消息後Kafka會把數據插入到文件末尾。
2.頁緩存(pageCache)
PageCache是系統級別的緩存,它把儘可能多的空閑記憶體當作磁碟緩存使用來進一步提高IO效率;
PageCache同時可以避免在JVM內部緩存數據,避免不必要的GC、以及記憶體空間占用。對於In-Process Cache,如果Kafka重啟,它會失效,而操作系統管理的PageCache依然可以繼續使用。
- producer把消息發到broker後,數據並不是直接落入磁碟的,而是先進入PageCache。PageCache中的數據會被內核中的處理線程採用同步或非同步的方式定期刷盤至磁碟。
- Consumer消費消息時,會先從PageCache獲取消息,獲取不到才回去磁碟讀取,並且會預讀出一些相鄰的塊放入PageCache,以方便下一次讀取。
- 如果Kafka producer的生產速率與consumer的消費速率相差不大,那麼幾乎只靠對broker PageCache的讀寫就能完成整個生產和消費過程,磁碟訪問非常少
3.零拷貝
正常過程:
-
操作系統將數據從磁碟上讀入到內核空間的讀緩衝區中
-
應用程式(也就是Kafka)從內核空間的讀緩衝區將數據拷貝到用戶空間的緩衝區中
-
應用程式將數據從用戶空間的緩衝區再寫回到內核空間的socket緩衝區中
-
操作系統將socket緩衝區中的數據拷貝到NIC緩衝區中,然後通過網路發送給客戶端
在這個過程中,可以發現, 數據從磁碟到最終發出去,要經歷4次拷貝,而在這四次拷貝過程中, 有兩次拷貝是浪費的。
1.從內核空間拷貝到用戶空間;
2.從用戶空間再次拷貝到內核空間;
除此之外,由於用戶空間和內核空間的切換,會帶來Cpu上下文切換,對於Cpu的性能也會造成影響;
零拷貝省略了數據在內核空間和用戶空間之間的重覆穿梭;用戶態和內核態切換時產生中斷,耗時;
4.分區分段索引
Kafka的message是按topic分類存儲的,topic中的數據又是按照一個一個的partition即分區存儲到不同broker節點。每個partition對應了操作系統上的一個文件夾,partition實際上又是按照segment分段存儲的。符合分散式系統分區分桶的設計思想。
通過這種分區分段的設計,Kafka的message消息實際上是分散式存儲在一個一個小的segment中的,每次文件操作也是直接操作的segment。為了進一步的查詢優化,Kafka又預設為分段後的數據文件建立了索引文件,就是文件系統上的.index文件。這種分區分段+索引的設計,不僅提升了數據讀取的效率,同時也提高了數據操作的並行度。
5.批量處理
kafka發送消息不是一條一條發送的,而是批量發送,很大的提高了發送消息的吞吐量。
假設發送一條消息的時間是1ms,而此時的吞吐量就是1000TPS。但是假如我們將消息批量發送,1000條消息需要10ms,而此時的吞吐量就達到了1000*100TPS。而且這樣也很大程度的減少了請求Broker的次數,提升了總體的效率。
kafka架構
基本概念
名詞 | 概念 |
---|---|
Producer | 生產者(發送消息) |
Consumer | 消費者(接收消息) |
ConsumerGroup | 消費者組,可以並行消費同一topic中的消息 |
Broker | 一個獨立的kafka伺服器被稱為broker。broker接收來自生產者的消息,為消息設置偏移量,並提交消息到磁碟保存。broker為消費者提供服務,對讀取分區的請求作出相應,返回已經提交到磁碟上的消息。可起到負載均衡、容錯的作用。 |
Topic | 主題,一個隊列,可理解為按照消息的邏輯分類將消息劃分為不同的topic |
Partition | topic的物理分組,一個topic可以分為多個partition,每個partition是一個有序隊列。可起到提高可擴展性,應對高併發場景的作用。 |
replica | 副本,為保證集群的高可用性,kafka提供副本機制,一個topic的每個分區都有若幹個副本,一個leader和若幹個follower |
leader | 每個分區多個副本的主節點,生產者發送數據的對象,以及消費者消費數據的對象都是leader |
offset | 對於Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中對應的位置。 |
架構圖
Q1:Topic的分區及副本在broker上是如何分配的呢?
這裡涉及到兩個參數:
startIndex:第一個分區的第一個副本放置位置(P0-R0)
nextReplicaShift:其他分區的副本的放置是依次後移的,間隔距離就是 nextReplicaShift 值。
Q2:Kafka的架構是基於什麼設計思想呢?
分治思想:
1. topic分治:對於kafka的topic,我們在創建之初可以設置多個partition來存放數據,對於同一個topic的數據,每條數據的key通過哈希取模被路由到不同的partition中(如果沒有設置key,則根據消息本身取模),以此達到分治的目的。
2. partition分治:為了方便數據的消費,kafka將原始的數據轉化為”索引+數據“的形式進行分治,將一個partition對應一個文件轉變為一個partition對應多個人不同類型的文件,分別為:
- .index文件:索引文件,用來記錄log文件中真實消息的相對偏移量和物理位置,為了減少索引文件的大小,使用了稀疏索引
- .log文件:用來記錄producer寫入的真實消息,即消息體本身;
- .timeindex文件:時間索引文件,類比.index文件,用來記錄log文件中真實消息寫入的時間情況,跟offset索引文件功能一樣,只不過這個以時間作為索引,用來快速定位目標時間點的數據;
3. 底層文件分治:不能將partition全部文件都放入一套 ”.index+.log+.timeindex“ 文件中,因此需要對文件進行拆分。kafka對單個.index文件、.timeindex文件、.log文件的大小都有限定(通過不同參數配置),且這3個文件互為一組。當.log文件的大小達到閾值則會自動拆分形成一組新的文件,這種將數據拆分成多個的小文件叫做segment,一個log文件代表一個segment。
kafka工作流程
生產流程:
-
先從zk獲取對應分區的leader在哪個broker
-
broker進程上的leader將消息寫入到本地log中
-
follower從leader上拉取消息,寫入到本地log,並向leader發送ACK
-
leader接收到所有的ISR中的Replica的ACK後,並向生產者返回ACK
消費流程:
-
每個consumer都可以根據分配策略,獲得要消費的分區
-
獲取到consumer對應的leader處於哪個broker以及offset
-
拉取數據
-
消費者提交offset
分區策略
相信上面的內容已經讓大家大致瞭解了消息生產及消費的過程:一個topic內的消息會被髮送到不同的分區以供不同的消費者拉取消息。
那麼在這個過程中就涉及到了兩個問題:
-
生產者按照什麼策略將數據分配到分區中呢?
-
消費者按照什麼策略去不同的分區拉取消息呢?
生產者分區策略
生產者寫入消息到topic,Kafka將依據不同的策略將數據分配到不同的分區中:
1. 輪詢分區策略
即按消息順序進行分區順序分配,是預設的策略,也是使用最多的策略,可以最大限度保證所有消息平均分配到一個分區;
key為null,則使用輪詢演算法均衡地分配分區;
2. 按key分區分配策略
key不為null,key.hash() % n
但是按照key決定分區有可能會造成數據傾斜
3. 隨機分區策略
隨機分區,不建議使用
4. 自定義分區策略
根據業務需要制定以分區策略
亂序問題在Kafka中生產者是有寫入策略,如果topic有多個分區,就會將數據分散在不同的partition中存儲當partition數量大於1的時候,數據(消息)會打散分佈在不同的partition中如果只有一個分區,消息是有序的
消費者分區策略
同一時刻,一條消息只能被組中的一個消費者實例消費:
-
消費者數=分區數:一個分區對應一個消費者
-
消費者數<分區數:一個消費者對應多個分區
-
消費者數>分區數:多出來的消費者將不會消費任何消息
分區分配策略:保障每個消費者儘量能夠均衡地消費分區的數據,不能出現某個消費者消費分區的數量特別多,某個消費者消費的分區特別少
1. Range分配策略(範圍分配策略):Kafka預設的分配策略
計算公式:
n=分區數量/消費者數量
m=分區數量%消費者數量
前m個消費者消費n+1個,剩餘消費者消費n個
以上圖為例:n=8/3=2m=8%3=2因此前2個消費者消費2+1=3個分區,剩下1個消費者消費2個分區
2. RoundRobin分配策略(輪詢分配策略)
消費者挨個分配消費的分區:
如下圖,3個消費者共同消費8個分區
第一輪:Consumer0-->A-Partition0;Consumer1-->A-Partition1;Consumer2-->A-Partition2
第二輪:Consumer0-->A-Partition3;Consumer1-->B-Partition0;Consumer2-->B-Partition1
第三輪:Consumer0-->B-Partition2;Consumer1-->B-Partition3
3. Striky粘性分配策略
在沒有發生rebalance跟輪詢分配策略是一致的
發生了rebalance(例如Consumer2故障宕機),輪詢分配策略,重新走一遍輪詢分配的過程。而粘性會保證跟上一次的儘量一致,只是將新的需要分配的分區,均勻的分配到現有可用的消費者中即可,這樣就減少了上下文的切換
副本的ACK機制
producer是不斷地往Kafka中寫入數據,寫入數據會有一個返回結果,表示是否寫入成功。這裡對應有一個ACKs的配置。
- acks = 0:生產者只管寫入,不管是否寫入成功,可能會數據丟失。性能是最好的
- acks = 1:生產者會等到leader分區寫入成功後,返回成功,接著發送下一條
- acks = -1/all:確保消息寫入到leader分區、還確保消息寫入到對應副本都成功後,接著發送下一條,性能是最差的
根據業務情況來選擇ack機制,是要求性能最高,一部分數據丟失影響不大,可以選擇0/1。如果要求數據一定不能丟失,就得配置為-1/all。
分區中是有leader和follower的概念,為了確保消費者消費的數據是一致的,只能從分區leader去讀寫消息,follower做的事情就是同步數據。
Q&A:
1. offset存在哪裡?
0.9版本前預設存在zk,但是由於頻繁訪問zk,zk需要一個一個節點更新offset,不能批量或分組更新,導致offset更新成了瓶頸。
在新版 Kafka 以及之後的版本,Kafka 消費的offset都會預設存放在 Kafka 集群中的一個叫 __consumer_offsets 的topic中。offset以消息形式發送到該topic並保存在broker中。這樣consumer提交offset時,只需連接到broker,不用訪問zk,避免了zk節點更新瓶頸。
2.leader選舉策略?
2種leader:①broker的leader即controller leader ② partition的leader
-
Controller leader:當broker啟動的時候,都會創建KafkaController對象,但是集群中只能有一個leader對外提供服務,這些每個節點上的KafkaController會在指定的zookeeper路徑下創建臨時節點,只有第一個成功創建的節點的KafkaController才可以成為leader,其餘的都是follower。當leader故障後,所有的follower會收到通知,再次競爭在該路徑下創建節點從而選舉新的leader
-
Partition leader :由controller leader執行
- 從Zookeeper中讀取當前分區的所有ISR(in-sync replicas)集合
- 調用配置的分區選擇演算法選擇分區的leader
如何處理所有Replica都不工作?在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:等待ISR中的任一個Replica“活”過來,並且選它作為Leader選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。
3.分區可以提高擴展性以及吞吐量,那分區越多越好嗎?
分區並不是越多越好,分區過多存在著以下弊端:
- producer記憶體成本增大、consumer線程開銷增大partition是kafka管理數據的基本邏輯組織單元,越多的partition意味著越多的數據存儲文件(一個partition對應至少3個數據文件)
- 分區增多數據連續性下降
- 可用性降低
4.與資料庫相比kafka的優勢?
- 業務場景不同,底層數據結構不同,kafka數據存儲對於功能要求較少,因此讀寫更快kafka存儲數據(消息本身)的文件的數據結構是數組,數組的特點就是:數據間位置連續,如果按照順序讀取,或者追加寫入的話,其時間複雜度為O(1),效率最高。
5.消費偏移的更新方式 無論是kafka預設api,還是java的api,offset的更新方式都有兩種:自動提交和手動提交
- 自動提交(預設方式)
Kafka中偏移量的自動提交是由參數enable_auto_commit和auto_commit_interval_ms控制的,當enable_auto_commit=True時,Kafka在消費的過程中會以頻率為auto_commit_interval_ms向Kafka自帶的topic(__consumer_offsets)進行偏移量提交,具體提交到哪個Partation:Math.abs(groupID.hashCode()) % numPartitions。
這種方式也被稱為at most once,fetch到消息後就可以更新offset,無論是否消費成功。
2. 手動提交
鑒於Kafka自動提交offset的不靈活性和不精確性(只能是按指定頻率的提交),Kafka提供了手動提交offset策略。手動提交能對偏移量更加靈活精準地控制,以保證消息不被重覆消費以及消息不被丟失。
對於手動提交offset主要有3種方式:
同步提交:提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束,同步方式下消費者線程在拉取消息會被阻塞,在broker對提交的請求做出響應之前,會一直阻塞直到偏移量提交操作成功或者在提交過程中發生異常,限制了消息的吞吐量。
非同步提交:非同步手動提交offset時,消費者線程不會阻塞,提交失敗的時候也不會進行重試,並且可以配合回調函數在broker做出響應的時候記錄錯誤信息。 對於非同步提交,由於不會進行失敗重試,當消費者異常關閉或者觸發了再均衡前,如果偏移量還未提交就會造成偏移量丟失。
非同步+同步:針對非同步提交偏移量丟失的問題,通過對消費者進行非同步批次提交並且在關閉時同步提交的方式,這樣即使上一次的非同步提交失敗,通過同步提交還能夠進行補救,同步會一直重試,直到提交成功。 通過finally在最後不管是否異常都會觸發consumer.commit()來同步補救一次,確保偏移量不會丟失
參考資料:
https://www.jianshu.com/p/90a15fe33551
https://www.jianshu.com/p/da3f19cee0e2
https://www.modb.pro/db/373502
https://blog.csdn.net/anryg/article/details/123579937
https://toutiao.io/posts/ptwuho/preview
作者:京東科技 於添馨
來源:京東雲開發者社區 轉載請註明來源