掃盲Kafka?看這一篇就夠了!

来源:https://www.cnblogs.com/jingdongkeji/archive/2023/12/06/17879177.html
-Advertisement-
Play Games

解耦、削峰:傳統的方式上游發送數據下游需要實時接收,如果上游在某些業務場景:例如上午十點會流量激增至頂峰,那麼下游資源可能會扛不住壓力。但如果使用消息隊列,就可以將消息暫存在消息管道中,下游可以按照自己的速度逐步處理; ...


kafka的使用場景

為什麼要使用 Kafka 消息隊列?

解耦、削峰:傳統的方式上游發送數據下游需要實時接收,如果上游在某些業務場景:例如上午十點會流量激增至頂峰,那麼下游資源可能會扛不住壓力。但如果使用消息隊列,就可以將消息暫存在消息管道中,下游可以按照自己的速度逐步處理;

可擴展:通過橫向擴展生產者、消費者和broker, Kafka可以輕鬆處理巨大的消息流;

高吞吐、低延遲:在一臺普通的伺服器上既可以達到10W/s的吞吐速率;

容災性:kafka通過副本replication的設置和leader/follower的容災機制保障了消息的安全性。

kafka的高吞吐、低延遲是如何實現的?

1.順序讀寫

Kafka使用磁碟順序讀寫來提升性能

順序讀寫和隨機讀寫性能對比:

順序讀 隨機讀 順序寫 隨機寫
機械硬碟 84.0MB/s 0.033MB/s (512位元組) 79.0MB/s 0.083MB/s (512位元組)
固態硬碟 220.7MB/s 5.296MB/s(512位元組) 77.2MB/s 10.203MB/s(512位元組)

從數據可以看出磁碟的順序讀寫速度遠高於隨機讀寫的速度,這是因為傳統的磁頭探針結構,隨機讀寫時需要頻繁尋道,也就需要磁頭和探針頻繁的轉動,而機械結構的磁頭和探針的位置調整是十分費時的,這就嚴重影響到硬碟的定址速度,進而影響到隨機寫入速度。

Kafka的message是不斷追加到本地磁碟文件末尾的,而不是隨機的寫入,這使得Kafka寫入吞吐量得到了顯著提升 。每一個Partition其實都是一個文件 ,收到消息後Kafka會把數據插入到文件末尾。

2.頁緩存(pageCache)

PageCache是系統級別的緩存,它把儘可能多的空閑記憶體當作磁碟緩存使用來進一步提高IO效率;

PageCache同時可以避免在JVM內部緩存數據,避免不必要的GC、以及記憶體空間占用。對於In-Process Cache,如果Kafka重啟,它會失效,而操作系統管理的PageCache依然可以繼續使用。

  • producer把消息發到broker後,數據並不是直接落入磁碟的,而是先進入PageCache。PageCache中的數據會被內核中的處理線程採用同步或非同步的方式定期刷盤至磁碟。
  • Consumer消費消息時,會先從PageCache獲取消息,獲取不到才回去磁碟讀取,並且會預讀出一些相鄰的塊放入PageCache,以方便下一次讀取。
  • 如果Kafka producer的生產速率與consumer的消費速率相差不大,那麼幾乎只靠對broker PageCache的讀寫就能完成整個生產和消費過程,磁碟訪問非常少

3.零拷貝

正常過程:

  1. 操作系統將數據從磁碟上讀入到內核空間的讀緩衝區中

  2. 應用程式(也就是Kafka)從內核空間的讀緩衝區將數據拷貝到用戶空間的緩衝區中

  3. 應用程式將數據從用戶空間的緩衝區再寫回到內核空間的socket緩衝區中

  4. 操作系統將socket緩衝區中的數據拷貝到NIC緩衝區中,然後通過網路發送給客戶端

在這個過程中,可以發現, 數據從磁碟到最終發出去,要經歷4次拷貝,而在這四次拷貝過程中, 有兩次拷貝是浪費的。

1.從內核空間拷貝到用戶空間;

2.從用戶空間再次拷貝到內核空間;

除此之外,由於用戶空間和內核空間的切換,會帶來Cpu上下文切換,對於Cpu的性能也會造成影響;

零拷貝省略了數據在內核空間和用戶空間之間的重覆穿梭;用戶態和內核態切換時產生中斷,耗時;

4.分區分段索引

Kafka的message是按topic分類存儲的,topic中的數據又是按照一個一個的partition即分區存儲到不同broker節點。每個partition對應了操作系統上的一個文件夾,partition實際上又是按照segment分段存儲的。符合分散式系統分區分桶的設計思想

通過這種分區分段的設計,Kafka的message消息實際上是分散式存儲在一個一個小的segment中的,每次文件操作也是直接操作的segment。為了進一步的查詢優化,Kafka又預設為分段後的數據文件建立了索引文件,就是文件系統上的.index文件。這種分區分段+索引的設計,不僅提升了數據讀取的效率,同時也提高了數據操作的並行度。

5.批量處理

kafka發送消息不是一條一條發送的,而是批量發送,很大的提高了發送消息的吞吐量。

假設發送一條消息的時間是1ms,而此時的吞吐量就是1000TPS。但是假如我們將消息批量發送,1000條消息需要10ms,而此時的吞吐量就達到了1000*100TPS。而且這樣也很大程度的減少了請求Broker的次數,提升了總體的效率。

kafka架構

基本概念

名詞 概念
Producer 生產者(發送消息)
Consumer 消費者(接收消息)
ConsumerGroup 消費者組,可以並行消費同一topic中的消息
Broker 一個獨立的kafka伺服器被稱為broker。broker接收來自生產者的消息,為消息設置偏移量,並提交消息到磁碟保存。broker為消費者提供服務,對讀取分區的請求作出相應,返回已經提交到磁碟上的消息。可起到負載均衡、容錯的作用。
Topic 主題,一個隊列,可理解為按照消息的邏輯分類將消息劃分為不同的topic
Partition topic的物理分組,一個topic可以分為多個partition,每個partition是一個有序隊列。可起到提高可擴展性,應對高併發場景的作用。
replica 副本,為保證集群的高可用性,kafka提供副本機制,一個topic的每個分區都有若幹個副本,一個leader和若幹個follower
leader 每個分區多個副本的主節點,生產者發送數據的對象,以及消費者消費數據的對象都是leader
offset 對於Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中對應的位置。

架構圖

Q1:Topic的分區及副本在broker上是如何分配的呢?

這裡涉及到兩個參數:

startIndex:第一個分區的第一個副本放置位置(P0-R0)

nextReplicaShift:其他分區的副本的放置是依次後移的,間隔距離就是 nextReplicaShift 值。

Q2:Kafka的架構是基於什麼設計思想呢?

分治思想:

1. topic分治:對於kafka的topic,我們在創建之初可以設置多個partition來存放數據,對於同一個topic的數據,每條數據的key通過哈希取模被路由到不同的partition中(如果沒有設置key,則根據消息本身取模),以此達到分治的目的。

2. partition分治:為了方便數據的消費,kafka將原始的數據轉化為”索引+數據“的形式進行分治,將一個partition對應一個文件轉變為一個partition對應多個人不同類型的文件,分別為:

  • .index文件:索引文件,用來記錄log文件中真實消息的相對偏移量和物理位置,為了減少索引文件的大小,使用了稀疏索引
  • .log文件:用來記錄producer寫入的真實消息,即消息體本身;
  • .timeindex文件:時間索引文件,類比.index文件,用來記錄log文件中真實消息寫入的時間情況,跟offset索引文件功能一樣,只不過這個以時間作為索引,用來快速定位目標時間點的數據;

3. 底層文件分治:不能將partition全部文件都放入一套 ”.index+.log+.timeindex“ 文件中,因此需要對文件進行拆分。kafka對單個.index文件、.timeindex文件、.log文件的大小都有限定(通過不同參數配置),且這3個文件互為一組。當.log文件的大小達到閾值則會自動拆分形成一組新的文件,這種將數據拆分成多個的小文件叫做segment,一個log文件代表一個segment。

kafka工作流程

生產流程:

  1. 先從zk獲取對應分區的leader在哪個broker

  2. broker進程上的leader將消息寫入到本地log中

  3. follower從leader上拉取消息,寫入到本地log,並向leader發送ACK

  4. leader接收到所有的ISR中的Replica的ACK後,並向生產者返回ACK

消費流程:

  1. 每個consumer都可以根據分配策略,獲得要消費的分區

  2. 獲取到consumer對應的leader處於哪個broker以及offset

  3. 拉取數據

  4. 消費者提交offset

分區策略

相信上面的內容已經讓大家大致瞭解了消息生產及消費的過程:一個topic內的消息會被髮送到不同的分區以供不同的消費者拉取消息。

那麼在這個過程中就涉及到了兩個問題:

  1. 生產者按照什麼策略將數據分配到分區中呢?

  2. 消費者按照什麼策略去不同的分區拉取消息呢?

生產者分區策略

生產者寫入消息到topic,Kafka將依據不同的策略將數據分配到不同的分區中:

1. 輪詢分區策略

即按消息順序進行分區順序分配,是預設的策略,也是使用最多的策略,可以最大限度保證所有消息平均分配到一個分區;

key為null,則使用輪詢演算法均衡地分配分區;

2. 按key分區分配策略

key不為null,key.hash() % n

但是按照key決定分區有可能會造成數據傾斜

3. 隨機分區策略

隨機分區,不建議使用

4. 自定義分區策略

根據業務需要制定以分區策略

亂序問題在Kafka中生產者是有寫入策略,如果topic有多個分區,就會將數據分散在不同的partition中存儲當partition數量大於1的時候,數據(消息)會打散分佈在不同的partition中如果只有一個分區,消息是有序的

消費者分區策略

同一時刻,一條消息只能被組中的一個消費者實例消費:

  1. 消費者數=分區數:一個分區對應一個消費者

  2. 消費者數<分區數:一個消費者對應多個分區

  3. 消費者數>分區數:多出來的消費者將不會消費任何消息

分區分配策略:保障每個消費者儘量能夠均衡地消費分區的數據,不能出現某個消費者消費分區的數量特別多,某個消費者消費的分區特別少

1. Range分配策略(範圍分配策略):Kafka預設的分配策略

計算公式:

n=分區數量/消費者數量

m=分區數量%消費者數量

前m個消費者消費n+1個,剩餘消費者消費n個

以上圖為例:n=8/3=2m=8%3=2因此前2個消費者消費2+1=3個分區,剩下1個消費者消費2個分區

2. RoundRobin分配策略(輪詢分配策略)

消費者挨個分配消費的分區:

如下圖,3個消費者共同消費8個分區

第一輪:Consumer0-->A-Partition0;Consumer1-->A-Partition1;Consumer2-->A-Partition2

第二輪:Consumer0-->A-Partition3;Consumer1-->B-Partition0;Consumer2-->B-Partition1

第三輪:Consumer0-->B-Partition2;Consumer1-->B-Partition3

3. Striky粘性分配策略

在沒有發生rebalance跟輪詢分配策略是一致的

發生了rebalance(例如Consumer2故障宕機),輪詢分配策略,重新走一遍輪詢分配的過程。而粘性會保證跟上一次的儘量一致,只是將新的需要分配的分區,均勻的分配到現有可用的消費者中即可,這樣就減少了上下文的切換

副本的ACK機制

producer是不斷地往Kafka中寫入數據,寫入數據會有一個返回結果,表示是否寫入成功。這裡對應有一個ACKs的配置。

  • acks = 0:生產者只管寫入,不管是否寫入成功,可能會數據丟失。性能是最好的

  • acks = 1:生產者會等到leader分區寫入成功後,返回成功,接著發送下一條

  • acks = -1/all:確保消息寫入到leader分區、還確保消息寫入到對應副本都成功後,接著發送下一條,性能是最差的

根據業務情況來選擇ack機制,是要求性能最高,一部分數據丟失影響不大,可以選擇0/1。如果要求數據一定不能丟失,就得配置為-1/all。

分區中是有leader和follower的概念,為了確保消費者消費的數據是一致的,只能從分區leader去讀寫消息,follower做的事情就是同步數據。

Q&A:

1. offset存在哪裡?

0.9版本前預設存在zk,但是由於頻繁訪問zk,zk需要一個一個節點更新offset,不能批量或分組更新,導致offset更新成了瓶頸。

在新版 Kafka 以及之後的版本,Kafka 消費的offset都會預設存放在 Kafka 集群中的一個叫 __consumer_offsets 的topic中。offset以消息形式發送到該topic並保存在broker中。這樣consumer提交offset時,只需連接到broker,不用訪問zk,避免了zk節點更新瓶頸。

2.leader選舉策略?

2種leader:①broker的leader即controller leader ② partition的leader

  1. Controller leader:當broker啟動的時候,都會創建KafkaController對象,但是集群中只能有一個leader對外提供服務,這些每個節點上的KafkaController會在指定的zookeeper路徑下創建臨時節點,只有第一個成功創建的節點的KafkaController才可以成為leader,其餘的都是follower。當leader故障後,所有的follower會收到通知,再次競爭在該路徑下創建節點從而選舉新的leader

  2. Partition leader :由controller leader執行

  • 從Zookeeper中讀取當前分區的所有ISR(in-sync replicas)集合
  • 調用配置的分區選擇演算法選擇分區的leader

如何處理所有Replica都不工作?在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:等待ISR中的任一個Replica“活”過來,並且選它作為Leader選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。

3.分區可以提高擴展性以及吞吐量,那分區越多越好嗎?

分區並不是越多越好,分區過多存在著以下弊端:

  • producer記憶體成本增大、consumer線程開銷增大partition是kafka管理數據的基本邏輯組織單元,越多的partition意味著越多的數據存儲文件(一個partition對應至少3個數據文件)
  • 分區增多數據連續性下降
  • 可用性降低

4.與資料庫相比kafka的優勢?

  • 業務場景不同,底層數據結構不同,kafka數據存儲對於功能要求較少,因此讀寫更快kafka存儲數據(消息本身)的文件的數據結構是數組,數組的特點就是:數據間位置連續,如果按照順序讀取,或者追加寫入的話,其時間複雜度為O(1),效率最高。

5.消費偏移的更新方式 無論是kafka預設api,還是java的api,offset的更新方式都有兩種:自動提交和手動提交

  1. 自動提交(預設方式)

Kafka中偏移量的自動提交是由參數enable_auto_commit和auto_commit_interval_ms控制的,當enable_auto_commit=True時,Kafka在消費的過程中會以頻率為auto_commit_interval_ms向Kafka自帶的topic(__consumer_offsets)進行偏移量提交,具體提交到哪個Partation:Math.abs(groupID.hashCode()) % numPartitions。

這種方式也被稱為at most once,fetch到消息後就可以更新offset,無論是否消費成功。

2. 手動提交

鑒於Kafka自動提交offset的不靈活性和不精確性(只能是按指定頻率的提交),Kafka提供了手動提交offset策略。手動提交能對偏移量更加靈活精準地控制,以保證消息不被重覆消費以及消息不被丟失。

對於手動提交offset主要有3種方式:

同步提交:提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束,同步方式下消費者線程在拉取消息會被阻塞,在broker對提交的請求做出響應之前,會一直阻塞直到偏移量提交操作成功或者在提交過程中發生異常,限制了消息的吞吐量。

非同步提交:非同步手動提交offset時,消費者線程不會阻塞,提交失敗的時候也不會進行重試,並且可以配合回調函數在broker做出響應的時候記錄錯誤信息。 對於非同步提交,由於不會進行失敗重試,當消費者異常關閉或者觸發了再均衡前,如果偏移量還未提交就會造成偏移量丟失。

非同步+同步:針對非同步提交偏移量丟失的問題,通過對消費者進行非同步批次提交並且在關閉時同步提交的方式,這樣即使上一次的非同步提交失敗,通過同步提交還能夠進行補救,同步會一直重試,直到提交成功。 通過finally在最後不管是否異常都會觸發consumer.commit()來同步補救一次,確保偏移量不會丟失

參考資料:

https://www.jianshu.com/p/90a15fe33551

https://www.jianshu.com/p/da3f19cee0e2

https://www.modb.pro/db/373502

https://blog.csdn.net/anryg/article/details/123579937

https://toutiao.io/posts/ptwuho/preview

作者:京東科技 於添馨

來源:京東雲開發者社區 轉載請註明來源


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用 ssh-keygen 生成密鑰 參考資料 https://learn.microsoft.com/zh-cn/azure/virtual-machines/linux/create-ssh-keys-detailed 快速開始 以下 ssh-keygen 命令預設在 ~/.ssh 目錄中生成 ...
  • 家中有閑置的小新,是A卡正好合適裝linux 安裝前 關閉安全引導 通過關機鍵旁邊的重置口重新開機 OR 在開機界面按F2(開啟Hotkey 模式的要按Fn+F2) 進入BIOS 設置界面,關閉Secure Boot,這樣方便安裝linux 系統刻錄 linux有很多發行版 大家可以自行選擇,推薦U ...
  • ssh介紹 SSH,全稱為Secure Shell,是一種用於在網路中安全地進行遠程登錄和執行命令的協議。它通過加密的方式提供了對網路通信的保護,使得用戶可以在不安全的網路環境下安全地進行遠程管理和數據傳輸。 以下是 SSH 協議的一些重要特點和功能: 安全性:SSH 使用加密技術來保護數據在網路中 ...
  • 資料庫完整性包括正確性和相容性,DBMS通過提供約束條件機制、檢查方法和違約處理功能維護資料庫完整性,其中實體完整性通過主鍵確保唯一標識,參照實體性通過外鍵關聯,用戶定義完整性滿足特定應用需求,而觸發器和斷言提供事件驅動和複雜約束支持,共同構成資料庫保持一致性的關鍵機制。 ...
  • SQL(結構化查詢語言)的演變從IBM的SystemR開始,經過ANSI的標準化,近年來SQL標準變得更加豐富和複雜。SQL的特點包括綜合統一、高度非過程化、面向集合的操作方式以及提供多種使用方式的統一語法結構。在資料庫實例中,基本表獨立存在,而視圖是基本表導出的虛表,用於供人查看。資料庫模式結構包... ...
  • SQL ALTER TABLE 語句 SQL ALTER TABLE 語句用於在現有表中添加、刪除或修改列,也可用於添加和刪除各種約束。 ALTER TABLE - 添加列 要在表中添加列,請使用以下語法: ALTER TABLE 表名 ADD 列名 數據類型; 以下 SQL 向 "Customer ...
  • 本文分享自華為雲社區《GaussDB資料庫SQL系列-SQL與ETL淺談》,作者:Gauss松鼠會小助手2。 一、前言 在SQL語言中,ETL(抽取、轉換和載入)是一種用於將數據從源系統抽取到目標系統的過程。ETL過程通常包括三個階段:抽取(Extract)、轉換(Transform)和載入(Loa ...
  • 數據作為新時代重要的生產要素之一,數據資產化的相關工作正在提速。自今年10月1日起,中國資產評估協會制定的《數據資產評估指導意見》正式施行。同時,《企業數據資源相關會計處理暫行規定》近期轉為正式稿,也將於明年1月1日起施行。 《暫行規定》規定:企業使用的數據資源,符合《企業會計準則第6號——無形資產 ...
一周排行
    -Advertisement-
    Play Games
  • 背景 在瀏覽器中訪問本地靜態資源html網頁時,可能會遇到跨域問題如圖。 是因為瀏覽器預設啟用了同源策略,即只允許載入與當前網頁具有相同源(協議、功能變數名稱和埠)的內容。 WebView2預設情況下啟用了瀏覽器的同源策略,即只允許載入與主機相同源的內容。所以如果我們把靜態資源發佈到iis或者通過node ...
  • 最近看幾個老項目的SQL條件中使用了1=1,想想自己也曾經這樣寫過,略有感觸,特別拿出來說道說道。編寫SQL語句就像炒菜,每一種調料的使用都會影響菜品的最終味道,每一個SQL條件的加入也會影響查詢的執行效率。那麼 1=1 存在什麼樣的問題呢?為什麼又會使用呢? ...
  • 好久不見,我又回來了。 給大家分享一個我最近使用c#代碼操作ftp伺服器的代碼示例: 1 public abstract class FtpOperation 2 { 3 /// <summary> 4 /// FTP伺服器地址 5 /// </summary> 6 private string f ...
  • 一:背景 1. 講故事 過年喝了不少酒,腦子不靈光了,停了將近一個月沒寫博客,今天就當新年開工寫一篇吧。 去年年初有位朋友找到我,說他們的系統會偶發性崩潰,在網上也發了不少帖子求助,沒找到自己滿意的答案,讓我看看有沒有什麼線索,看樣子這是一個牛皮蘚的問題,既然對方有了dump,那就分析起來吧。 二: ...
  • 自己製作的一個基於Entity Framework Core 的資料庫操作攔截器,可以列印資料庫執行sql,方便開發調試,代碼如下: /// <summary> /// EF Core 的資料庫操作攔截器,用於在資料庫操作過程中進行日誌記錄和監視。 /// </summary> /// <remar ...
  • 本文分享自華為雲社區《Go併發範式 流水線和優雅退出 Pipeline 與 Cancellation》,作者:張儉。 介紹 Go 的併發原語可以輕鬆構建流數據管道,從而高效利用 I/O 和多個 CPU。 本文展示了此類pipelines的示例,強調了操作失敗時出現的細微之處,並介紹了乾凈地處理失敗的 ...
  • 在上篇文章中,我們介紹到在多線程環境下,如果編程不當,可能會出現程式運行結果混亂的問題。出現這個原因主要是,JMM 中主記憶體和線程工作記憶體的數據不一致,以及多個線程執行時無序,共同導致的結果。 ...
  • 1、下載安裝包首先、進入官網下載安裝包網址:https://www.python.org/downloads/windows/下載步驟:進入下載地址,根據自己的電腦系統選擇相應的python版本 選擇適配64位操作系統的版本(查看自己的電腦操作系統版本), 點擊下載安裝包 也可以下載我百度雲分享的安 ...
  • 簡介 git-commit-id-maven-plugin 是一個maven 插件,用來在打包的時候將git-commit 信息打進jar中。 這樣做的好處是可以將發佈的某版本和對應的代碼關聯起來,方便查閱和線上項目的維護。至於它的作用,用官方說法,這個功能對於大型分散式項目來說是無價的。 功能 你 ...
  • 序言 在數字時代,圖像生成技術正日益成為人工智慧領域的熱點。 本討論將重點聚焦於兩個備受矚目的模型:DALL-E和其他主流AI繪圖方法。 我們將探討它們的優勢、局限性以及未來的發展方向。通過比較分析,我們期望能夠更全面地瞭解這些技術,為未來的研究和應用提供啟示。 Q: 介紹一下 dall-e Ope ...