一.引言 kafka是廣泛使用的流處理組件,我們知道怎麼使用它,也知道它的實現原理。但是更重要的部分是它的設計理念,即kafka設計者當時是如何考量各種方案的,瞭解這些,對提升我們的設計能力非常有幫助。 二.動機 我們將 Kafka 設計為一個統一平臺,來處理大型公司可能擁有的所有實時數據流。 為此 ...
一.引言
kafka是廣泛使用的流處理組件,我們知道怎麼使用它,也知道它的實現原理。但是更重要的部分是它的設計理念,即kafka設計者當時是如何考量各種方案的,瞭解這些,對提升我們的設計能力非常有幫助。
二.動機
我們將 Kafka 設計為一個統一平臺,來處理大型公司可能擁有的所有實時數據流。 為此,我們必須考慮相當廣泛的用例集。
- 它必須具有高吞吐量,才能支持大容量事件流,例如實時日誌聚合。
- 它需要優雅地處理大量積壓數據,以便能夠支持離線系統的周期性數據負載。
- 系統必須保證low-latency delivery,才能處理更傳統的消息傳遞用例。
- 我們希望支持分區、分散式、實時處理,基於舊的事件流創建新的事件流。 這激發了我們的分區和consumer模型。
- 最後,在將流送入其他數據系統進行服務的情況下,系統必須能夠在出現機器故障時保證容錯性。
支持這些用途使我們做出具有許多獨特元素的設計,與傳統的消息傳遞系統相比,它更類似於資料庫日誌。 我們將在以下部分概述設計的一些元素。
三.持久化
不要害怕文件系統!
Kafka 嚴重依賴文件系統來存儲和緩存消息。 人們普遍認為“磁碟很慢”,這讓人們懷疑持久化結構能否提供有競爭力的性能。 事實上,磁碟比人們預期的要慢得多,也快得多,這取決於它們的使用方式。 一個設計得當的磁碟結構通常可以和網路一樣快。關於磁碟性能的關鍵事實是,在過去十年中,硬碟驅動器的吞吐量與磁碟尋道的延遲有所不同。 在6個 7200rpm SATA RAID-5 陣列的 JBOD 配置上,順序寫入的性能約為 600MB/秒,而隨機寫入的性能僅為約 100k/秒——相差超過 6000 倍。 這些順序讀取和寫入是所有使用模式中最可預測的,並且由操作系統進行了大量優化。 現代操作系統提供read-ahead和write-behind技術,以大的數據塊預取數據,並將較小的邏輯寫入分組為大的物理寫入。 可以在這篇 ACM 隊列文章ACM Queue article;找到對這個問題的進一步討論;他們實際上發現順序磁碟訪問在某些情況下比隨機記憶體訪問更快!
tips:read-ahead
Linux的文件預讀readahead,指Linux系統內核將指定文件的某 區域預讀進頁緩存起來,便於接下來對該區域進行讀取時,不會因缺頁(page fault)而阻塞。因為從記憶體讀取比從磁碟讀取要快很多。預讀可以有效的減少磁碟的尋道次數和應用程式的I/O等待時間,是改進磁碟讀I/O性能的重要 優化手段之一。
為了彌補這種性能差異,現代操作系統越來越積極地使用記憶體進行磁碟緩存。 現代操作系統會傾向於將所有空閑記憶體用作磁碟緩存,而性能損失很小。 所有的磁碟讀寫都會經過這個統一的緩存。 如果不使用直接 I/O,則無法輕易關閉此功能。因此即使進程緩存了數據,該數據也可能會在操作系統page cache中複製,將其存儲兩次。
此外,當構建在 JVM 之上,任何花時間研究 Java 記憶體使用的人都知道兩件事:
- 對象的記憶體開銷非常高,通常會使存儲的數據空間翻倍(或更糟)。
- 隨著堆內數據的增加,Java GC變得越來越繁瑣和緩慢。
tips: Java對象構成
Java 的對象包括對象頭,類指針,數組長度(可選),數據,比如只有一個int類型欄位的對象需要96bit的記憶體。
由於這些因素,使用文件系統和依賴page cache優於在記憶體中維護緩存或其他結構——至少,可以用作緩存的記憶體翻倍,並且可能通過存儲緊湊的位元組結構而不是單個對象再次翻倍。 這樣做將導致 32GB 機器上的緩存高達 28-30GB, 並且沒有GC問題。 此外,即使服務重新啟動,此緩存仍將保持預熱狀態,而進程內緩存將需要在記憶體中重建(對於 10GB 的緩存可能需要 10 分鐘),否則它將需要從一個完全未載入的緩存開始(這可能意味著糟糕的初始性能)。 這也大大簡化了代碼,因為維護緩存和文件系統之間一致性的所有邏輯現在都在操作系統中,這往往比one-off 進程內嘗試更有效、更正確。 如果您的磁碟使用有利於順序讀取,那麼read-ahead實際上是在每次磁碟讀取時使用有用的數據預先填充此緩存。
這表明瞭一種非常簡單的設計:與其在記憶體中保留儘可能多的內容,併在我們用完空間時,恐慌地將其全部刷新到文件系統,不如將其反轉。 所有數據都立即寫入文件系統上的持久日誌中,而不必刷新到磁碟。 實際上這隻是意味著它被轉移到內核的page cache中。
這種以page cache為中心的設計風格在此處有關 Varnish 設計的文章article 中進行了描述(以及適度的自大)。
常數時間性能
消息系統中使用的持久數據結構通常是以每個consumer隊列為單位,帶有關聯的 BTree 或其他通用隨機訪問數據結構,以維護有關消息的元數據。 BTree 是可用的最通用的數據結構,可以在消息系統中支持各種事務性和非事務性語義。 不過,它們確實帶來了相當高的成本:Btree 操作是 O(log N)。 通常 O(log N) 被認為本質上等同於常數時間,但對於磁碟操作而言並非如此。 磁碟尋道的速度是10 毫秒一次,並且每個磁碟一次只能進行一次尋道,因此並行性受到限制。 因此,即使很少的磁碟尋道也會導致非常高的開銷。 由於存儲系統混合了非常快的緩存操作和非常慢的物理磁碟操作,固定緩存數據增加時,樹結構的性能通常是超線性的——即數據加倍會使性能變得慢兩倍不止。直觀地說,持久隊列可以建立在簡單的讀取和追加到文件的基礎上,就像日誌記錄解決方案的一樣。 這種結構的優點是所有操作都是 O(1) 並且讀取不會阻塞彼此的寫入。 這具有明顯的性能優勢,因為性能與數據大小完全分離——一臺伺服器現在可以充分利用許多廉價、低轉速的 1+TB SATA 驅動器。 儘管它們的尋道性能較差,但這些驅動器對於大容量讀寫具有可接受的性能,而且價格是前者的 1/3,容量是前者的 3 倍。
在沒有任何性能損失的情況下訪問幾乎無限的磁碟空間意味著我們可以提供一些消息系統中不常見的功能。 例如,在 Kafka 中,我們可以將消息保留相對較長的時間(比如一周),而不是嘗試在消息被消費後立即刪除。 正如我們將要描述的,這為consumer帶來了很大的靈活性。
四.效率
我們在效率方面付出了巨大的努力。 我們的主要用例之一是處理網路活動數據,該數據量非常大:每次頁面瀏覽都可能產生數十次寫入。 此外,我們假設發佈的每條消息都被至少一個consumer(通常是很多)消費,因此我們努力使消費儘可能高效。
我們還發現,根據構建和運行大量類似系統的經驗,效率是有效多租戶操作的關鍵。 如果下游基礎設施服務很容易因為應用程式使用量的小波動而成為瓶頸,那麼這種小的變化通常會產生問題。 必須非常快,才能確保應用程式不會因為基礎架構出現問題。 當嘗試在集中式集群上運行支持數十或數百個應用程式的集中式服務時,這一點尤為重要,因為使用模式的變化幾乎每天都會發生。
我們在上一節中討論了磁碟效率。 一旦消除了不良的磁碟訪問模式,在這種類型的系統中有兩個常見的低效率原因:太多的小 I/O 操作和過多的位元組複製。
小I/O問題既發生在客戶端和伺服器之間,也發生在伺服器自身的持久化操作中。
為避免這種情況,我們的協議是圍繞“消息集”抽象構建的,該抽象將消息自然地分組在一起。 這允許網路請求將消息組合在一起並分攤網路往返的開銷,而不是一次發送一條消息。 伺服器依次將消息塊一次性附加到其日誌中,而consumer一次獲取大的線性塊。
這個簡單的優化使性能得到巨大的提升。 批處理導致更大的網路數據包、更大的順序磁碟操作、連續的記憶體塊等,所有這些都允許 Kafka 將突發的隨機消息寫入流轉換為流向consumer的線性寫入。
另一個低效率是位元組複製。 在低消息速率下這不是問題,但在負載下影響很大。 為了避免這種情況,我們採用了一種由producer、broker和consumer共用的標準化二進位消息格式(因此數據塊可以在它們之間傳輸而無需修改)。
broker維護的消息日誌本身只是一個文件目錄,每個文件由一系列消息集填充,這些消息集以producer和consumer使用的相同格式寫入磁碟。 維護這種通用格式可以優化最重要的操作:持久日誌塊的網路傳輸。 現代 unix 操作系統提供高度優化的代碼路徑,用於將數據從頁面緩存傳輸到套接字; 在 Linux 中,這是通過 sendfile 系統調用完成的。
要瞭解 sendfile 的影響,瞭解從文件到套接字傳輸數據的通用數據路徑:
- 操作系統從磁碟讀取數據到內核空間的page cache
- 應用程式從內核空間讀取數據到用戶空間緩衝區
- 應用程式將數據寫回內核空間的套接字緩衝區
- 操作系統將數據從套接字緩衝區複製到 NIC 緩衝區,併在此處通過網路發送
這顯然是低效的,有四個副本和兩個系統調用。 使用 sendfile,通過允許操作系統將數據從頁面緩存直接發送到網路,可以避免這種重新複製。 所以在這個優化路徑中,只需要最終拷貝到網卡緩衝區。
我們期望一個共同的用例是一個topic的多個consumer。 使用上面的零拷貝優化,數據只被覆制到頁面緩存中一次併在每次消費時重覆使用,而不是存儲在記憶體中併在每次讀取時複製到用戶空間。 這允許以接近網路連接限制的速率使用消息。
page cache 和 sendfile 的這種組合意味著在 Kafka 集群上,consumer大部分時間不會落後,你將看不到磁碟上的任何讀取活動,因為它們將完全從緩存中提供數據。
TLS/SSL 庫在用戶空間運行(Kafka 目前不支持內核中的 SSL_sendfile)。 由於此限制,啟用 SSL 時不使用 sendfile。 啟用SSL配置,參考security.protocol和security.inter.broker.protocol
有關 Java 中的 sendfile 和zero copy支持的更多背景信息,請參閱本文article。
tips: send file 圖解
端到端批量壓縮
在某些情況下,瓶頸實際上不是 CPU 或磁碟,而是網路帶寬。 對於需要通過廣域網在數據中心之間發送消息的數據管道來說尤其如此。 當然,用戶總是可以在不需要 Kafka 的任何支持的情況下一次壓縮一條消息,但這可能會導致非常差的壓縮率,因為大部分冗餘是由於相同類型的消息之間的重覆(例如欄位名稱在JSON 或 Web 日誌中的用戶agent或常見字元串值)。 高效壓縮需要將多條消息一起壓縮,而不是單獨壓縮每條消息。Kafka 通過高效的批處理格式支持這一點。 一批消息可以聚集在一起壓縮並以這種形式發送到伺服器。 這批消息將以壓縮形式寫入,併在日誌中保持壓縮狀態,只會被consumer解壓。
Kafka 支持 GZIP、Snappy、LZ4 和 ZStandard 壓縮協議。 有關壓縮的更多詳細信息,請參見此處here.。
五. producer
負載均衡
producer直接將數據發送到作為分區leader的broker,而無需任何中間路由層。 為了幫助producer做到這一點,所有 Kafka 節點都可以在任何給定時間回答有關哪些伺服器處於活動狀態以及topic 分區的leader所在的元數據請求,以允許producer適當地定向其請求。客戶端控制將消息發佈到哪個分區。 這可以隨機完成,實現一種隨機負載平衡,也可以通過某種語義分區函數來完成。 我們通過允許用戶指定一個鍵來進行分區並使用它來散列到一個分區,來自定義語義分區的介面(如果需要,還有一個選項可以覆蓋分區函數)。 例如,如果選擇的鍵是用戶 ID,則給定用戶的所有數據都將發送到同一分區。 這反過來將允許consumer對他們的消費做出本地假設。 這種分區方式明確設計為允許在consumer中進行本地敏感處理程式。
tips: 如果我們想保證訂單消費的順序性,可以將同一用戶的訂單發送給同一分區,一個分區只會同時被一個consumer消費,並且在consumer中進行單線程執行。
非同步發送
批處理是效率的重要驅動因素之一,為了啟用批處理,Kafka producer將嘗試在記憶體中累積數據併在單個請求中發送更大的批次。 批處理可以配置為累積不超過固定數量的消息,並且等待時間不超過某個固定的延遲限制(比如 64k 或 10 毫秒)。 這允許積累更多的位元組來發送,並且在伺服器上執行少量且更大的 I/O 操作。 這種緩衝是可配置的,並提供了一種機制來權衡少量的額外延遲以獲得更好的吞吐量。有關producer的配置 configuration 和 api api 的詳細信息可以在文檔的其他地方找到。
tips
批處理的設計思想在很多其他分散式組件中出現過,比如es中的批量插入。
六.consumer
Kafka consumer通過向引導它想要消費的分區的broker發出“獲取”請求來工作。 consumer在每個請求中指定其在日誌中的offset,並從該位置開始接收回一大塊日誌。 因此,consumer對該位置有很大的控制權,並且可以在需要時回退它以重新使用數據。
推 vs. 拉
我們最初考慮的一個問題是,consumer是應該從broker那裡提取數據,還是broker應該將數據push給consumer。 在這方面,Kafka 遵循大多數消息系統共用的更傳統的設計,其中數據從producer push到broker,並由consumer從broker pull。 一些以日誌為中心的系統,例如 Scribe 和 Apache Flume,遵循一種非常不同的基於push的路徑,其中數據被push到下游。 兩種方法各有利弊。 然而,基於push的系統難以處理各種consumer,因為broker控制數據傳輸的速率。 目標通常是讓consumer能夠以最大可能的速度消費; 不幸的是,在push系統中,這意味著當consumer的消費率低於生產率時,consumer往往會不知所措(本質上是DDoS攻擊)。 基於pull的系統具有更好的特性,即consumer落後後,在可能的時候趕上來。 這可以通過某種退避協議來緩解,consumer可以通過該協議表明它不堪重負,但是讓傳輸速率充分利用(但不要過度利用)比看起來更棘手。 以前以這種方式構建系統的嘗試使我們採用了更傳統的拉模型。基於pull的系統的另一個優點是它有助於將發送給consumer的數據積極地分批處理。 基於push的系統必須選擇立即發送請求或積累更多數據然後在不知道下游consumer是否能夠立即處理它的情況下發送它。 如果針對低延遲進行了調整,這將導致一次只發送一條消息,但無論如何傳輸最終都會被緩衝,這是一種浪費。 基於pull的設計解決了這個問題,因為consumer總是在其在日誌中的當前位置(或達到某個可配置的最大大小)之後拉取所有可用消息。 因此,可以在不引入不必要延遲的情況下獲得最佳批處理。
pull系統的不足之處在於,如果broker沒有數據,consumer可能會進行空轉等待數據到達。 為避免這種情況,我們在pull請求中設置了參數,允許consumer請求在“長輪詢”中阻塞,等待數據到達(並可選擇等待給定位元組數可用以確保較大的傳輸大小)。
您可以想象其他可能的設計,這些設計只會端到端地pull。 producer將在本地寫入本地日誌,而broker將從中提取數據,而consumer則從中提取數據。 通常會提出一種類似類型的“存儲轉發”producer。 這很有趣,但我們覺得不太適合我們擁有數千個producer的目標用例。 我們大規模運行持久性數據系統的經驗讓我們感到,在系統中涉及許多應用程式的數千個磁碟實際上不會使事情變得更可靠,而且操作起來會是一場噩夢。 在實踐中,我們發現我們可以大規模運行具有強大 SLA 的管道,而無需producer持久性。
consumer位置
令人驚訝的是,跟蹤已消費的內容是消息傳遞系統的關鍵性能點之一。
大多數消息傳遞系統都保留有關在broker上使用了哪些消息的元數據。 也就是說,當消息被分發給consumer時,broker要麼立即在本地記錄該事實,要麼等待consumer的確認。 這是一個相當直觀的選擇,而且對於單機伺服器來說確實不清楚這個狀態還能怎麼設計。 由於許多消息傳遞系統中用於存儲的數據結構擴展性很差,這也是一個務實的選擇——因為broker知道消費了什麼,它可以立即刪除它,從而保持數據量較小。可能不明顯的是,讓broker和consumer就已消費的內容達成一致並不是一個微不足道的問題。 如果broker在每次通過網路分發消息時立即將消息記錄為已消費,那麼如果consumer未能處理該消息(比如因為它崩潰或請求超時或其他原因),該消息將丟失。 為瞭解決這個問題,很多消息系統都增加了確認功能,即消息在發送時只標記為已發送而不是被消費; broker等待來自consumer的特定確認以將消息記錄為已消費。 這種策略解決了丟失消息的問題,但又產生了新的問題。 首先,如果consumer處理消息但在發送確認之前失敗,那麼消息將被消費兩次。 第二個問題是關於性能的,現在broker必須記錄關於每條消息的多個狀態(首先鎖定它以免它被第二次發出,然後將其標記為永久消費以便它可以被刪除)。 必須處理棘手的問題,例如如何處理已發送但從未確認的消息。
Kafka 以不同的方式處理這個問題。 我們的topic分為一組完全有序的分區,每個分區在任何給定時間都由每個訂閱consumer組中的一個consumer消費。 這意味著consumer在每個分區中的位置只是一個整數,即下一條要消費的消息的偏移量。 這使得關於已消耗內容的狀態非常小,每個分區只有一個數字。 可以定期檢查此狀態。 這使得消息確認非常廉價。
這個決定有一個附帶的好處。 consumer可以故意回退到舊的偏移量並重新消費數據。 這違反了隊列的共同約定,但事實證明這是許多consumer的基本特征。 比如consumer代碼有bug,在消費了一些消息後被髮現,修複bug後consumer可以重新消費這些消息。
離線數據載入
可擴展的持久性允許consumer,只定期消費的可能性,例如批量數據載入,周期性地將數據批量載入到離線系統,如 Hadoop 或關係數據倉庫。
在 Hadoop 的情況下,我們通過將負載拆分到各個map任務來並行化數據載入,每個映射任務對應一個節點/topic/分區組合,從而允許載入中的完全並行。 Hadoop 提供了任務管理,失敗的任務可以重新啟動而沒有重覆數據的危險——它們只是從原來的位置重新啟動。
static membership
靜態成員旨在提高流應用程式、consumer組和其他構建等在rebalance協議之上的應用程式的可用性。 rebalance協議依賴group coordinator將實體 ID 分配給組成員。 這些生成的 ID 是短暫的,並且會在成員重新啟動和重新加入時更改。 對於基於consumer的應用程式,這種“dynamic membership”可能會導致在代碼部署、配置更新和定期重啟等管理操作期間將大部分任務重新分配給不同的實例。 對於大型狀態應用程式,打亂的任務需要很長時間才能恢復到之前的狀態,並導致應用程式部分或完全不可用。 受此觀察的啟發,Kafka 的group management protocol允許組成員提供持久實體 ID。 基於這些 id,組成員資格保持不變,因此不會觸發rebalance。
如果你想使用static membership,
- 將broker集群和客戶端應用程式升級到 2.3 或更高版本,並確保升級後的broker使用 2.3 或更高版本的 inter.broker.protocol.version。
- 將配置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 設置為一組下每個consumer實例的唯一值。
- 對於 Kafka Streams 應用程式,為每個 KafkaStreams 實例設置唯一的 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 就足夠了,與實例使用的線程數無關。
如果您的broker版本低於 2.3,但您選擇在客戶端設置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG,應用程式將檢測broker版本,然後拋出 UnsupportedException。 如果您不小心為不同的實例配置了重覆的 ID,broker端的防護機制將通過觸發 org.apache.kafka.common.errors.FencedInstanceIdException 通知您的重覆客戶端立即關閉。 有關詳細信息,請參閱 KIP-345
七.消息語義
現在我們對producer和consumer的工作方式有了一些瞭解,讓我們討論一下 Kafka 在producer和consumer之間提供的語義保證。 顯然,可以提供多種可能的消息傳遞保證:
At most once——消息可能會丟失,但永遠不會重新傳遞。
At least once——消息永遠不會丟失,但可以重新傳遞。
Exactly once——這才是人們真正想要的,每條消息只傳遞一次。
值得註意的是,這分為兩個問題:發佈消息的持久性保證和消費消息時的保證。
許多系統聲稱提供“Exactly once”交付語義,但閱讀細則很重要,這些聲明中的大多數都是誤導性的(即它們不會轉化為consumer或producer可能失敗的情況,有多個consumer進程的情況,或寫入磁碟的數據可能丟失的情況)。Kafka 的語義是直截了當的。 發佈消息時,我們有消息被“提交”到日誌的概念。 一旦發佈的消息被提交,只要複製該消息寫入的分區的一個broker保持“存活”狀態,它就不會丟失。 提交消息的定義、活動分區以及我們嘗試處理的故障類型的描述將在下一節中更詳細地描述。 現在讓我們假設一個完美的、無損的broker,並嘗試理解對producer和consumer的保證。 如果producer嘗試發佈消息並遇到網路錯誤,則無法確定此錯誤是發生在消息提交之前還是之後。 這類似於使用自動生成的鍵插入資料庫表的語義。
在 0.11.0.0 之前,如果producer未能收到指示消息已提交的響應,它別無選擇,只能重新發送消息。 這提供了At least once傳遞語義,因為如果原始請求實際上已經成功,則消息可能會在重新發送期間再次寫入日誌。 從 0.11.0.0 開始,Kafka producer還支持冪等傳遞選項,保證重新發送不會導致日誌中出現重覆條目。 為此,broker為每個producer分配一個 ID,並使用producer隨每條消息發送的序列號對消息進行重覆數據刪除。 同樣從 0.11.0.0 開始,producer支持使用類似事務的語義將消息發送到多個topic分區的能力:即 要麼所有消息都已成功寫入,要麼都沒有。 主要用例是 Kafka topic之間的一次性處理(如下所述)。
並非所有用例都需要如此強大的保證。 對於對延遲敏感的用途,我們允許producer指定其所需的持久性級別。 如果producer指定它想要等待提交的消息,這可能需要 10 毫秒的數量級。 然而,producer也可以指定它想要完全非同步地執行發送,或者它只想等到leader(但不一定是follower)收到消息。
現在讓我們從consumer的角度來描述語義。 所有副本都有完全相同的日誌和相同的偏移量。 consumer控制其在此日誌中的位置。 如果consumer從未崩潰,它可以將這個位置存儲在記憶體中,但如果consumer失敗並且我們希望這個topic分區被另一個進程接管,新進程將需要選擇一個合適的位置來開始處理。 假設consumer讀取了一些消息——它有幾個選項來處理消息和更新它的位置。
- 它可以讀取消息,然後保存它在日誌中的位置,最後處理消息。 在這種情況下,consumer進程有可能在保存其位置之後但在保存其消息處理的輸出之前崩潰。 在這種情況下,接管處理的進程將從保存的位置開始,即使該位置之前的一些消息還沒有被處理。 這對應於“At most once”語義,因為在consumer失敗消息的情況下可能不會被處理。
- 它可以讀取消息、處理消息並最終保存其位置。 在這種情況下,consumer進程有可能在處理消息之後但在保存其位置之前崩潰。 在這種情況下,當新進程接管它接收到的前幾條消息時,它已經被處理過了。 這對應於consumer失敗情況下的“At least once”語義。 在許多情況下,消息有一個主鍵,因此更新是冪等的(兩次接收相同的消息只是用它自己的另一個副本覆蓋記錄)。
那麼 exactly once 語義(即你真正想要的東西)呢? 當從 Kafka topic消費並生產到另一個topic時(如在 Kafka Streams 應用程式中),我們可以利用上面提到的 0.11.0.0 中的新事務producer功能。 consumer的位置作為消息存儲在topic中,因此我們可以在與接收處理數據的輸出topic相同的事務中將偏移量寫入 Kafka。 如果交易被中止,consumer的位置將恢復到它的舊值,並且輸出topic的產生的數據將對其他consumer不可見,這取決於他們的“隔離級別”。 在預設的“read_uncommitted”隔離級別中,所有消息對consumer都是可見的,即使它們是中止事務的一部分,但在“read_committed”中,consumer將只返回來自已提交事務的消息(以及任何不屬於這部分的消息)。
tips:kafka事務示例
寫入外部系統時,限制在於需要將consumer的位置與實際存儲為輸出的內容相協調。 實現這一點的經典方法是在consumer位置的存儲和consumer輸出的存儲之間引入兩階段提交。 但這可以通過讓consumer將其偏移量存儲在與其輸出相同的位置來更簡單和更普遍地處理。 這更好,因為consumer可能想要寫入的許多輸出系統不支持兩階段提交。 作為這方面的一個例子,考慮一個 Kafka Connect 連接器,它在 HDFS 中填充數據以及它讀取的數據的偏移量,這樣可以保證數據和偏移量都被更新,或者兩者都不更新。 我們對許多其他數據系統遵循類似的模式,這些系統需要這些更強的語義並且消息沒有主鍵以允許重覆數據刪除。
因此Kafka有效地支持Kafka Streams中的exactly-once delivery,在Kafka topics之間傳輸和處理數據時,一般可以使用事務性producer/consumer來提供exactly-once交付。 其他目標系統的 Exactly-once 交付通常需要與此類系統合作,但 Kafka 提供了偏移量,這使得實現這一點變得可行(另請參閱 Kafka Connect)。 否則,Kafka 預設保證At least once交付,並允許用戶通過在處理一批消息之前禁用對producer的重試併在consumer中提交偏移量來實現At most once交付。
八.複製
Kafka 在可配置數量的伺服器上為每個topic的分區複製日誌(您可以逐個topic地設置此複製因數)。 這允許在集群中的伺服器發生故障時自動故障轉移到這些副本,因此消息在出現故障時仍然可用。
其他消息系統提供了一些與複製相關的功能,但是,在我們(完全有偏見的)看來,這似乎是一個附加的東西,沒有被大量使用,並且有很大的缺點:副本不活躍,吞吐量受到嚴重影響,它需要繁瑣的手動配置等, Kafka 旨在預設與複製一起使用——事實上,我們將未複製的topic實現為複製因數為 1 的複製topic。
複製單元是topic分區。 在非故障情況下,Kafka 中的每個分區都有一個leader和0個或多個follower。 包括leader在內的副本總數構成複製因數。 所有寫入都到分區的leader,從分區的leader或follower讀。 通常,分區比broker多得多,leader平均分佈在broker之間。 follower上的日誌與leader的日誌相同——都具有相同的偏移量和相同順序的消息(當然,在任何給定時間,leader可能在其日誌末尾有一些尚未複製的消息).
follower像普通的 Kafka consumer一樣消費來自leader的消息,並將它們應用到自己的日誌中。 讓follower從leader那裡pull有一個很好的特性,那就是允許follower自然地將他們正在應用到他們日誌的時候進行批處理。
與大多數分散式系統一樣,自動處理故障需要精確定義節點“存活”的含義。 在 Kafka 中,一個稱為“controller”的特殊節點負責管理集群中broker的註冊。 Broker 活躍度有兩個條件:
- broker必須與controller保持活躍的會話,以便接收定期的元數據更新。
- 作為follower的broker必須複製leader的數據,而不是落後“太遠”。
“活動會話”的含義取決於集群配置。 對於 KRaft 集群,通過向controller發送定期心跳來維持活動會話。 如果controller在 broker.session.timeout.ms 配置的超時到期之前未能收到心跳,則該節點被視為離線。
對於使用 Zookeeper 的集群,活性是通過broker在其 Zookeeper 會話初始化時創建的臨時節點的存在間接確定的。 如果broker在 zookeeper.session.timeout.ms 到期之前未能向 Zookeeper 發送心跳後丟失其會話,則該節點將被刪除。 然後,controller會通過 Zookeeper 監視通知節點刪除,並將broker標記為離線。
我們將滿足這兩個條件的節點稱為“同步”,以避免“活著”或“失敗”的含糊不清。 leader跟蹤一組“同步”副本,稱為 ISR。 如果這些條件中的任何一個未能滿足,則broker將從 ISR 中移除。 例如,如果一個follower宕機,那麼controller將通過丟失會話來通知失敗,並將從 ISR 中刪除broker。 另一方面,如果 follower 落後於 leader 太遠但仍有活動會話,則 leader 也可以將其從 ISR 中移除。 滯後副本的確定是通過 replica.lag.time.max.ms 配置來控制的。 無法在此配置設置的最長時間內趕上leader日誌末尾的副本將從 ISR 中刪除。
在分散式系統術語中,我們只嘗試處理“失敗/恢復”故障模型,其中節點突然停止工作然後恢復(可能不知道它們已經宕機)。 Kafka 不處理所謂的“拜占庭式”故障,在這種情況下,節點會產生任意或惡意的響應(可能是由於錯誤或犯規)。
我們現在可以更精確地定義,當該分區的 ISR 中的所有副本都將一條消息應用到它們的日誌時,該消息被視為已提交。 只有提交的消息才會發送給leader。 這意味著leader不必擔心如果leader失敗可能會看到一條可能丟失的消息。 另一方面,producer可以選擇是否等待消息被提交,這取決於他們對延遲和持久性之間權衡的偏好。 此首選項由producer使用的 acks 設置控制。 請註意,topic具有同步副本“最小數量”的設置,當producer請求確認消息已寫入完整的同步副本集時,將檢查該設置。 如果producer請求不那麼嚴格的確認,消息可以被提交和消費,即使同步副本的數量低於最小值(例如,它可以與leader一樣低)。
Kafka 提供的保證是提交的消息不會丟失,只要至少有一個同步副本始終處於存活狀態。
在短暫的故障轉移期後,Kafka 將在出現節點故障時保持可用,但在出現網路分區時可能無法保持可用。
複製日誌:Quorums、ISR 和狀態機
Kafka 分區的核心是複製的日誌。 複製日誌是分散式數據系統中最基本的原語之一,實現它的方法有很多種。 複製的日誌可以被其他系統用作以狀態機樣式實現其他分散式系統的原語。
複製的日誌模擬了對一系列值的順序達成共識的過程(通常將日誌條目編號為 0、1、2、...)。 有很多方法可以實現這一點,但最簡單和最快的是使用一個leader來選擇提供給它的值的排序。 只要leader還活著,所有的follower只需要複製leader選擇的值和順序。當然,如果leader沒有失敗,我們就不需要follower了! 當leader宕機時,我們需要從follower中選擇一個新的leader。 但是follower本身可能會落後或崩潰,所以我們必須確保我們選擇了一個最新的follower。 日誌複製演算法必須提供的基本保證是,如果我們告訴客戶端一條消息已提交,而leader失敗了,我們選出的新leader也必須擁有該消息。 這會產生一個權衡:如果leader等待更多的follower在獲取信息之前提交一條信息,那麼將會有更多的選舉leader。
如果您選擇所需的確認數量和必須比較的日誌數量以選擇leader,從而保證有重疊,那麼這稱為Quorum。
這種權衡的一種常見方法是對提交決定和leader選舉都使用多數表決。 這不是 Kafka 所做的,但無論如何讓我們探索它以瞭解權衡。 假設我們有 2f+1 個副本。 如果 f+1 個副本必須在leader聲明提交之前收到一條消息,並且如果我們通過從至少 f+1 個副本中選擇具有最完整日誌的follower來選舉新的leader,那麼,不超過 f失敗時,leader保證擁有所有已提交的消息。 這是因為在任何 f+1 個副本中,必須至少有一個副本包含所有已提交的消息。 該副本的日誌將是最完整的,因此將被選為新的leader。 每個演算法都必須處理許多剩餘的細節(例如精確定義什麼使日誌更完整,確保leader失敗期間日誌的一致性或更改副本集中的伺服器集)但我們現在將忽略這些。
這種多數表決方法有一個非常好的特性:延遲僅取決於最快的伺服器。 也就是說,如果複製因數為 3,則延遲由較低的follower而不是較高的follower決定。
這個家族中有豐富多樣的演算法,包括 ZooKeeper 的 Zab、Raft 和 Viewstamped Replication。 據我們所知,與 Kafka 的實際實現最相似的學術出版物是來自 Microsoft 的 PacificA。
多數表決的不利之處在於,您無需多次失敗就會失去可選舉的leader。 容忍一次故障需要三份數據,容忍兩次故障需要五份數據。 根據我們的經驗,只有足夠的冗餘來容忍單個故障對於實際系統來說是不夠的,但是每次寫入五次,磁碟空間要求是 5 倍,吞吐量是 1/5,對於大容量數據問題來說並不是很實用。 這可能就是為什麼Quorum演算法更常出現在 ZooKeeper 等共用集群配置中,但不太常見於主數據存儲的原因。 例如,在 HDFS 中,namenode 的高可用性功能是建立在基於多數投票的日誌上的,但這種更昂貴的方法並不用於數據本身。
Kafka 採用略微不同的方法來選擇其Quorum集。 Kafka 不是多數表決,而是動態維護一組同步到leader的同步副本(ISR)。 只有這個集合中的成員才有資格被選為leader。 在所有同步副本都收到寫入之前,不會將對 Kafka 分區的寫入視為已提交。 只要 ISR 集發生變化,它就會保留在集群元數據中。 正因為如此,ISR 中的任何副本都有資格被選為leader。 這是 Kafka 的使用模型的一個重要因素,其中有許多分區並且確保leader平衡很重要。 使用此 ISR 模型和 f+1 個副本,Kafka topic可以容忍 f 次故障而不會丟失已提交的消息。
對於我們希望處理的大多數用例,我們認為這種權衡是合理的。 實際上,為了容忍 f 次失敗,多數投票和 ISR 方法都將在提交消息之前等待相同數量的副本確認(例如,為了在一次失敗中幸存下來,多數群體需要三個副本和一個確認,而 ISR 方法需要兩個副本和一個確認)。 在沒有最慢伺服器的情況下提交的能力是多數表決方法的一個優勢。 但是,我們認為通過允許客戶端選擇是否阻塞消息提交來改進它,並且由於所需的複製因數較低而帶來的額外吞吐量和磁碟空間是值得的。
另一個重要的設計區別是 Kafka 不要求崩潰的節點恢復時所有數據都完好無損。 在這個空間中,複製演算法依賴於“穩定存儲”的存在並不少見,這種存儲在任何故障恢復場景中都不會丟失,而不會出現潛在的一致性違規。 這個假設有兩個主要問題。 首先,磁碟錯誤是我們在持久數據系統的實際操作中觀察到的最常見問題,它們通常不會讓數據保持完整。 其次,即使這不是問題,我們也不希望在每次寫入時都使用 fsync 來保證一致性,因為這會使性能降低兩到三個數量級。 我們允許副本重新加入 ISR 的協議,確保在重新加入之前,它必須再次完全重新同步,即使它在崩潰中丟失了未刷新的數據。
tips: es使用的是Quorum演算法。
unclean leader選舉:如果他們都宕機了怎麼辦?
請註意,Kafka 對數據丟失的保證基於至少一個副本保持同步。 如果複製分區的所有節點都宕機,則此保證不再有效。
然而,當所有副本都宕機時,實際系統需要做一些合理的事情。 如果您不幸遇到這種情況,請務必考慮會發生什麼。 有兩種行為可以實現:
- 等待 ISR 中的一個副本恢復並選擇這個副本作為leader(希望它仍然擁有所有數據)。
- 選擇第一個的副本(不一定在 ISR 中)作為leader。
這是可用性和一致性之間的簡單權衡。 如果我們在 ISR 中等待副本,那麼只要這些副本關閉,我們就將保持不可用狀態。 如果此類副本被破壞或數據丟失,那麼我們將永久關閉。 另一方面,如果一個不同步的副本恢復並且我們允許它成為leader,那麼它的日誌就會成為真實的來源,即使它不能保證有每條提交的消息。 預設情況下,從 0.11.0.0 版本開始,Kafka 選擇第一種策略並傾向於等待一致的副本。 可以使用配置屬性 unclean.leader.election.enable 更改此行為,以支持正常運行時間優於一致性的用例。
這種困境並不是 Kafka 特有的。 它存在於任何基於群體的方案中。 例如,在多數表決方案中,如果大多數伺服器遭受永久性故障,那麼您必須選擇丟失 100% 的數據,或者通過將現有伺服器上保留的內容作為新的真實來源來違反一致性。
可用性和耐用性保證
寫入 Kafka 時,producer可以選擇是等待消息被 0,1 還是所有 (-1) 個副本確認。 請註意,“所有副本的確認”並不能保證所有分配的副本都已收到消息。 預設情況下,當 acks=all 時,一旦所有當前同步副本都收到消息,就會發生確認。 例如,如果一個topic只配置了兩個副本並且一個失敗了(即只剩下一個同步副本),那麼指定 acks=all 的寫入將成功。 但是,如果其餘副本也發生故障,這些寫入可能會丟失。 雖然這確保了分區的最大可用性,但對於一些更喜歡持久性而不是可用性的用戶來說,這種行為可能是不受歡迎的。 因此,我們提供了兩個topic級配置,可用於使消息持久性優於可用性:
- 禁用unclean的leader選舉——如果所有副本都不可用,那麼分區將保持不可用狀態,直到最近的leader再次可用。 這實際上更傾向於不可用而不是消息丟失的風險。 請參閱上一節關於 Unclean Leader Election 的說明。
- 指定最小 ISR 大小 - 如果 ISR 的大小超過某個最小值,分區將僅接受寫入,以防止丟失僅寫入單個副本的消息,該副本隨後變得不可用。 此設置僅在producer使用 acks=all 並保證消息將被至少這麼多同步副本確認時才生效。 此設置提供了一致性和可用性之間的權衡。 最小 ISR 大小的較高設置可保證更好的一致性,因為可以保證將消息寫入更多副本,從而降低丟失消息的可能性。 但是,它會降低可用性,因為如果同步副本的數量低於最小閾值,分區將不可用於寫入。
副本管理
上面關於複製日誌的討論實際上只涵蓋了一個日誌,即 一個topic分區。 但是,Kafka 集群將管理成百上千個這樣的分區。 我們嘗試以迴圈方式平衡集群內的分區,以避免將高容量topic分區聚集在少量節點上。 同樣,我們嘗試平衡leader,以便每個節點包含一定比率的leader。
優化leader選舉過程也很重要,因為這是不可用的關鍵視窗。 leader選舉的簡單實現最終會在節點失敗時為該節點托管的所有分區運行每個分區的選舉。 正如上面關於複製的部分所討論的,Kafka 集群有一個特殊的角色,稱為“controller”,負責管理broker的註冊。 如果 controller 檢測到 broker 發生故障,它會負責選舉 ISR 的剩餘成員之一作為新的 leader。 結果是我們能夠將許多所需的leader層變更通知在一起批處理,這使得大量分區的選舉過程成本更低、速度更快。 如果controller本身出現故障,則將選舉另一個controller。
九.日誌壓縮
日誌壓縮確保 Kafka 始終至少保留單個topic分區數據日誌中每個消息鍵的最後一個已知值。 它解決瞭如下用例和場景,例如在應用程式崩潰或系統故障後恢復狀態,或在操作維護期間應用程式重啟後重新載入緩存。 讓我們更詳細地研究這些用例,然後描述壓縮的工作原理。
到目前為止,我們只描述了更簡單的數據保留方法,即在固定時間段後或當日誌達到某個預定大小時丟棄舊日誌數據。 這適用於時間事件數據,例如每條記錄獨立的日誌記錄。 然而,一類重要的數據流是對給予鍵可變數據的變更日誌(例如,對資料庫表的更改)。讓我們討論這種流的具體示例。 假設我們有一個包含用戶電子郵件地址的topic; 每次用戶更新他們的電子郵件地址時,我們都會使用他們的用戶 ID 作為主鍵向該topic發送一條消息。 現在假設我們在一段時間內為 ID 為 123 的用戶發送以下消息,每條消息對應於電子郵件地址的更改(省略其他 ID 的消息):
123 => [email protected] . . . 123 => [email protected] . . . 123 => [email protected]
日誌壓縮為我們提供了更細粒度的保留機制,因此我們保證至少保留每個主鍵的最後更新(例如 [email protected])。 通過這樣做,我們保證日誌包含每個鍵的最終值的完整快照,而不僅僅是最近更改的鍵。 這意味著下游leader可以在這個topic之外恢復他們自己的狀態,而我們不必保留所有更改的完整日誌。
讓我們先看看一些有用的用例,然後再看看如何使用它。
- 資料庫更改訂閱。 通常需要在多個數據系統中擁有一個數據集,並且這些系統中的一個通常是某種資料庫(RDBMS 或者可能是其他的鍵值存儲)。 例如,您可能有一個資料庫、一個緩存、一個搜索集群和一個 Hadoop 集群。 對資料庫的每次更改都需要反映在緩存、搜索集群中,並最終反映在 Hadoop 中。 在只處理實時更新的情況下,您只需要最近的日誌。 但是,如果您希望能夠重新載入緩存或恢復失敗的搜索節點,您可能需要一個完整的數據集。
- Event sourcing。 這是一種應用程式設計風格,它將查詢處理與應用程式設計放在一起,並使用更改日誌作為應用程式的主要存儲。
- 日誌記錄以實現高可用性。 一個執行本地計算的進程可以通過註銷它對其本地狀態所做的更改來實現容錯,這樣另一個進程可以重新載入這些更改併在它失敗時繼續執行。 一個具體的例子是在流查詢系統中處理計數、聚合和其他類似“分組依據”的處理。 Samza 是一個實時流處理框架,正是為此目的而使用此功能。
tips:mysql中的redo log,es中的transaction log,redis中的aof持久化都是日誌記錄以實現高可用性的例子。這裡的日誌壓縮類比redis中的aof日誌重寫。
在這些情況中的每一種情況下,都需要主要處理實時變化,但偶爾,當機器崩潰或數據需要重新載入或重新處理時,需要進行完全載入。 日誌壓縮可以同時支持這兩個用例。 這篇博文this blog post中更詳細地描述了這種日誌的使用方式。
總體思路很簡單。 如果我們有無限的日誌保留,並且我們記錄了上述情況下的每個更改,那麼我們將捕獲系統從第一次開始時開始的每次狀態。 使用這個完整的日誌,我們可以通過重放日誌中的前 N 條記錄來恢復到任何時間點。 這個假設的完整日誌對於多次更新單個記錄的系統不是很實用,因為即使對於穩定的數據集,日誌也會無限增長。 丟棄舊更新的簡單日誌保留機制將限制空間,但日誌不再是恢復當前狀態的一種方式——現在從日誌開頭恢復不再重新創建當前狀態,因為舊更新可能根本無法捕獲.日誌壓縮是一種提供更細粒度的保留每條記錄的機制,而不是粗粒度的基於時間的保留。 我們的想法是有選擇地刪除我們使用相同主鍵進行更新的記錄。 這樣可以保證日誌至少具有每個鍵的最後狀態。
可以為每個topic設置此保留策略,因此單個集群可以有一些topic,其中保留是通過大小或時間強制執行的,而其他topic的保留是通過壓縮強制執行的。
此功能的靈感來自 LinkedIn 最古老、最成功的基礎設施之一——稱為 Databus 的資料庫變更日誌緩存服務。 與大多數日誌結構存儲系統不同,Kafka 是為訂閱而構建的,並組織數據以實現快速線性讀寫。 與 Databus 不同,Kafka 充當真實來源存儲,因此即使在上游數據源無法重放的情況下,它也很有用。
日誌壓縮基礎
這是一張high-level圖片,顯示了 Kafka 日誌的邏輯結構以及每條消息的偏移量。日誌的頭部與傳統的 Kafka 日誌相同。 它具有密集的、順序的偏移量並保留所有消息。 日誌壓縮添加了一個用於處理日誌尾部的選項。 上圖顯示了帶有壓縮尾部 的 log。 請註意,日誌尾部的消息保留了首次寫入時分配的原始偏移量——永遠不會改變。 另請註意,即使具有該偏移量的消息已被壓縮,所有偏移量仍會保留在日誌中的有效位置; 在這種情況下,該位置與日誌中出現的下一個最高偏移量無法區分。 例如,在上圖中,偏移量 36、37 和 38 都是等效位置,從這些偏移量中的任何一個開始的讀取都將返回以 38 開頭的消息集。
壓縮也允許刪除。 帶有鍵和空負載的消息將被視為從日誌中刪除。 這樣的記錄有時被稱為tombstone。 此刪除標記將導致刪除任何具有該鍵的先前消息(就像任何具有該鍵的新消息一樣),但刪除標記是特殊的,因為它們將在一段時間後從日誌中清除以釋放空間. 不再保留刪除的時間點在上圖中標記為“刪除保留點”。
壓縮是通過定期重新複製日誌段在後臺完成的。 清理不會阻止讀取,並且可以限制使用不超過可配置的 I/O 吞吐量,以避免影響producer和leader。 壓縮日誌段的實際過程如下所示:
日誌壓縮提供什麼保證?
日誌壓縮保證以下內容:
- 任何停留在日誌頭部的leader都會看到寫入的每條消息; 這些消息將具有順序偏移量。 topic的 min.compaction.lag.ms 可用於保證消息寫入後必須經過的最短時間長度才能被壓縮。 IE。 它提供了每條消息將在(未壓縮的)頭部保留多長時間的下限。 topic的 max.compaction.lag.ms 可用於保證消息寫入時間和消息符合壓縮條件的時間之間的最大延遲。
- 始終保持消息的順序。 壓縮永遠不會重新排序消息,只是刪除一些。
- 消息的偏移量永遠不會改變。 它是日誌中某個位置的永久標識符。
- 任何從日誌開頭開始的leader都將至少看到所有記錄的最終狀態,按照它們的寫入順序。 此外,如果leader在小於topic的 delete.retention.ms 設置(預設為 24 小時)的時間段內到達日誌頭部,則會看到所有已刪除記錄的刪除標記。 換句話說:由於刪除標記的刪除與讀取同時發生,因此如果leader滯後超過 delete.retention.ms,則它有可能錯過刪除標記。
日誌壓縮詳細信息
日誌壓縮由日誌清理器處理,這是一個重新複製日誌段文件的後臺線程池,刪除其鍵出現在日誌頭部的記錄。 每個壓縮器線程的工作方式如下:
- 它選擇日誌頭與日誌尾的比率最高的日誌
- 它為日誌頭部的每個鍵創建最後一個偏移量的簡潔摘要
- 它從頭到尾複製日誌,刪除日誌中稍後出現的鍵。 新的、乾凈的段會立即交換到日誌中,因此所需的額外磁碟空間只是一個額外的日誌段(不是日誌的完整副本)。
- 日誌頭的摘要本質上只是一個空間緊湊的哈希表。 每個條目正好使用 24 個位元組。 結果,使用 8GB 的清理器緩衝區,一次清理器迭代可以清理大約 366GB 的日誌頭(假設有 1k 條消息)。
配置日誌清理器
預設情況下啟用日誌清理器。 這將啟動cleaner線程池。 要針對特定topic啟用日誌清理,請添加特定於日誌的屬性log.cleanup.policy=compact
log.cleanup.policy 屬性是在broker的 server.properties 文件中定義的broker配置設置; 它會影響集群中所有沒有配置覆蓋的topic,如此處所述。 日誌清理器可以配置為保留最少量的未壓縮的日誌“頭”。 這是通過設置壓縮時間延遲來實現的。log.cleaner.min.compaction.lag.ms
這可用於防止比最小消息年齡更新的消息被壓縮。 如果未設置,則所有日誌段都符合壓縮條件,但最後一段除外,即 當前正在寫入的那個。 活動段不會被壓縮,即使它的所有消息都早於最小壓縮時間延遲。 可以配置日誌清理器以確保最大延遲,在該延遲之後,日誌的未壓縮“頭”有資格進行日誌壓縮。log.cleaner.max.compaction.lag.ms
這可用於防止低生產率的日誌在無限制的持續時間內不符合壓縮條件。 如果未設置,則不壓縮不超過 min.cleanable.dirty.ratio 的日誌。 請註意,這個壓縮截止日期並不是硬性保證,因為它仍然取決於日誌清理器線程的可用性和實際壓縮時間。 您將需要監控 uncleanable-partitions-count、max-clean-time-secs 和 max-compaction-delay-secs 指標。
此處here.描述了更cleanner的配置。
十.配額
Kafka 集群能夠對請求強制執行配額,以控制客戶端使用的broker資源。 Kafka broker可以為共用配額的每組客戶端強制執行兩種類型的客戶端配額:
- 網路帶寬配額定義位元組速率閾值(自 0.9 起)
- 請求率配額將 CPU 利用率閾值定義為網路和 I/O 線程的百分比(自 0.11 起)
tips,這裡的配額可以類比為常見的限流器
為什麼需要配額?
producer和leader有可能生產/消費非常大量的數據或以非常高的速率生成請求,從而壟斷broker資源,導致網路飽和,並且通常 DOS 其他客戶端和broker本身。 擁有配額可以防止這些問題,並且在大型多租戶集群中尤為重要,在這種情況下,一小部分行為不端的客戶端可能會降低行為良好的用戶體驗。 事實上,當將 Kafka 作為服務運行時,這甚至可以根據商定的合同強制執行 API 限制。
client group
Kafka 客戶端的身份是用戶主體,代表安全集群中經過身份驗證的用戶。 在支持未經身份驗證的客戶端的集群中,用戶主體是broker使用可配置的 PrincipalBuilder 選擇的一組未經身份驗證的用戶。 Client-id 是客戶端的邏輯分組,具有由客戶端應用程式選擇的有意義的名稱。 元組 (user, client-id) 定義了共用用戶主體和客戶端 id 的安全邏輯客戶端組。
配額可以應用於(用戶、客戶端 ID)、用戶或客戶端 ID 組。 對於給定的連接,將應用與該連接匹配的最具體的配額。 配額組的所有連接共用為該組配置的配額。 例如,如果 (user="test-user", client-id="test-client") 的生產配額為 10MB/秒,這將在用戶“test-user”的所有producer實例與客戶端之間共用- ID“測試客戶端”。
配額配置
可以為(用戶、客戶端 ID)、用戶和客戶端 ID 組定義配額配置。 可以在任何需要更高(或更低)配額的配額級別覆蓋預設配額。 該機制類似於每個topic的日誌配置覆蓋。 用戶和 (user, client-id) 配額覆蓋寫入 /config/users 下的 ZooKeeper,client-id 配額覆蓋寫入 /config/clients 下。 這些覆蓋被所有broker讀取並立即生效。 這讓我們無需滾動重啟整個集群即可更改配額。 有關詳細信息,請參見此處 here 。 每個組的預設配額也可以使用相同的機制動態更新。配額配置的優先順序是:
- /config/users/<user>/clients/<client-id>
- /config/users/<user>/clients/<default>
- /config/users/<user>
- /config/users/<default>/clients/<client-id>
- /config/users/<default>/clients/<default>
- /config/users/<default>
- /config/clients/<client-id>
- /config/clients/<default>
網路帶寬配額
網路帶寬配額定義為共用配額的每組客戶端的位元組速率閾值。 預設情況下,每個唯一的客戶端組都會收到由集群配置的以位元組/秒為單位的固定配額。 此配額是基於每個broker定義的。 在客戶端受到限制之前,每組客戶端最多可以發佈/獲取每個broker的 X 位元組/秒。
請求速率配額
請求率配額定義為客戶端可以在配額視窗內使用每個broker的請求處理程式 I/O 線程和網路線程的時間百分比。 n% 的配額表示一個線程的 n%,因此配額超出總容量 ((num.io.threads + num.network.threads) * 100)%。 在被限制之前,每組客戶端可以在配額視窗中的所有 I/O 和網路線程中使用總百分比最高為 n% 的百分比。 由於為 I/O 和網路線程分配的線程數通常基於broker主機上可用的內核數,因此請求率配額代表每組共用配額的客戶端可能使用的 CPU 的總百分比。
執行細節
預設情況下,每個唯一的客戶端組都會收到集群配置的固定配額。 此配額是基於每個broker定義的。 在受到限制之前,每個客戶端都可以使用每個broker的這個配額。 我們決定為每個broker定義這些配額比為每個客戶端設置固定的集群帶寬要好得多,因為這需要一種機制來在所有broker之間共用客戶端配額使用情況。 這可能比配額實施本身更難做到!broker在檢測到配額違規時如何反應? 在我們的解決方案中,broker首先計算將違規客戶置於其配額之下所需的延遲量,並立即返回包含延遲的響應。 在獲取請求的情況下,響應將不包含任何數據。 然後,broker將到客戶端的通道靜音,不再處理來自客戶端的請求,直到延遲結束。 在收到具有非零延遲持續時間的響應後,Kafka 客戶端還將避免在延遲期間向broker發送進一步的請求。 因此,來自受限客戶端的請求會被雙方有效阻止。 即使使用不尊重broker延遲響應的舊客戶端實現,broker通過靜音其套接字通道施加的背壓仍然可以處理行為不端的客戶端的節流。 那些向受限通道發送進一步請求的客戶端只有在延遲結束後才會收到響應。
在多個小視窗(例如 30 個視窗,每個 1 秒)上測量位元組速率和線程利用率,以便快速檢測和糾正配額違規。 通常,具有較大的測量視窗(例如 10 個視窗,每個視窗 30 秒)會導致大量流量突發,隨後出現長時間延遲,這在用戶體驗方面並不是很好。
十一.參考
https://kafka.apache.org/documentation/