# 日誌分段切分條件 日誌分段文件切分包含以下4個條件,滿足其一即可: 1. 當前日誌分段文件的大小超過了broker端參數 `log.segment.bytes` 配置的值。`log.segment.bytes`參數的預設值為 `1073741824`,即1GB 2. 當前日誌分段中消息的最小時間 ...
日誌分段切分條件
日誌分段文件切分包含以下4個條件,滿足其一即可:
- 當前日誌分段文件的大小超過了broker端參數
log.segment.bytes
配置的值。log.segment.bytes
參數的預設值為1073741824
,即1GB - 當前日誌分段中消息的最小時間戳與當前系統的時間戳的差值大於
log.roll.ms
或log.roll.hours
參數配置的值。如果同時配置了log.roll.ms
和log.roll.hours
參數,那麼log.roll.ms
的優先順序高,預設情況下,只配置了log.roll.hours
參數,其值為168,即7天。 - 偏移量索引文件或時間戳索引文件的大小達到 broker 端參數
log.index.size.max.bytes
配置的值。log.index.size .max.bytes
的預設值為10485760
,即10MB - 追加的消息的偏移量與當前日誌分段的起始偏移量之間的差值大於
Integer.MAX_VALUE
, 即要追加的消息的偏移量不能轉變為相對偏移量(offset - baseOffset > Integer.MAX_VALUE)。
什麼是Controller
Controller作為Kafka集群中的核心組件,它的主要作用是在Apache ZooKeeper的幫助下管理和協調整個Kafka集群。
Controller與Zookeeper進行交互,獲取與更新集群中的元數據信息。其他broker並不直接與zookeeper進行通信,而是與Controller進行通信並同步Controller中的元數據信息。
Kafka集群中每個節點都可以充當Controller節點,但集群中同時只能有一個Controller節點。
Controller簡單來說,就是kafka集群的狀態管理者
controller競選機制:簡單說,先來先上!
Broker 在啟動時,會嘗試去 ZooKeeper 中創建 /controller 節點。Kafka 當前選舉控制器的規則是:第一個成功創建 /controller 節點的 Broker 會被指定為控制器。
在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責維護整個集群中所有分區和副本的狀態及分區leader的選舉。
當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責分區的重新分配。
Kafka中的控制器選舉的工作依賴於Zookeeper,成功競選為控制器的broker會在Zookeeper中創建/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定為1,brokerid表示成為控制器的broker的id編號,timestamp表示競選成為控制器時的時間戳。
在任意時刻,集群中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取zookeeper上的/controller節點的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經有其它broker節點成功競選為控制器,所以當前broker就會放棄競選;如果Zookeeper中不存在/controller這個節點,或者這個節點中的數據異常,那麼就會嘗試去創建/controller這個節點,當前broker去創建節點的時候,也有可能其他broker同時去嘗試創建這個節點,只有創建成功的那個broker才會成為控制器,而創建失敗的broker則表示競選失敗。每個broker都會在記憶體中保存當前控制器的brokerid值,這個值可以標識為activeControllerId。
controller的職責
- 監聽partition相關變化
對Zookeeper中的/admin/reassign_partitions節點註冊PartitionReassignmentListener,用來處理分區重分配的動作。
對Zookeeper中的/isr_change_notification節點註冊IsrChangeNotificetionListener,用來處理ISR集合變更的動作。
對Zookeeper中的/admin/preferred-replica-election節點添加PreferredReplicaElectionListener,用來處理優先副本選舉。
- 監聽topic增減變化
對Zookeeper中的/brokers/topics節點添加TopicChangeListener,用來處理topic增減的變化;
對Zookeeper中的/admin/delete_topics節點添加TopicDeletionListener,用來處理刪除topic的動作
- 監聽broker相關的變化
對Zookeeper中的/brokers/ids/節點添加BrokerChangeListener,用來處理broker增減的變化
- 更新集群的元數據信息
從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的信息併進行相應的管理。
對各topic所對應的Zookeeper中的/brokers/topics/[topic]節點添加PartitionModificationsListener,用來監聽topic中的分區分配變化。並將最新信息同步給其他所有broker。
- 啟動並管理分區狀態機和副本狀態機。
- 如果參數auto.leader.rebalance.enable設置為true,則還會開啟一個名為“auto-leader-rebalance-task”的定時任務來負責維護分區的leader副本的均衡。
分區的負載分佈
客戶端請求創建一個topic時,每一個分區副本在broker上的分配,是由集群controller來決定;
結論:裡面會創建出來兩個隨機數
第一個隨機數確定0號分區leader的位置,往後1號分區2號分區的leader依次往後順延1
第二個隨機數確定每個分區的第一個副本的位置 在leader所在機器上往後順延(隨機數+1)台機器,該台機器就是第一個副本的位置,剩餘副本依次往後順延1
// 舉例:
// broker_id = 0~19 一共20台機器
// 分區數20,副本數10
// 第一個隨機數:19
// 第二個隨機數:0
(0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8))
(1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
(2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
(3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
(4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
(5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
(6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
(7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16))
(9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17))
(10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18))
(11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19))
(12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0))
(13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1))
(14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2))
(15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3))
(16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4))
(17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5))
(18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6))
(19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))
// 其分佈策略源碼如下:
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分區的個數 10
replicationFactor: Int, //副本的個數 5
brokerList: Seq[Int],//broker的集合 8 0~7
fixedStartIndex: Int//預設值是-1 固定開始的索引位置
startPartitionId: Int): Map[Int, Seq[Int]] //預設值是-1 分區開始的位置
= {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){
nextReplicaShift += 1
}
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1) {
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
-
副本因數不能大於 Broker 的個數(報錯:Replication factor: 4 larger than available brokers: 3.);
-
partition_0的第1個副本(leader副本)放置位置是隨機從 brokerList 選擇的;
-
其他分區的第1個副本(leader)放置位置相對於paritition_0分區依次往後移(也就是如果我們有5個 Broker,5個分區,假設partition0分區放在broker4上,那麼partition1將會放在broker5上;patition2將會放在broker1上;partition3在broker2,依次類);
-
各分區剩餘的副本相對於分區前一個副本偏移隨機數nextReplicaShift+1,然後後面的副本依次加1
分區Leader的選舉機制
分區 leader 副本的選舉由控制器controller負責具體實施。
當創建分區(創建主題或增加分區都有創建分區的動作)或Leader下線(此時分區需要選舉一個新的leader上線來對外提供服務)的時候都需要執行 leader 的選舉動作。
選舉策略:按照 ISR集合中副本的順序查找第一個存活的副本,並且這個副本在 ISR 集合中
一個分區的AR集合在partition分配的時候就被指定,並且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分區的 ISR 集合中副本的順序可能會改變;
生產者原理解析
生產者工作流程圖:
一個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender 線程 。
在主線程中由kafkaProducer創建消息,然後通過可能的攔截器、序列化器和分區器的作用之後緩存到消息累加器(RecordAccumulator, 也稱為消息收集器)中。
Sender 線程負責從RecordAccumulator 獲取消息並將其發送到 Kafka 中;
RecordAccumulator主要用來緩存消息以便Sender 線程可以批量發送,進而減少網路傳輸的資源消耗以提升性能。RecordAccumulator緩存的大小可以通過生產者客戶端參數buffer.memory
配置,預設值為33554432B
,即32M。如果生產者發送消息的速度超過發送到伺服器的速度,則會導致生產者空間不足,這個時候 KafkaProducer.send()方法調用要麼被阻塞,要麼拋出異常,這個取決於參數max.block.ms
的配置,此參數的預設值為60000
,即60秒。
主線程中發送過來的消息都會被迫加到 RecordAccumulator 的某個雙端隊列( Deque )中,
RecordAccumulator內部為每個分區都維護了一個雙端隊列,即Deque<ProducerBatch>。
消息寫入緩存時,追加到雙端隊列的尾部;
Sender讀取消息時,從雙端隊列的頭部讀取。註意:ProducerBatch 是指一個消息批次;
與此同時,會將較小的 ProducerBatch 湊成一個較大 ProducerBatch ,也可以減少網路請求的次數以提升整體的吞吐量。
ProducerBatch 大小和batch.size
參數也有著密切的關係。當一條消息(ProducerRecord ) 流入 RecordAccumulator 時,會先尋找與消息分區所對應的雙端隊列(如果沒有則新建),再從這個雙端隊列的尾部獲取一個ProducerBatch (如果沒有則新建),查看 ProducerBatch中是否還可以寫入這個ProducerRecord,如果可以寫入就直接寫入,如果不可以則需要創建一個新的Producer Batch。在新建 ProducerBatch時評估這條消息的大小是否超過 batch.size
參數大小,如果不超過,那麼就以 batch.size
參數的大小來創建 ProducerBatch。
如果生產者客戶端需要向很多分區發送消息, 則可以將buffer.memory參數適當調大以增加整體的吞吐量。
Sender從 RecordAccumulator 獲取緩存的消息之後,會進一步將<分區,Deque<Producer Batch>的形式轉變成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群broker節點。對於網路連接來說,生產者客戶端是與具體broker節點建立的連接,也就是向具體的broker節點發送消息,而並不關心消息屬於哪一個分區;而對於KafkaProducer的應用邏輯而言,我們只關註向哪個分區中發送哪些消息,所以在這裡需要做一個應用邏輯層面到網路I/O層面的轉換。
在轉換成<Node, List<ProducerBatch>>的形式之後, Sender會進一步封裝成<Node,Request> 的形式,這樣就可以將 Request 請求發往各個Node了,這裡的Request是Kafka各種協議請求;
請求在從sender線程發往Kafka之前還會保存到InFlightRequests中,InFlightRequests保存對象的具體形式為 Map<Nodeld, Deque<request>>,它的主要作用是緩存了已經發出去但還沒有收到服務端響應的請求(Nodeld 是一個 String 類型,表示節點的 id 編號)。與此同時,InFlightRequests 還提供了許多管理類的方法,並且通過配置參數還可以限制每個連接(也就是客戶端與 Node之間的連接)最多緩存的請求數。這個配置參數為 max.in.flight.request.per.connection
,預設值為5,即每個連接最多只能緩存5個未響應的請求,超過該數值之後就不能再向這個連接發送更多的請求了,除非有緩存的請求收到了響應( Response )。通過比較 Deque<Request> 的size與這個參數的大小來判斷對應的 Node中是否己經堆積了很多未響應的消息,如果真是如此,那麼說明這個 Node 節點負載較大或網路連接有問題,再繼續發送請求會增大請求超時的可能。
Producer往Broker發送消息應答機制
kafka 在 producer 裡面提供了消息確認機制。我們可以通過配置來決定消息發送到對應分區的幾個副本才算消息發送成功。可以在構造producer 時通過acks參數指定(在 0.8.2.X 前是通過 request.required.acks 參數設置的)。這個參數支持以下三種值:
-
acks = 0:意味著如果生產者能夠通過網路把消息發送出去,那麼就認為消息已成功寫入 kafka 。在這種情況下還是有可能發生錯誤,比如發送的對象不能被序列化或者網卡發生故障,但如果是分區離線或整個集群長時間不可用,那就不會收到任何錯誤。在 acks=0 模式下的運行速度是非常快的(這就是為什麼很多基準測試都是基於這個模式),你可以得到驚人的吞吐量和帶寬利用率,不過如果選擇了這種模式,大概率會丟失一些消息。
-
acks = 1:意味著leader 在收到消息並把它寫入到分區數據文件(不一定同步到磁碟上)時會返回確認或錯誤響應。在這個模式下,如果發生正常的 leader 選舉,生產者會在選舉時收到一個 LeaderNotAvailableException 異常,如果生產者能恰當地處理這個錯誤,它會重試發送悄息,最終消息會安全到達新的 leader 那裡。不過在這個模式下仍然有可能丟失數據,比如消息已經成功寫入 leader,但在消息被覆制到 follower 副本之前 leader發生崩潰。
-
acks = all(這個和 request.required.acks = -1 含義一樣):意味著 leader 在返回確認或錯誤響應之前,會等待所有同步副本都收到悄息。如果和 min.insync.replicas 參數結合起來,就可以決定在返回確認前至少有多少個副本能夠收到悄息,生產者會一直重試直到消息被成功提交。不過這也是最慢的做法,因為生產者在繼續發送其他消息之前需要等待所有副本都收到當前的消息。
acks | 含義 |
---|---|
0 | Producer往集群發送數據不需要等到集群的確認信息,不確保消息發送成功。安全性最低但是效率最高。 |
1 | Producer往集群發送數據只要 leader成功寫入消息就可以發送下一條,只確保Leader 接收成功。 |
-1 或 all | Producer往集群發送數據需要所有的ISR Follower 都完成從 Leader 的同步才會發送下一條,確保 Leader發送成功和所有的副本都成功接收。安全性最高,但是效率最低。 |
生產者將acks設置為all,是否就一定不會丟數據呢?
否!如果在某個時刻ISR列表只剩leader自己了,那麼就算acks=all,收到這條數據還是只有一個點;
可以配合另外一個參數緩解此情況: 最小同步副本數>=2
BROKER端參數: min.insync.replicas(預設1)
生產者的ack=all,也不能完全保證數據發送的100%可靠性
為什麼?因為,如果服務端目標partition的同步副本只有leader自己了,此時,它收到數據就會給生產者反饋成功!
可以修改服務端的一個參數(分區最小ISR數[min.insync.replicas]>=2),來避免此問題;
其他的生產者參數
- acks
acks是控制kafka服務端向生產者應答消息寫入成功的條件;生產者根據得到的確認信息,來判斷消息發送是否成功;
- max.request.size
這個參數用來限制生產者客戶端能發送的消息的最大值,預設值為 1048576B ,即 lMB
一般情況下,這個預設值就可以滿足大多數的應用場景了。
這個參數還涉及一些其它參數的聯動,比如 broker 端(topic級別參數)的 message.max.bytes參數(預設1000012),如果配置錯誤可能會引起一些不必要的異常;比如將 broker 端的 message.max.bytes 參數配置為10B ,而 max.request.size參數配置為20B,那麼當發送一條大小為 15B 的消息時,生產者客戶端就會報出異常;
- retries和retry.backoff.ms
retries參數用來配置生產者重試的次數,預設值為2147483647,即在發生異常的時候不進行任何重試動作。
消息在從生產者發出到成功寫入伺服器之前可能發生一些臨時性的異常,比如網路抖動、 leader 副本的選舉等,這種異常往往是可以自行恢復的,生產者可以通過配置 retries大於0的值,以此通過內部重試來恢復而不是一味地將異常拋給生產者的應用程式。如果重試達到設定的次數,那麼生產者就會放棄重試並返回異常。重試還和另一個參數 retry.backoff.ms 有關,這個參數的預設值為100,它用來設定兩次重試之間的時間間隔,避免無效的頻繁重試 。如果將 retries參數配置為非零值,並且 max .in.flight.requests.per.connection 參數配置為大於1的值,那可能會出現錯序的現象:如果批次1消息寫入失敗,而批次2消息寫入成功,那麼生產者會重試發送批次1的消息,此時如果批次1的消息寫入成功,那麼這兩個批次的消息就出現了錯序。
對於某些應用來說,順序性非常重要 ,比如MySQL binlog的傳輸,如果出現錯誤就會造成非常嚴重的後果;一般而言,在需要保證消息順序的場合建議把參數max.in.flight.requests.per.connection 配置為1 ,而不是把retries配置為0,不過這樣也會影響整體的吞吐。
- compression.type
這個參數用來指定消息的壓縮方式,預設值為“none",即預設情況下,消息不會被壓縮。該參數還可以配置為 "gzip","snappy" 和 "lz4"。對消息進行壓縮可以極大地減少網路傳輸、降低網路I/O,從而提高整體的性能 。消息壓縮是一種以時間換空間的優化方式,如果對時延有一定的要求,則不推薦對消息進行壓縮;
- batch.size
每個Batch要存放batch.size大小的數據後,才可以發送出去。比如說batch.size預設值是16KB,那麼裡面湊夠16KB的數據才會發送。理論上來說,提升batch.size的大小,可以允許更多的數據緩衝在recordAccumulator裡面,那麼一次Request發送出去的數據量就更多了,這樣吞吐量可能會有所提升。但是batch.size也不能過大,要是數據老是緩衝在Batch里遲遲不發送出去,那麼發送消息的延遲就會很高。一般可以嘗試把這個參數調節大些,利用生產環境發消息負載測試一下。
- linger.ms
這個參數用來指定生產者發送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 時間,預設值為0。生產者客戶端會在ProducerBatch填滿或等待時間超過linger.ms 值時發送出去。
增大這個參數的值會增加消息的延遲,但是同時能提升一定的吞吐量。
- enable.idempotence
冪等性,就是一個操作重覆做,也不會影響最終的結果!
int a = 1;
a++; // 非冪等操作
val map = new HashMap()
map.put(“a”,1); // 冪等操作
在kafka中,同一條消息,生產者如果多次重試發送,在伺服器中的結果如果還是只有一條,這就是具備冪等性;否則,就不具備冪等性!
- partitioner.class
用來指定分區器,預設:org.apache.kafka.internals.DefaultPartitioner
預設分區器的分區規則:
- 如果數據中有key,則按key的murmur hash值 % topic分區總數得到目標分區
- 如果數據只有value,則在各個分區間輪詢(老版本,新版本是new出來的一個隨機數)
自定義partitioner需要實現org.apache.kafka.clients.producer.Partitioner介面
消費者組再均衡分區分配策略
會觸發rebalance(消費者)的事件可能是如下任意一種:
- 有新的消費者加入消費組。
- 有消費者宕機下線,消費者並不一定需要真正下線,例如遇到長時間的 GC 、網路延遲導致消費者長時間未向GroupCoordinator發送心跳等情況時,GroupCoordinator 會認為消費者己下線。
- 有消費者主動退出消費組(發送LeaveGroupRequest 請求):比如客戶端調用了unsubscrible()方法取消對某些主題的訂閱。
- 消費組所對應的 GroupCoorinator節點發生了變更。
- 消費組內所訂閱的任一主題或者主題的分區數量發生變化。
將分區的消費權從一個消費者移到另一個消費者稱為再均衡(rebalance),如何rebalance也涉及到分區分配策略。
kafka有兩種的分區分配策略:range(預設) 和 roundrobin(新版本中又新增了另外2種)
我們可以通過partition.assignment.strategy
參數選擇 range 或 roundrobin。
partition.assignment.strategy
參數預設的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
Range Strategy
- 先將消費者按照client.id字典排序,然後按topic逐個處理;
- 針對一個topic,將其partition總數/消費者數得到商n和 餘數m,則每個consumer至少分到n個分區,且前m個consumer每人多分一個分區;
例1:
假設有TOPIC_A有5個分區,由3個consumer(C1,C2,C3)來消費;5/3得到商1,餘2,則每個消費者至少分1個分區,前兩個消費者各多1個分區C1: 2個分區,C2:2個分區,C3:1個分區
接下來,就按照“區間”進行分配:
TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_A_3 TOPIC_A-4
C1: TOPIC_A-0, TOPIC_A-1
C2 : TOPIC_A-2, TOPIC_A_3
C3: TOPIC_A-4
例2:
假設TOPIC_A有5個分區,TOPIC_B有3個分區,由2個consumer(C1,C2)來消費
- 先分配TOPIC_A:
5/2得到商2,餘1,則C1有3個分區,C2有2個分區,得到結果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4
- 再分配TOPIC_B
3/2得到商1,餘1,則C1有2個分區,C2有1個分區,得到結果
C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2
- 最終分配結果:
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
Round-Robin Strategy
- 將所有主題分區組成TopicAndPartition列表,並對TopicAndPartition列表按照其hashCode 排序
- 然後,以輪詢的方式分配給各消費者
以上述“例2”來舉例:
- 先對TopicPartition的hashCode排序,假如排序結果如下:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
- 然後按輪詢方式分配
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3
C3 TOPIC_A-4
Sticky Strategy
對應的類叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特點:
- 要去達成最大化的均衡
- 儘可能保留各消費者原來分配的分區
再均衡的過程中,還是會讓各消費者先取消自身的分區,然後再重新分配(只不過是分配過程中會儘量讓原來屬於誰的分區依然分配給誰)
Cooperative Sticky Strategy
對應的類叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特點:
- 邏輯與sticky策略一致
- 支持cooperative再均衡機制(再均衡的過程中,不會讓所有消費者取消掉所有分區然後再進行重分配)
費者組再均衡流程
消費組在消費數據的時候,有兩個角色進行組內的各事務的協調;
角色1: Group Coordinator (組協調器) 位於服務端(就是某個broker)
組協調器的定位:
coordinator在我們組記偏移量的__consumer_offsets分區的leader所在broker上
查找Group Coordinator的方式:
先根據消費組groupid的hashcode值計算它應該所在__consumer_offsets 中的分區編號; 分區數
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount為__consumer_offsets的分區總數,這個可以通過broker端參數offset.topic.num.partitions來配置,預設值是50;
找到對應的分區號後,再尋找此分區leader副本所在broker節點,則此節點即為自己的Grouping Coordinator;
角色2: Group Leader (組長) 位於消費端(就是消費組中的某個消費者)
組長的定位:隨機選的哦!!!
GroupCoordinator介紹
每個消費組在服務端對應一個GroupCoordinator其進行管理,GroupCoordinator是Kafka服務端中用於管理消費組的組件。
消費者客戶端中由ConsumerCoordinator組件負責與GroupCoordinator行交互;
ConsumerCoordinator和GroupCoordinator最重要的職責就是負責執行消費者rebalance操作
再均衡監聽器
如果想控制消費者在發生再均衡時執行一些特定的工作,可以通過訂閱主題時註冊“再均衡監聽器”來實現;
場景舉例:在發生再均衡時,處理消費位移
如果A消費者消費掉的一批消息還沒來得及提交offset,而它所負責的分區在rebalance中轉移給了B消費者,則有可能發生數據的重覆消費處理。此情形下,可以通過再均衡監聽器做一定程度的補救;
代碼示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
/**
* 消費組再均衡觀察
*/
public class ConsumerDemo2 {
public static void main(String[] args) {
//1.創建kafka的消費者對象,附帶著把配置文件搞定
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.訂閱主題(確定需要消費哪一個或者多個主題)
//我現在想看看如果我的消費者組裡面,多了一個消費者或者少了一個消費者,他有沒有給我做再均衡
consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
/**
* 這個方法是將原來的分配情況全部取消,或者說把所有的分區全部回收了
* 這個全部取消很噁心,原來的消費者消費的好好的,他一下子就給他全部停掉了
* @param collection
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
System.out.println("我原來的均衡情況是:"+collection + "我已經被回收了!!");
}
/**
* 這個方法是當上面的分配情況全部取消以後,調用這個方法,來再次分配,這是在均衡分配後的情況
* @param collection
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
System.out.println("我是重新分配後的結果:"+collection);
}
});
while (true){
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
}
}
}
kafka系統的CAP保證
CAP理論作為分散式系統的基礎理論,它描述的是一個分散式系統在以下三個特性中:
- 一致性(Consistency)
- 可用性(Availability)
- 分區容錯性(Partition tolerance)
最多滿足其中的兩個特性。也就是下圖所描述的。分散式系統要麼滿足CA,要麼CP,要麼AP。無法同時滿足CAP。
分區容錯性:指的分散式系統中的某個節點或者網路分區出現了故障的時候,整個系統仍然能對外提供滿足一致性和可用性的服務。也就是說部分故障不影響整體使用。事實上我們在設計分散式系統時都會考慮到bug,硬體,網路等各種原因造成的故障,所以即使部分節點或者網路出現故障,我們要求整個系統還是要繼續使用的(不繼續使用,相當於只有一個分區,那麼也就沒有後續的一致性和可用性了)
可用性:一直可以正常的做讀寫操作。簡單而言就是客戶端一直可以正常訪問並得到系統的正常響應。用戶角度來看就是不會出現系統操作失敗或者訪問超時等問題。
一致性:在分散式系統完成某寫操作後任何讀操作,都應該獲取到該寫操作寫入的那個最新的值。相當於要求分散式系統中的各節點時時刻刻保持數據的一致性。
Kafka 作為一個商業級消息中間件,數據可靠性和可用性是優先考慮的重點,兼顧數據一致性;
參考文檔:https://www.cnblogs.com/lilpig/p/16840963.html
冪等性
冪等性要點
Kafka 0.11.0.0 版本開始引入了冪等性與事務這兩個特性,以此來實現 EOS ( exactly once semantics ,精確一次處理語義)
生產者在進行發送失敗後的重試時(retries),有可能會重覆寫入消息,而使用 Kafka冪等性功能之後就可以避免這種情況。
開啟冪等性功能,只需要顯式地將生產者參數 enable.idempotence
設置為 true (預設值為 false):
props.put("enable.idempotence",true);
在開啟冪等性功能時,如下幾個參數必須正確配置:
- retries > 0
- max.in.flight.requests.per.connection<=5
- acks = -1
如有違反,則會拋出ConfigException異常
;
kafka冪等性實現機制
- 每一個producer在初始化時會生成一個producer_id,併為每個目標分區維護一個“消息序列號”;
- producer每發送一條消息,會將<producer_id,分區>對應的“序列號”加1
- broker端會為每一對{producer_id,分區}維護一個序列號,對於每收到的一條消息,會判斷服務端的SN_OLD和接收到的消息中的SN_NEW進行對比:
- 如果SN_OLD + 1 == SN_NEW,正常;
- 如果SN_NEW<SN_OLD+1,說明是重覆寫入的數據,直接丟棄
- 如果SN_NEW>SN_OLD+1,說明中間有數據尚未寫入,或者是發生了亂序,或者是數據丟失,將拋出嚴重異常:OutOfOrderSequenceException
producer.send(“aaa”)
消息aaa就擁有了一個唯一的序列號, 如果這條消息發送失敗,producer內部自動重試(retry),此時序列號不變;
producer.send(“bbb”)
消息bbb擁有一個新的序列號
註意:kafka只保證producer單個會話中的單個分區冪等;
kafka事務(偽事務)
事務要點知識
- Kafka的事務控制原理
主要原理:
開始事務-->發送一個ControlBatch消息(事務開始)
提交事務-->發送一個ControlBatch消息(事務提交)
放棄事務-->發送一個ControlBatch消息(事務終止)
- 開啟事務的必須配置參數(不支持數據得回滾,但是我能做到,一榮俱榮,一損俱損)
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// acks
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
// 生產者的重試次數
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
// 飛行中的請求緩存最大數量
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
// 開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
// 設置事務id
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");
事務控制的代碼模板
// 初始化事務
producer.initTransaction( )
// 開啟事務
producer.beginTransaction( )
// 幹活
// 提交事務
producer.commitTransaction( )
// 異常回滾(放棄事務) catch裡面
producer.abortTransaction( )
消費者api是會拉取到尚未提交事務的數據的;只不過可以選擇是否讓用戶看到!
是否讓用戶看到未提交事務的數據,可以通過消費者參數來配置:
isolation.level=read_uncommitted(預設值)
isolation.level=read_committed
- kafka還有一個“高級”事務控制,只針對一種場景:
用戶的程式,要從kafka讀取源數據,數據處理的結果又要寫入kafka
kafka能實現端到端的事務控制(比起上面的“基礎”事務,多了一個功能,通過producer可以將consumer的消費偏移量綁定到事務上提交)
producer.sendOffsetsToTransaction(offsets,consumer_id)
事務api示例
為了實現事務,應用程式必須提供唯一transactional.id,並且開啟生產者的冪等性
properties.put ("transactional.id","transactionid00001");
properties.put ("enable.idempotence",true);
“消費kafka-處理-生產結果到kafka”典型場景下的代碼結構示例:
package com.doit.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Exercise_kafka2kafka {
public static void main(String[] args) {
Properties props = new Properties();
//消費者的
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shouwei");
//自動提交偏移量
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//寫生產者的一些屬性
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//設置ack 開啟冪等性必須設置的三個參數
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
//開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
//開啟事務
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"id_fro_39_19");
//消費數據
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//初始化事務
producer.initTransactions();
//訂閱主題
consumer.subscribe(Arrays.asList("eventlog"));
while (true){
//拉取數據
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
try {
//開啟事務
producer.beginTransaction();
for (ConsumerRecord<String, String> record : poll) {
String value = record.value();
//將value的值寫入到另外一個topic中
producer.send(new ProducerRecord<String,String>("k2k",value));
}
producer.flush();
//提交偏移量
consumer.commitAsync();
//提交事務
producer.commitTransaction();
} catch (ProducerFencedException e) {
//放棄事務
producer.abortTransaction();
}
}
}
}
Kafka速度快的原因
- 消息順序追加(磁碟順序讀寫比記憶體的隨機讀寫還快)
- 頁緩存等技術(數據交給操作系統的頁緩存,並不真正刷入磁碟;而是定期刷入磁碟)
使用Zero-Copy (零拷貝)技術來進一步提升性能;
零拷貝是指將數據直接從磁碟文件複製到網卡設備中,而不需要經由應用程式之手;
零拷貝大大提高了應用程式的性能,減少了內核和用戶模式之間的上下文切換;對於Linux系統而言,零拷貝技術依賴於底層的 sendfile( )方法實現;對應於Java 語言,FileChannal.transferTo( )方法的底層實現就是 sendfile( )方法;