## 前言 本篇文章主要介紹的關於本人從剛工作到現在使用kafka的經驗,內容非常多,包含了kafka的常用命令,在生產環境中遇到的一些場景處理,kafka的一些web工具推薦等等。由於kafka這塊的記錄以及經驗是從我剛開始使用kafka,從2017年開始,可能裡面有些內容過時,請見諒。溫馨提醒, ...
前言
本篇文章主要介紹的關於本人從剛工作到現在使用kafka的經驗,內容非常多,包含了kafka的常用命令,在生產環境中遇到的一些場景處理,kafka的一些web工具推薦等等。由於kafka這塊的記錄以及經驗是從我剛開始使用kafka,從2017年開始,可能裡面有些內容過時,請見諒。溫馨提醒,本文有3w多字,建議收藏觀看~
Kafka理論知識
kafka基本介紹
Kafka是一種高吞吐量的分散式發佈訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。
Kafka 有如下特性:
-以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間複雜度的訪問性能。
-高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。
- 支持KafkaServer間的消息分區,及分散式消費,同時保證每個Partition內的消息順序傳輸。
- 同時支持離線數據處理和實時數據處理。
- Scale out:支持線上水平擴展。
kafka的術語
- Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker。
-Topic:每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
-Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition。
- Producer:負責發佈消息到Kafka broker。
- Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
- Consumer Group:每個Consumer屬於一個特定的Consumer
Group(可為每個Consumer指定group name,若不指定group
name則屬於預設的group)。
kafka核心Api
kafka有四個核心API
- 應用程式使用producer API發佈消息到1個或多個topic中。
- 應用程式使用consumer API來訂閱一個或多個topic,並處理產生的消息。
- 應用程式使用streams
API充當一個流處理器,從1個或多個topic消費輸入流,並產生一個輸出流到1個或多個topic,有效地將輸入流轉換到輸出流。
- connector
API允許構建或運行可重覆使用的生產者或消費者,將topic鏈接到現有的應用程式或數據系統。
示例圖如下:
kafka面試問題
Kafka的用途有哪些?使用場景如何?
使用kafka的目的是為瞭解耦、非同步、削峰。
消息系統: Kafka
和傳統的消息系統(也稱作消息中間件)都具備系統解耦、冗餘存儲、流量削峰、緩衝、非同步通信、擴展性、可恢復性等功能。與此同時,Kafka
還提供了大多數消息系統難以實現的消息順序性保障及回溯消費的功能。
存儲系統: Kafka
把消息持久化到磁碟,相比於其他基於記憶體存儲的系統而言,有效地降低了數據丟失的風險。也正是得益於
Kafka 的消息持久化功能和多副本機制,我們可以把 Kafka
作為長期的數據存儲系統來使用,只需要把對應的數據保留策略設置為"永久"或啟用主題的日誌壓縮功能即可。
流式處理平臺: Kafka
不僅為每個流行的流式處理框架提供了可靠的數據來源,還提供了一個完整的流式處理類庫,比如視窗、連接、變換和聚合等各類操作。
Kafka中的ISR、AR又代表什麼?ISR的伸縮又指什麼
分區中的所有副本統稱為 AR(Assigned Replicas)。所有與 leader
副本保持一定程度同步的副本(包括 leader 副本在內)組成ISR(In-Sync
Replicas),ISR 集合是 AR 集合中的一個子集。
ISR的伸縮:
leader 副本負責維護和跟蹤 ISR 集合中所有 follower 副本的滯後狀態,當
follower 副本落後太多或失效時,leader 副本會把它從 ISR 集合中剔除。如果
OSR 集合中有 follower 副本"追上"了 leader 副本,那麼 leader 副本會把它從
OSR 集合轉移至 ISR 集合。預設情況下,當 leader 副本發生故障時,只有在
ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR
集合中的副本則沒有任何機會(不過這個原則也可以通過修改相應的參數配置來改變)。
replica.lag.time.max.ms : 這個參數的含義是 Follower 副本能夠落後 Leader
副本的最長時間間隔,當前預設值是 10 秒。
unclean.leader.election.enable:是否允許 Unclean 領導者選舉。開啟
Unclean 領導者選舉可能會造成數據丟失,但好處是,它使得分區 Leader
副本一直存在,不至於停止對外提供服務,因此提升了高可用性。
Kafka中的HW、LEO、LSO、LW等分別代表什麼?
HW 是 High Watermark
的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個
offset 之前的消息。
LSO是LogStartOffset,一般情況下,日誌文件的起始偏移量 logStartOffset
等於第一個日誌分段的 baseOffset,但這並不是絕對的,logStartOffset
的值可以通過 DeleteRecordsRequest 請求(比如使用 KafkaAdminClient 的
deleteRecords()方法、使用 kafka-delete-records.sh
腳本、日誌的清理和截斷等操作進行修改。
如上圖所示,它代表一個日誌文件,這個日誌文件中有9條消息,第一條消息的
offset(LogStartOffset)為0,最後一條消息的 offset 為8,offset
為9的消息用虛線框表示,代表下一條待寫入的消息。日誌文件的 HW
為6,表示消費者只能拉取到 offset 在0至5之間的消息,而 offset
為6的消息對消費者而言是不可見的。
LEO 是 Log End Offset 的縮寫,它標識當前日誌文件中下一條待寫入消息的
offset,上圖中 offset 為9的位置即為當前日誌文件的 LEO,LEO
的大小相當於當前日誌分區中最後一條消息的 offset 值加1。分區 ISR
集合中的每個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即為分區的
HW,對消費者而言只能消費 HW 之前的消息。
LW 是 Low Watermark 的縮寫,俗稱"低水位",代表 AR 集合中最小的
logStartOffset
值。副本的拉取請求(FetchRequest,它有可能觸發新建日誌分段而舊的被清理,進而導致
logStartOffset 的增加)和刪除消息請求(DeleteRecordRequest)都有可能促使 LW
的增長。
Kafka中是怎麼體現消息順序性的?
可以通過分區策略體現消息順序性。
分區策略有輪詢策略、隨機策略、按消息鍵保序策略。
按消息鍵保序策略:一旦消息被定義了 Key,那麼你就可以保證同一個 Key
的所有消息都進入到相同的分區裡面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略
Kafka中的分區器、序列化器、攔截器是否瞭解?它們之間的處理順序是什麼?
序列化器:生產者需要用序列化器(Serializer)把對象轉換成位元組數組才能通過網路發送給
Kafka。而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka
中收到的位元組數組轉換成相應的對象。
分區器:分區器的作用就是為消息分配分區。如果消息 ProducerRecord
中沒有指定 partition 欄位,那麼就需要依賴分區器,根據 key 這個欄位來計算
partition 的值。
Kafka 一共有兩種攔截器:生產者攔截器和消費者攔截器。
生產者攔截器既可以用來在消息發送前做一些準備工作,比如按照某個規則過濾不符合要求的消息、修改消息的內容等,也可以用來在發送回調邏輯前做一些定製化的需求,比如統計類工作。
消費者攔截器主要在消費到消息或在提交消費位移時進行一些定製化的操作。
消息在通過 send() 方法發往 broker
的過程中,有可能需要經過攔截器(Interceptor)、序列化器(Serializer)和分區器(Partitioner)的一系列作用之後才能被真正地發往
broker。攔截器(下一章會詳細介紹)一般不是必需的,而序列化器是必需的。消息經過序列化之後就需要確定它發往的分區,如果消息
ProducerRecord 中指定了 partition 欄位,那麼就不需要分區器的作用,因為
partition 代表的就是所要發往的分區號。
處理順序 :攔截器->序列化器->分區器
KafkaProducer 在將消息序列化和計算分區之前會調用生產者攔截器的 onSend()
方法來對消息進行相應的定製化操作。
然後生產者需要用序列化器(Serializer)把對象轉換成位元組數組才能通過網路發送給
Kafka。
最後可能會被髮往分區器為消息分配分區。
Kafka生產者客戶端的整體結構是什麼樣子的?
整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender
線程(發送線程)。
在主線程中由 KafkaProducer
創建消息,然後通過可能的攔截器、序列化器和分區器的作用之後緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。
Sender 線程負責從 RecordAccumulator 中獲取消息並將其發送到 Kafka 中。
RecordAccumulator 主要用來緩存消息以便 Sender
線程可以批量發送,進而減少網路傳輸的資源消耗以提升性能。
Kafka生產者客戶端中使用了幾個線程來處理?分別是什麼?
整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender
線程(發送線程)。在主線程中由 KafkaProducer
創建消息,然後通過可能的攔截器、序列化器和分區器的作用之後緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender
線程負責從 RecordAccumulator 中獲取消息並將其發送到 Kafka 中。
Kafka的舊版Scala的消費者客戶端的設計有什麼缺陷?
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper
是一個分散式的協調服務框架,Kafka
重度依賴它實現各種各樣的協調管理。將位移保存在 ZooKeeper
外部系統的做法,最顯而易見的好處就是減少了 Kafka Broker
端的狀態保存開銷。
ZooKeeper 這類元框架其實並不適合進行頻繁的寫更新,而 Consumer Group
的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢
ZooKeeper 集群的性能
"消費組中的消費者個數如果超過topic的分區,那麼就會有消費者消費不到數據"這句話是否正確?如果正確,那麼有沒有什麼hack的手段?
一般來說如果消費者過多,出現了消費者的個數大於分區個數的情況,就會有消費者分配不到任何分區。
開發者可以繼承AbstractPartitionAssignor實現自定義消費策略,從而實現同一消費組內的任意消費者都可以消費訂閱主題的所有分區:
消費者提交消費位移時提交的是當前消費到的最新消息的offset還是offset+1?
在舊消費者客戶端中,消費位移是存儲在 ZooKeeper
中的。而在新消費者客戶端中,消費位移存儲在 Kafka
內部的主題__consumer_offsets 中。
當前消費者需要提交的消費位移是offset+1
有哪些情形會造成重覆消費?
Rebalance
一個consumer正在消費一個分區的一條消息,還沒有消費完,發生了rebalance(加入了一個consumer),從而導致這條消息沒有消費成功,rebalance後,另一個consumer又把這條消息消費一遍。
消費者端手動提交
如果先消費消息,再更新offset位置,導致消息重覆消費。
消費者端自動提交
設置offset為自動提交,關閉kafka時,如果在close之前,調用
consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重覆消費。
生產者端
生產者因為業務問題導致的宕機,在重啟之後可能數據會重發
那些情景下會造成消息漏消費?
自動提交
設置offset為自動定時提交,當offset被自動定時提交時,數據還在記憶體中未處理,此時剛好把線程kill掉,那麼offset已經提交,但是數據未處理,導致這部分記憶體中的數據丟失。
生產者發送消息
發送消息設置的是fire-and-forget(發後即忘),它只管往 Kafka
中發送消息而並不關心消息是否正確到達。不過在某些時候(比如發生不可重試異常時)會造成消息的丟失。這種發送方式的性能最高,可靠性也最差。
消費者端
先提交位移,但是消息還沒消費完就宕機了,造成了消息沒有被消費。自動位移提交同理
acks沒有設置為all
如果在broker還沒把消息同步到其他broker的時候宕機了,那麼消息將會丟失
KafkaConsumer是非線程安全的,那麼怎麼樣實現多線程消費?#
線程封閉,即為每個線程實例化一個 KafkaConsumer 對象
一個線程對應一個 KafkaConsumer
實例,我們可以稱之為消費線程。一個消費線程可以消費一個或多個分區中的消息,所有的消費線程都隸屬於同一個消費組。
消費者程式使用單或多線程獲取消息,同時創建多個消費線程執行消息處理邏輯。
獲取消息的線程可以是一個,也可以是多個,每個線程維護專屬的 KafkaConsumer
實例,處理消息則交由特定的線程池來做,從而實現消息獲取與消息處理的真正解耦。具體架構如下圖所示:
簡述消費者與消費組之間的關係
Consumer Group 下可以有一個或多個 Consumer
實例。這裡的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。
Group ID 是一個字元串,在一個 Kafka 集群中,它標識唯一的一個 Consumer
Group。
Consumer Group 下所有實例訂閱的主題的單個分區,只能分配給組內的某個
Consumer 實例消費。這個分區當然也可以被其他的 Group 消費。
當你使用kafka-topics.sh創建(刪除)了一個topic之後,Kafka背後會執行什麼邏輯?
在執行完腳本之後,Kafka 會在 log.dir 或 log.dirs
參數所配置的目錄下創建相應的主題分區,預設情況下這個目錄為/tmp/kafka-logs/。
在 ZooKeeper
的/brokers/topics/目錄下創建一個同名的實節點,該節點中記錄了該主題的分區副本分配方案。示例如下:
[zk: localhost:2181/kafka(CONNECTED) 2] get
/brokers/topics/topic-create
{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
topic的分區數可不可以增加?如果可以怎麼增加?如果不可以,那又是為什麼?
可以增加,使用 kafka-topics 腳本,結合 --alter
參數來增加某個主題的分區數,命令如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter
--topic <topic_name> --partitions <新分區數>
當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。
首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。在 Rebalance
過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。這是
Rebalance 為人詬病的一個方面。
其次,目前 Rebalance 的設計是所有 Consumer
實例共同參與,全部重新分配所有分區。其實更高效的做法是儘量減少分配方案的變動。
最後,Rebalance 實在是太慢了。
topic的分區數可不可以減少?如果可以怎麼減少?如果不可以,那又是為什麼?
不支持,因為刪除的分區中的消息不好處理。如果直接存儲到現有分區的尾部,消息的時間戳就不會遞增,如此對於
Spark、Flink
這類需要消息時間戳(事件時間)的組件將會受到影響;如果分散插入現有的分區,那麼在消息量很大的時候,內部的數據複製會占用很大的資源,而且在複製期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務性問題,以及分區和副本的狀態機切換問題都是不得不面對的。
創建topic時如何選擇合適的分區數?副本數?
分區
在 Kafka
中,性能與分區數有著必然的關係,在設定分區數時一般也需要考慮性能的因素。對不同的硬體而言,其對應的性能也會不太一樣。
可以使用Kafka 本身提供的用於生產者性能測試的 kafka-producer-
perf-test.sh 和用於消費者性能測試的
kafka-consumer-perf-test.sh來進行測試。
增加合適的分區數可以在一定程度上提升整體吞吐量,但超過對應的閾值之後吞吐量不升反降。如果應用對吞吐量有一定程度上的要求,則建議在投入生產環境之前對同款硬體資源做一個完備的吞吐量相關的測試,以找到合適的分區數閾值區間。
分區數的多少還會影響系統的可用性。如果分區數非常多,如果集群中的某個
broker 節點宕機,那麼就會有大量的分區需要同時進行 leader
角色切換,這個切換的過程會耗費一筆可觀的時間,並且在這個時間視窗內這些分區也會變得不可用。
分區數越多也會讓 Kafka
的正常啟動和關閉的耗時變得越長,與此同時,主題的分區數越多不僅會增加日誌清理的耗時,而且在被刪除時也會耗費更多的時間。
如何設置合理的分區數量
可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分區數 = Tt / max(Tp, Tc)
說明:Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關係更大, 因為Tc的值取決於你拿到消息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。
副本
Producer在發佈消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。
Kafka分配Replica的演算法如下:
將所有Broker(假設共n個Broker)和待分配的Partition排序
將第i個Partition分配到第(imod n)個Broker上
將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
如何保證kafka的數據完整性
生產者不丟數據:
- 設置 acks=all,leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機器掛掉。這是最強的可用性保證。
2.retries = max ,客戶端會在消息發送失敗時重新發送,直到發送成功為止。為0表示不重新發送。
消息隊列不丟數據:
1.replication.factor 也就是topic的副本數,必須大於1
2.min.insync.replicas 也要大於1,要求一個leader至少感知到有至少一個follower還跟自己保持聯繫
消費者不丟數據:
改為手動提交。
kafka配置參數
kafka配置參數
-
broker.id:broker的id,id是唯一的非負整數,集群的broker.id不能重覆。
-
log.dirs:kafka存放數據的路徑。可以是多個,多個使用逗號分隔即可。
-
port:server接受客戶端連接的埠,預設6667
-
zookeeper.connect:zookeeper集群連接地址。
格式如:zookeeper.connect=server01:2181,server02:2181,server03:2181。
如果需要指定zookeeper集群的路徑位置,可以:zookeeper.connect=server01:2181,server02:2181,server03:2181/kafka/cluster。這樣設置後,在啟動kafka集群前,需要在zookeeper集群創建這個路徑/kafka/cluster。 -
message.max.bytes:server可以接受的消息最大尺寸。預設1000000。
重要的是,consumer和producer有關這個屬性的設置必須同步,否則producer發佈的消息對consumer來說太大。 -
num.network.threads:server用來處理網路請求的線程數,預設3。
-
num.io.threads:server用來處理請求的I/O線程數。這個線程數至少等於磁碟的個數。
-
background.threads:用於後臺處理的線程數。例如文件的刪除。預設4。
-
queued.max.requests:在網路線程停止讀取新請求之前,可以排隊等待I/O線程處理的最大請求個數。預設500。
-
host.name:broker的hostname
如果hostname已經設置的話,broker將只會綁定到這個地址上;如果沒有設置,它將綁定到所有介面,併發布一份到ZK -
advertised.host.name:如果設置,則就作為broker
的hostname發往producer、consumers以及其他brokers -
advertised.port:此埠將給與producers、consumers、以及其他brokers,它會在建立連接時用到;
它僅在實際埠和server需要綁定的埠不一樣時才需要設置。 -
socket.send.buffer.bytes:SO_SNDBUFF 緩存大小,server進行socket
連接所用,預設100*1024。 -
socket.receive.buffer.bytes:SO_RCVBUFF緩存大小,server進行socket連接時所用。預設100
* 1024。 -
socket.request.max.bytes:server允許的最大請求尺寸;這將避免server溢出,它應該小於Java
heap size。 -
num.partitions:如果創建topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的預設數值。預設1。
-
log.segment.bytes:topic
partition的日誌存放在某個目錄下諸多文件中,這些文件將partition的日誌切分成一段一段的;這個屬性就是每個文件的最大尺寸;當尺寸達到這個數值時,就會創建新文件。此設置可以由每個topic基礎設置時進行覆蓋。預設1014*1024*1024 -
log.roll.hours:即使文件沒有到達log.segment.bytes,只要文件創建時間到達此屬性,就會創建新文件。這個設置也可以有topic層面的設置進行覆蓋。預設24*7
-
log.cleanup.policy:log清除策略。預設delete。
-
log.retention.minutes和log.retention.hours:每個日誌文件刪除之前保存的時間。預設數據保存時間對所有topic都一樣。
-
log.retention.minutes 和 log.retention.bytes
都是用來設置刪除日誌文件的,無論哪個屬性已經溢出。這個屬性設置可以在topic基本設置時進行覆蓋。 -
log.retention.bytes:每個topic下每個partition保存數據的總量。
註意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的數據總量。如果log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會造成刪除一個段文件。註意,這項設置可以由每個topic設置時進行覆蓋。 -
log.retention.check.interval.ms:檢查日誌分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。預設5min。
-
log.cleaner.enable:當這個屬性設置為false時,一旦日誌的保存時間或者大小達到上限時,就會被刪除;如果設置為true,則當保存屬性達到上限時,就會進行log
compaction。預設false。 -
log.cleaner.threads:進行日誌壓縮的線程數。預設1。
-
log.cleaner.io.max.bytes.per.second:進行log compaction時,log
cleaner可以擁有的最大I/O數目。這項設置限制了cleaner,以避免干擾活動的請求服務。 -
log.cleaner.io.buffer.size:log
cleaner清除過程中針對日誌進行索引化以及精簡化所用到的緩存大小。最好設置大點,以提供充足的記憶體。預設500*1024*1024。 -
log.cleaner.io.buffer.load.factor:進行log cleaning時所需要的I/O
chunk尺寸。你不需要更改這項設置。預設512*1024。 -
log.cleaner.io.buffer.load.factor:log
cleaning中所使用的hash表的負載因數;你不需要更改這個選項。預設0.9 -
log.cleaner.backoff.ms:進行日誌是否清理檢查的時間間隔,預設15000。
-
log.cleaner.min.cleanable.ratio:這項配置控制log
compactor試圖清理日誌的頻率(假定log compaction是打開的)。預設避免清理壓縮超過50%的日誌。這個比率綁定了備份日誌所消耗的最大空間(50%的日誌備份時壓縮率為50%)。更高的比率則意味著浪費消耗更少,也就可以更有效的清理更多的空間。這項設置在每個topic設置中可以覆蓋。 -
log.cleaner.delete.retention.ms:保存時間;保存壓縮日誌的最長時間;也是客戶端消費消息的最長時間,與log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮後的數據;會被topic創建時的指定時間覆蓋。
-
log.index.size.max.bytes:每個log
segment的最大尺寸。註意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log
segment。預設10*1024*1024。 -
log.index.interval.bytes:當執行一次fetch後,需要一定的空間掃描最近的offset,設置的越大越好,一般使用預設值就可以。預設4096。
-
log.flush.interval.messages:log文件"sync"到磁碟之前累積的消息條數。
因為磁碟IO操作是一個慢操作,但又是一個"數據可靠性"的必要手段,所以檢查是否需要固化到硬碟的時間間隔。需要在"數據可靠性"與"性能"之間做必要的權衡,如果此值過大,將會導致每次"發sync"的時間過長(IO阻塞),如果此值過小,將會導致"fsync"的時間較長(IO阻塞),導致"發sync"的次數較多,這也就意味著整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的消息丟失。 -
log.flush.scheduler.interval.ms:檢查是否需要fsync的時間間隔。預設Long.MaxValue
-
log.flush.interval.ms:僅僅通過interval來控制消息的磁碟寫入時機,是不足的,這個數用來控制"fsync"的時間間隔,如果消息量始終沒有達到固化到磁碟的消息數,但是離上次磁碟同步的時間間隔達到閾值,也將觸發磁碟同步。
-
log.delete.delay.ms:文件在索引中清除後的保留時間,一般不需要修改。預設60000。
-
auto.create.topics.enable:是否允許自動創建topic。如果是true,則produce或者fetch
不存在的topic時,會自動創建這個topic。否則需要使用命令行創建topic。預設true。 -
controller.socket.timeout.ms:partition管理控制器進行備份時,socket的超時時間。預設30000。
-
controller.message.queue.size:controller-to-broker-channles的buffer尺寸,預設Int.MaxValue。
-
default.replication.factor:預設備份份數,僅指自動創建的topics。預設1。
-
replica.lag.time.max.ms:如果一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,並認為這個follower已經掛了,預設10000。
-
replica.lag.max.messages:如果一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,並認為這個follower已經掛了,預設4000。
-
replica.socket.timeout.ms:leader
備份數據時的socket網路請求的超時時間,預設30*1000 -
replica.socket.receive.buffer.bytes:備份時向leader發送網路請求時的socket
receive buffer。預設64*1024。 -
replica.fetch.max.bytes:備份時每次fetch的最大值。預設1024*1024。
-
replica.fetch.max.bytes:leader發出備份請求時,數據到達leader的最長等待時間。預設500。
-
replica.fetch.min.bytes:備份時每次fetch之後回應的最小尺寸。預設1。
-
num.replica.fetchers:從leader備份數據的線程數。預設1。
-
replica.high.watermark.checkpoint.interval.ms:每個replica檢查是否將最高水位進行固化的頻率。預設5000.
-
fetch.purgatory.purge.interval.requests:fetch
請求清除時的清除間隔,預設1000 -
producer.purgatory.purge.interval.requests:producer請求清除時的清除間隔,預設1000
-
zookeeper.session.timeout.ms:zookeeper會話超時時間。預設6000
-
zookeeper.connection.timeout.ms:客戶端等待和zookeeper建立連接的最大時間。預設6000
-
zookeeper.sync.time.ms:zk follower落後於zk leader的最長時間。預設2000
-
controlled.shutdown.enable:是否能夠控制broker的關閉。如果能夠,broker將可以移動所有leaders到其他的broker上,在關閉之前。這減少了不可用性在關機過程中。預設true。
-
controlled.shutdown.max.retries:在執行不徹底的關機之前,可以成功執行關機的命令數。預設3.
-
controlled.shutdown.retry.backoff.ms:在關機之間的backoff時間。預設5000
-
auto.leader.rebalance.enable:如果這是true,控制者將會自動平衡brokers對於partitions的leadership。預設true。
-
leader.imbalance.per.broker.percentage:每個broker所允許的leader最大不平衡比率,預設10。
-
leader.imbalance.check.interval.seconds:檢查leader不平衡的頻率,預設300
-
offset.metadata.max.bytes:允許客戶端保存他們offsets的最大個數。預設4096
-
max.connections.per.ip:每個ip地址上每個broker可以被連接的最大數目。預設Int.MaxValue。
-
max.connections.per.ip.overrides:每個ip或者hostname預設的連接的最大覆蓋。
-
connections.max.idle.ms:空連接的超時限制,預設600000
-
log.roll.jitter.{ms,hours}:從logRollTimeMillis抽離的jitter最大數目。預設0
-
num.recovery.threads.per.data.dir:每個數據目錄用來日誌恢復的線程數目。預設1。
-
unclean.leader.election.enable:指明瞭是否能夠使不在ISR中replicas設置用來作為leader。預設true
-
delete.topic.enable:能夠刪除topic,預設false。
-
offsets.topic.num.partitions:預設50。
由於部署後更改不受支持,因此建議使用更高的設置來進行生產(例如100-200)。 -
offsets.topic.retention.minutes:存在時間超過這個時間限制的offsets都將被標記為待刪除。預設1440。
-
offsets.retention.check.interval.ms:offset管理器檢查陳舊offsets的頻率。預設600000。
-
offsets.topic.replication.factor:topic的offset的備份份數。建議設置更高的數字保證更高的可用性。預設3
-
offset.topic.segment.bytes:offsets topic的segment尺寸。預設104857600
-
offsets.load.buffer.size:這項設置與批量尺寸相關,當從offsets
segment中讀取時使用。預設5242880 -
offsets.commit.required.acks:在offset
commit可以接受之前,需要設置確認的數目,一般不需要更改。預設-1。
kafka生產者配置參數
-
boostrap.servers:用於建立與kafka集群連接的host/port組。
數據將會在所有servers上均衡載入,不管哪些server是指定用於bootstrapping。
這個列表格式:host1:port1,host2:port2,... -
acks:此配置實際上代表了數據備份的可用性。
-
acks=0:
設置為0表示producer不需要等待任何確認收到的信息。副本將立即加到socket
buffer並認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收數據,同時重試配置不會發生作用 -
acks=1:
這意味著至少要等待leader已經成功將數據寫入本地log,但是並沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。 -
acks=all:
這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。 -
buffer.memory:producer可以用來緩存數據的記憶體大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以"block.on.buffer.full"來表明。
-
compression.type:producer用於壓縮數據的壓縮類型。預設是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好。
-
retries:設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。註意,這些重試與客戶端接收到發送錯誤時的重試沒有什麼不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
-
batch.size:producer將試圖批處理消息記錄,以減少請求次數。這將改善client與server之間的性能。這項配置控制預設的批量處理消息位元組數。
-
client.id:當向server發出請求時,這個字元串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字元串,因為沒有任何功能性的目的,除了記錄和跟蹤。
-
linger.ms:producer組將會彙總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這隻有在記錄產生速度大於發送速度的時候才能發生。
-
max.request.size:請求的最大位元組數。這也是對最大記錄尺寸的有效覆蓋。註意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。
-
receive.buffer.bytes:TCP receive緩存大小,當閱讀數據時使用。
-
send.buffer.bytes:TCP send緩存大小,當發送數據時使用。
-
timeout.ms:此配置選項控制server等待來自followers的確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網路延遲。
-
block.on.buffer.full:當我們記憶體緩存用盡時,必須停止接收新消息記錄或者拋出錯誤。
預設情況下,這個設置為真,然而某些阻塞可能不值得期待,因此立即拋出錯誤更好。設置為false則會這樣:producer會拋出一個異常錯誤:BufferExhaustedException,
如果記錄已經發送同時緩存已滿。 -
metadata.fetch.timeout.ms:是指我們所獲取的一些元素據的第一個時間數據。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會拋出異常給客戶端。
-
metadata.max.age.ms:以微秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition
leadership改變。 -
metric.reporters:類的列表,用於衡量指標。實現MetricReporter介面,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於註冊JMX統計
-
metrics.num.samples:用於維護metrics的樣本數。
-
metrics.sample.window.ms:metrics系統維護可配置的樣本數量,在一個可修正的window
size。這項配置配置了視窗大小,例如。我們可能在30s的期間維護兩個樣本。當一個視窗推出後,我們會擦除並重寫最老的視窗。 -
recoonect.backoff.ms:連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反覆重連。
-
retry.backoff.ms:在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死迴圈中。
kafka消費者配置參數
-
group.id:用來唯一標識consumer進程所在組的字元串,如果設置同樣的group
id,表示這些processes都是屬於同一個consumer group。 -
zookeeper.connect:指定zookeeper的連接的字元串,格式是hostname:port,
hostname:port... -
consumer.id:不需要設置,一般自動產生
-
socket.timeout.ms:網路請求的超時限制。真實的超時限制是max.fetch.wait+socket.timeout.ms。預設3000
-
socket.receive.buffer.bytes:socket用於接收網路請求的緩存大小。預設64*1024。
-
fetch.message.max.bytes:每次fetch請求中,針對每次fetch消息的最大位元組數。預設1024*1024
這些位元組將會督導用於每個partition的記憶體中,因此,此設置將會控制consumer所使用的memory大小。
這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。 -
num.consumer.fetchers:用於fetch數據的fetcher線程數。預設1
-
auto.commit.enable:如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用。預設true。
-
auto.commit.interval.ms:consumer向zookeeper提交offset的頻率,單位是秒。預設60*1000。
-
queued.max.message.chunks:用於緩存消息的最大數目,每個chunk必須和fetch.message.max.bytes相同。預設2。
-
rebalance.max.retries:當新的consumer加入到consumer
group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目。如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗並重入。預設4 -
fetch.min.bytes:每次fetch請求時,server應該返回的最小位元組數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。
-
fetch.wait.max.ms:如果沒有足夠的數據能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。預設100
-
rebalance.backoff.ms:在重試reblance之前backoff時間。預設2000
-
refresh.leader.backoff.ms:在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間。預設200
-
auto.offset.reset:zookeeper中沒有初始化的offset時,如果offset是以下值的回應:
-
lastest:自動複位offset為lastest的offset
-
earliest:自動複位offset為earliest的offset
-
none:向consumer拋出異常
-
consumer.timeout.ms:如果沒有消息可用,即使等待特定的時間之後也沒有,則拋出超時異常
-
exclude.internal.topics:是否將內部topics的消息暴露給consumer。預設true。
-
paritition.assignment.strategy:選擇向consumer
流分配partitions的策略,可選值:range,roundrobin。預設range。 -
client.id:是用戶特定的字元串,用來在每次請求中幫助跟蹤調用。它應該可以邏輯上確認產生這個請求的應用。
-
zookeeper.session.timeout.ms:zookeeper 會話的超時限制。預設6000
如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,並且reblance將會產生 -
zookeeper.connection.timeout.ms:客戶端在建立通zookeeper連接中的最大等待時間。預設6000
-
zookeeper.sync.time.ms:ZK follower可以落後ZK leader的最大時間。預設1000
-
offsets.storage:用於存放offsets的地點:
zookeeper或者kafka。預設zookeeper。 -
offset.channel.backoff.ms:重新連接offsets
channel或者是重試失敗的offset的fetch/commit請求的backoff時間。預設1000 -
offsets.channel.socket.timeout.ms:當讀取offset的fetch/commit請求回應的socket
超時限制。此超時限制是被consumerMetadata請求用來請求offset管理。預設10000。 -
offsets.commit.max.retries:重試offset
commit的次數。這個重試只應用於offset commits在shut-down之間。預設5。 -
dual.commit.enabled:如果使用"kafka"作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka)。
在zookeeper-based的offset storage到kafka-based的offset
storage遷移時,這是必須的。對任意給定的consumer
group來說,比較安全的建議是當完成遷移之後就關閉這個選項 -
partition.assignment.strategy:在"range"和"roundrobin"策略之間選擇一種作為分配partitions給consumer
數據流的策略。
迴圈的partition分配器分配所有可用的partitions以及所有可用consumer線程。它會將partition迴圈的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的劃分是確定的分佈。
迴圈分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer
group中每個consumer實例來說都是確定的
kafka ack容錯機制(應答機制)
在Producer(生產者)向kafka集群發送消息,kafka集群會在接受完消息後,給出應答,成功或失敗,如果失敗,producer(生產者)會再次發送,直到成功為止。
producer(生產者)發送數據給kafka集群,kafka集群反饋有3種模式:
-
0:producer(生產者)不會等待kafka集群發送ack,producer(生產者)發送完消息就算成功。
-
1:producer(生產者)等待kafka集群的leader接受到消息後,發送ack。producer(生產者)接收到ack,表示消息發送成功。
-
-1:producer(生產者)等待kafka集群所有包含分區的follower都同步消息成功後,發送ack。producer(生產者)接受到ack,表示消息發送成功。
kafka segment
在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment
file消息數量不一定相等,這種特性方便old segment
file快速被刪除。預設保留7天的數據。
每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。(什麼時候創建,什麼時候刪除)
數據有序性:只有在一個partition分區內,數據才是有序的。
Segment file組成:由2大部分組成,分別為i**ndex file**和data
file,此2個文件一一對應,成對出現,尾碼".index"和".log"分別表示為segment索引文件、數據文件。(在目前最新版本,又添加了另外的約束)。
Segment文件命名規則:partion全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。
索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。
segment機制的作用:
- 可以通過索引快速找到消息所在的位置。
用於超過kafka設置的預設時間,清除比較方便。
kafka從零開始使用
這裡之前寫過一些kafka使用的文章,這裡就不在複製到此文章上面來了,以免文章內容太多太多了。
kafka安裝
文章:
kafka的可視化軟體
kafka-eagle
地址:https://github.com/smartloli/kafka-eagle
下載之後解壓
需要配置環境
Windows環境
KE_HOME = D:\\kafka_eagle\\kafka-eagle-web-1.2.3
LINUX環境
export KE_HOME=/home/jars/kafka_eagle/kafka-eagle-web-1.2.3
配置mysql,執行ke.sql
腳本,然後在D:\kafka_eagle\kafka-eagle-web-1.2.0\conf
中修改system-config.properties 配置文件
zookeeper 服務的配置地址,支持多個集群,多個用逗號隔開
kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=192.169.0.23:2181,192.169.0.24:2181,192.169.0.25:2181
cluster2.zk.list=192.169.2.156:2181,192.169.2.98:2181,192.169.2.188:2181
然後配置mysql服務的地址
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
配置完成之後,在切換到/bin目錄下,windows雙擊ke.bat ,linux輸入 ke.sh
start,啟動程式,然後在瀏覽器輸入ip:port/ke
進入登錄界面,輸入ke資料庫中的ke_users設置的用戶名和密碼,即可查看。
kafka-manager
地址:https://github.com/yahoo/kafka-manager
下載編譯
git clone https://github.com/yahoo/kafka-manager
cd kafka-manager
sbt clean distcd target/
編譯完成之後,解壓該文件
在 conf/application.properties路徑下找到 kafka-manager.zkhosts 配置,添加zookeeper的地址,如果是多個,用逗號隔開。
kafka-manager.zkhosts = master:2181,slave1:2181,slave2:2181
修改完成之後,就可以進行啟動了。
kafka-manager 預設的埠是9000,我們可以通過 -Dhttp.port來指定埠。
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8765 &
啟動成功之後,在瀏覽器輸入地址即可進行訪問了。
Kafka Tool(offset Explorer)
Offset Explorer(以前稱為Kafka Tool)是一個用於管理和使用Apache Kafka ®集群的GUI應用程式。它提供了一個直觀的用戶界面,允許人們快速查看其中的對象 一個 Kafka 集群以及存儲在集群主題中的消息。它包含面向開發人員和管理員的功能。一些主要功能包括
- 快速查看所有 Kafka 集群,包括其代理、主題和使用者
- 查看分區中的消息內容並添加新消息
- 查看消費者的偏移量,包括 Apache Storm Kafka 噴口消費者
- 以漂亮的列印格式顯示 JSON、XML 和 Avro 消息
- 添加和刪除主題以及其他管理功能
- 將分區中的單個消息保存到本地硬碟驅動器
- 編寫自己的插件,允許您查看自定義數據格式
- Offset Explorer 可在 Windows、Linux 和 Mac OS 上運行
demo代碼
文章:
代碼地址:
https://github.com/xuwujing/kafka-study
https://github.com/xuwujing/java-study/tree/master/src/main/java/com/pancm/mq/kafka
kafka生產環境問題排查和解決方案
這裡主要是記錄在使用kafka的時候遇到的一些生產環境問題和解決方案,有的可能不是問題,而是需求,有的問題解決方案按照現在來說不完美,畢竟很多時候,快速解決才是第一要素。總之這些就按照我之前的筆記記錄進行分享吧,如有更好的思路或者解決辦法,歡迎提出!
先介紹一些kafka的常用命令
kafka常用命令
官方文檔: http://kafka.apache.org/quickstart
1.啟動和關閉kafka
bin/kafka-server-start.sh config/server.properties \>\>/dev/null 2\>&1 &
bin/kafka-server-stop.sh
zookeeper啟動命令:
./zookeeper-server-start.sh -daemon
../config/zookeeper.properties
kafka啟用命令:
./kafka-server-start.sh -daemon
../config/server.properties
2.查看kafka集群中的消息隊列和具體隊列
查看集群所有的topic
kafka-topics.sh \--zookeeper master:2181,slave1:2181,slave2:2181 \--list
查看一個topic的信息
kafka-topics.sh \--zookeeper master:2181 \--describe \--topic
1004_INSERT
查看kafka consumer消費的offset
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \--zookeeper
master:2181 \--group groupB \--topic KAFKA_TEST
在kafka中查詢數據
./kafka-console-consumer.sh \--zookeeper 172.16.253.91:2181 \--topic
MO_RVOK \--from-beginning \| grep -c \'13339309600\'
3.創建Topic
partitions指定topic分區數,replication-factor指定topic每個分區的副本數
kafka-topics.sh \--zookeeper master:2181 \--create \--topic t_test
\--partitions 30 \--replication-factor 1
4.生產數據和消費數據
kafka-console-producer.sh \--broker-list master:9092 \--topic t_test
Ctrl+D 退出
kafka-console-consumer.sh \--zookeeper master:2181 \--topic t_test
\--from-beginning
kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--topic
t_test \--from-beginning
--from-beginning 是表示從頭開始消費
Ctrl+C 退出
5.kafka的刪除命令
1.kafka命令刪除
kafka-topics.sh \--delete \--zookeeper
master:2181,slave1:2181,slave2:2181 \--topic test
註:如果出現 This will have no impact if delete.topic.enable is not set
to true. 表示沒有徹底的刪除,而是把topic標記為:marked for deletion
。可以在server.properties中配置delete.topic.enable=true 來刪除。
2.進入zk刪除
zkCli.sh -server master:2181,slave1:2181,slave2:2181
找到topic所在的目錄:ls /brokers/topics
找到要刪除的topic,執行命令:rmr /brokers/topics/【topic
name】即可,此時topic被徹底刪除。
進入/admin/delete_topics目錄下,找到刪除的topic,刪除對應的信息。
6.添加分區
kafka-topics.sh \--alter \--topic INSERT_TEST1 \--zookeeper master:2181
\--partitions 15
7.查看消費組
查看所有
kafka-consumer-groups.sh \--bootstrap-server master:9092 \--list
查看某一個消費組
kafka-consumer-groups.sh \--bootstrap-server master:9092 \--describe
\--group groupT
8.查看offset的值
最小值:
kafka-run-class.sh kafka.tools.GetOffsetShell \--broker-list master:9092
-topic KAFKA_TEST \--time -2
最大值:
kafka-run-class.sh kafka.tools.GetOffsetShell \--broker-list master:9092
-topic KAFKA_TEST \--time -1
9,查看kafka日誌文件中某一個topic占用的空間
du -lh --max-depth=1 TEST*
遇到的問題
offset下標丟失問題
kafka版本:V1.0
因為某種原因,kafka集群中的topic數據有一段時間(一天左右)沒有被消費,再次消費的時候,所有的消費程式讀取的offset是從頭開始消費,產生了大量重覆數據。
問題原因:offset的過期時間惹的禍,offsets.retention.minutes這個過期時間在kafka低版本的預設配置時間是1天,如果超過1天沒有消費,那麼offset就會過期清理,因此導致數據重覆消費。在2.0之後的版本這個預設值就設置為了7天。
解決辦法:
臨時解決辦法,根據程式最近列印的日誌內容,找到最後消費的offset值,然後批量更改kafka集群的offset。
kafka 的offset偏移量更改
首先通過下麵的命令查看當前消費組的消費情況:
kafka-consumer-groups.sh \--bootstrap-server master:9092 \--group groupA
\--describe
current-offset 和 log-end-offset還有 lag
,分別為當前偏移量,結束的偏移量,落後的偏移量。
然後進行offset更改
這是一個示例,offset(所有分區)更改為100之後
kafka-consumer-groups.sh \--bootstrap-server master:9092 \--group groupA
\--topic KAFKA_TEST2 \--execute \--reset-offsets \--to-offset 100
--group 代表你的消費者分組
--topic 代表你消費的主題
--execute 代表支持複位偏移
--reset-offsets 代表要進行偏移操作
--to-offset 代表你要偏移到哪個位置,是long類型數值,只能比前面查詢出來的小
還一種臨時方案,就是更改代碼,指定kafka的分區和offset,從指定點開始消費!對應的代碼示例也在上述貼出的github鏈接中。
最終解決辦法:將offset的過期時間值(offsets.retention.minutes)設置調大。
Kafka增加節點數據重新分配
背景:為了緩解之前kafka集群服務的壓力,需要新增kafka節點,並且對數據進行重新分配。
解決方案:利用kafka自身的分區重新分配原理進行數據重新分配。需要提前將新增的kafka節點添加到zookeeper集群中,可以在zookeeper裡面通過ls /brokers/ids 查看節點名稱。
1,創建文件
創建一個topics-to-move.json的文件,文件中編輯如下參數,多個topic用逗號隔開。
{\"topics\": \[{\"topic\": \"t1\"},{\"topic\": \"t2\"}\],\"version\":1}
命令示例:
touch topics-to-move.json
vim topics-to-move.json
2,獲取建議數據遷移文本
在${kakfa}/bin 目錄下輸入如下命令,文件和命令可以放在同一級。
命令示例:
./kafka-reassign-partitions.sh \--zookeeper 192.168.124.111:2181
\--topics-to-move-json-file topics-to-move.json \--broker-list
\"111,112,113,114\" \--generate
broker-list
後面的數字就是kafka每個節點的名稱,需要填寫kafka所有集群的節點名稱。
執行完畢之後,複製Proposed partition reassignment configuration
下的文本到一個新的json文件中,命名為reality.json。
3,執行重新分配任務
執行如下命令即可。
./kafka-reassign-partitions.sh \--zookeeper 192.168.124.111:2181
\--reassignment-json-file reality.json \--execute
出現successfully表示執行成功完畢
查看執行的任務進度,輸入以下命令即可:
kafka-reassign-partitions.sh \--zookeeper ip:host,ip:host,ip:host
\--reassignment-json-file reality.json \--verify
kafka集群同步
背景:因機房問題,需要將kafka集群進行遷移,並且保證數據同步。
解決方案:使用MirrorMaker進行同步。
1.介紹
MirrorMaker是為解決Kafka跨集群同步、創建鏡像集群而存在的。下圖展示了其工作原理。該工具消費源集群消息然後將數據又一次推送到目標集群。
2.使用
這裡分為兩個kafka集群,名稱為源kafka集群和目標kafka集群,我們是要把源kafka集群的數據同步到目標kafka集群中,可以指定全部的topic或部分的topic進行同步。
其中同步的topic的名稱須一致,需提前創建好,分區數和副本可以不一致!
主要參數說明:
1\. --consumer.config:消費端相關配置文件
2\. --producer.config:生產端相關配置文件
3\. --num.streams: consumer的線程數 預設1
4\. --num.producers: producer的線程數 預設1
5\. --blacklist: 不需要同步topic的黑名單,支持Java正則表達式
6.--whitelist:需要同步topic的白名單,符合java正則表達式形式
7\. -queue.size:consumer和producer之間緩存的queue size,預設10000
在源kafka集群創建consumer.config和producer.config文件,然後配置如下信息:
consumer.config配置
bootstrap.servers=192.169.2.144:9092
group.id=MW-MirrorMaker
auto.commit.enable=true
auto.commit.interval.ms=1000
fetch.min.bytes=6553600
auto.offset.reset = earliest
max.poll.records = 1000
producer.config配置
bootstrap.servers=192.169.2.249:9092
retries = 3
acks = all
batch.size = 16384
producer.type=sync
batch.num.messages=1000
其中 consumer.config的
bootstrap.servers是源kafka集群的地址,producer.config是目標kafka的地址,可以填寫多個,用逗號隔開!
同步啟動命令示例:
nohup ../bin/kafka-mirror-maker.sh \--consumer.config consumer.config
\--num.streams 10 \--producer.config producer.config ---num.producers 10
\--whitelist \"MT_RVOK_TEST9\" \>/dev/null 2\>&1 &
可以使用jps在進程中查詢得到,查看具體同步信息可以查看kafka消費組的offset得到。
3.測試
用程式往 MT_RVOK_TEST9
先往源kafka(192.169.2.144:9092)發送10000條數據,然後啟動同步命令,查看目標kafka集群(192.169.2.249:9092),同步成功!
內外網kafka穿透(網閘)
背景:因為傳輸原因,需要kafka能夠在內外網傳輸,通過網閘。
解決方案:
1.網閘kafka內外網傳輸必要條件
1.網閘內外網可用,且網閘開放的埠和kakfa開放的埠必須一致,比如kafka預設是9092,那麼網閘開放的埠也是9092;
2.網閘開放埠,必須雙向數據同步,不能只單向傳輸,網閘和外網以及kakfa內網之間互信;
3.kafka配置需要添加額外配置參數,server.properties核心配置如下:
listeners=PLAINTEXT://kafka-cluster:9092
advertised.listeners=PLAINTEXT://kafka-cluster:9092
1.kafka服務、內網訪問服務、外網訪問服務,均需設置ip和功能變數名稱映射。linux在/etc/hosts文件中,添加ip和功能變數名稱映射關係,內網訪問,則ip為kafka內網的i