前面章節中的例子,用來作為單個節點的伺服器示例是足夠的,但是如果想要把它應用到生產環境,就遠遠不夠了。在Kafka中有很多參數可以控制它的運行和工作。大部分的選項都可以忽略直接使用預設值就好,遇到一些特殊的情況你可以再考慮使用它們。 本文翻譯自《Kafka權威指南》 Broker的一般配置 有很多參 ...
前面章節中的例子,用來作為單個節點的伺服器示例是足夠的,但是如果想要把它應用到生產環境,就遠遠不夠了。在Kafka中有很多參數可以控制它的運行和工作。大部分的選項都可以忽略直接使用預設值就好,遇到一些特殊的情況你可以再考慮使用它們。
本文翻譯自《Kafka權威指南》
Broker的一般配置
有很多參數在部署集群模式時需要引起重視,這些參數都是broker最基本的配置,很多參數都需要依據集群的broker情況而變化。
broker.id
每個kafka的broker都需要有一個整型的唯一標識,這個標識通過broker.id來設置。預設的情況下,這個數字是0,但是它可以設置成任何值。需要註意的是,需要保證集群中這個id是唯一的。這個值是可以任意填寫的,並且可以在必要的時候從broker集群中刪除。比較好的做法是使用主機名相關的標識來做為id,比如,你的主機名當中有數字相關的信息,如hosts1.example.com,host2.example.com,那麼這個數字就可以用來作為broker.id的值。
port
預設啟動kafka時,監聽的是TCP的9092埠,埠號可以被任意修改。如果埠號設置為小於1024,那麼kafka需要以root身份啟動。但是並不推薦以root身份啟動。
zookeeper.connect
這個參數指定了Zookeeper所在的地址,它存儲了broker的元信息。在前一章節的例子 中,Zookeeper是運行在本機的2181埠上,因此這個值被設置成localhost:2181。這個值可以通過分號設置多個值,每個值的格式都是hostname:port/path,其中每個部分的含義如下:
- hostname是zookeeper伺服器的主機名或者ip地址
- port是伺服器監聽連接的埠號
- /path是kafka在zookeeper上的根目錄。如果預設,會使用根目錄。
如果設置了chroot,但是它又不存在,那麼broker會在啟動的時候直接創建。
PS:為什麼使用Chroot路徑
一個普遍認同的最佳實踐就是kafka集群使用chroot路徑,這樣zookeeper可以與其他應用共用使用,而不會有任何衝突。指定多個zookeeper伺服器的地址也是比較好的做法,這樣當zookeeper集群中有節點失敗的時候,還可以正常連接其他的節點。
log.dirs
這個參數用於配置Kafka保存數據的位置,Kafka中所有的消息都會存在這個目錄下。可以通過逗號來指定多個目錄,kafka會根據最少被使用的原則選擇目錄分配新的parition。註意kafka在分配parition的時候選擇的規則不是按照磁碟的空間大小來定的,而是分配的parition的個數多小。
num.recovery.thread.per.data.dir
kafka可以配置一個線程池,線程池的使用場景如下:
- 當正常啟動的時候,開啟每個parition的文檔塊segment
- 當失敗後重啟時,檢查parition的文檔塊
- 當關閉kafka的時候,清除關閉文檔塊
預設,每個目錄只有一個線程。最好是設置多個線程數,這樣在伺服器啟動或者關閉的時候,都可以並行的進行操作。尤其是當非正常停機後,重啟時,如果有大量的分區數,那麼啟動broker將會花費大量的時間。註意,這個參數是針對每個目錄的。比如,num.recovery.threads.per.data.dir設置為8,如果有3個log.dirs路徑,那麼一共會有24個線程。
auto.create.topics.enable
在下麵場景中,按照預設的配置,如果還沒有創建topic,kafka會在broker上自動創建topic:
- 當producer向一個topic中寫入消息時
- 當cosumer開始從某個topic中讀取數據時
- 當任何的客戶端請求某個topic的信息時
在很多場景下,這都會引發莫名其妙的問題。尤其是沒有什麼辦法判斷某個topic是否存在,因為任何請求都會創建該topic。如果你想嚴格的控制topic的創建,那麼可以設置auto.create.topics.enable為false。
預設的主題配置Topic Defaults
kafka集群在創建topic的時候會設置一些預設的配置,這些參數包括分區的個數、消息的容錯機制,這些信息都可以通過管理員工具以topic為單位進行配置。kafka為我們提供的預設配置,基本也能滿足大多數的應用場景了。
PS:使用per.topic進行參數的覆蓋
在之前的版本中,可以通過log.retention.hours.per.topic,log.retention.bytes.per.topic, log.segment.bytes.per.topic等覆蓋預設的配置。現在的版本不能這麼用了,必須通過管理員工具進行設置。
num.partitions
這個參數用於配置新創建的topic有多少個分區,預設是1個。註意partition的個數只可以被增加,不能被減少。這就意味著如果想要減少主題的分區數,那麼就需要重新創建topic。
在第一章中介紹過,kafka通過分區來對topic進行擴展,因此需要使用分區的個數來做負載均衡,如果新增了broker,那麼就會引發重新負載分配。這並不意味著所有的主題的分區數都需要大於broker的數量,因為kafka是支持多個主題的,其他的主題會使用其餘的broker。需要註意的是,如果消息的吞吐量很高,那麼可以通過設置一個比較大的分區數,來分攤壓力。
log.retention.ms
這個參數用於配置kafka中消息保存的時間,也可以使用log.retention.hours,預設這個參數是168個小時,即一周。另外,還支持log.retention.minutes和log.retention.ms。這三個參數都會控制刪除過期數據的時間,推薦還是使用log.retention.ms。如果多個同時設置,那麼會選擇最小的那個。
PS:過期時間和最後修改時間
過期時間是通過每個log文件的最後修改時間來定的。在正常的集群操作中,這個時間其實就是log段文件關閉的時間,它代表了最後一條消息進入這個文件的時間。然而,如果通過管理員工具,在brokers之間移動了分區,那麼這個時候會被刷新,就不准確了。這就會導致本該過期刪除的文件,被繼續保留了。
log.retention.bytes
這個參數也是用來配置消息過期的,它會應用到每個分區,比如,你有一個主題,有8個分區,並且設置了log.retention.bytes為1G,那麼這個主題總共可以保留8G的數據。註意,所有的過期配置都會應用到patition粒度,而不是主題粒度。這也意味著,如果增加了主題的分區數,那麼主題所能保留的數據也就隨之增加了。
PS:通過大小和時間配置數據過期
如果設置了log.retention.bytes和log.retention.ms(或者其他過期時間的配置),只要滿足一個條件,消息就會被刪除。比如,設置log.retention.ms是86400000(一天),並且log.retention.bytes是1000000000(1G),那麼只要修改時間大於一天或者數據量大於1G,這部分數據都會被刪除。
log.segment.bytes
這個參數用來控制log段文件的大小,而不是消息的大小。在kafka中,所有的消息都會進入broker,然後以追加的方式追加到分區當前最新的segment段文件中。一旦這個段文件到達了log.segment.bytes設置的大小,比如預設的1G,這個段文件就會被關閉,然後創建一個新的。一旦這個文件被關閉,就可以理解成這個文件已經過期了。這個參數設置的越小,那麼關閉文件創建文件的操作就會越頻繁,這樣也會造成大量的磁碟讀寫的開銷。
通過生產者發送過來的消息的情況可以判斷這個值的大小。比如,主題每天接收100M的消息,並且log.segment.bytes為預設設置,那麼10天後,這個段文件才會被填滿。由於段文件在沒有關閉的時候,是不能刪除的,log.retention.ms又是預設的設置,那麼這個消息將會在17天後,才過期刪除。因為10天後,段文件才關閉。再過7天,這個文件才算真正過期,才能被清除。
PS:根據時間戳追蹤offset
段文件的大小也會影響到消息消費offset的操作,因為讀取某一時間的offset時,kafka會尋找關閉時間晚於offset時間的那個段文件。然後返回offset所在的段文件的第一個消息的offset,然後按照偏移值查詢目標的消息。因此越小的段文件,通過時間戳消費offset的時候就會越精確。
log.segment.ms
這個參數也可以控制段文件關閉的時間,它定義了經過多長時間段文件會被關閉。跟log.retention.bytes和log.retention.ms類似,log.segment.ms和log.segment.bytes也不是互斥的。kafka會在任何一個條件滿足時,關閉段文件。預設情況下,是不會設置Log.segment.ms的,也就意味著只會通過段文件的大小來關閉文件。
PS:基於時間關閉段文件的磁碟性能需求
當時使用基於時間的段文件限制,對磁碟的要求會很高。這是因為,一般情況下如果段文件大小這個條件不滿足,會按照時間限制來關閉文件,此時如果分區數很多,主題很多,將會有大量的段文件同時關閉,同時創建。
message.max.bytes
這個參數用於限制生產者消息的大小,預設是1000000,也就是1M。生產者在發送消息給broker的時候,如果出錯,會嘗試重發;但是如果是因為大小的原因,那生產者是不會重發的。另外,broker上的消息可以進行壓縮,這個參數可以使壓縮後的大小,這樣能多存儲很多消息。
需要註意的是,允許發送更大的消息會對性能有很大影響。更大的消息,就意味著broker在處理網路連接的時候需要更長的時間,它也會增加磁碟的寫操作壓力,影響IO吞吐量。
PS:配合消息大小的設置
消息大小的參數message.max.bytes一般都和消費者的參數fetch.message.max.bytes搭配使用。如果fetch.message.max.bytes小於message.max.bytes,那麼當消費者遇到很大的消息時,將會無法消費這些消息。同理,在配置cluster時replica.fetch.max.bytes也是一樣的道理。