http://blog.talkingdata.net/?p=3165背景當時的現狀:開始使用Kafka的時候,使用的版本是0.7.2,當時的目的是為了替代kestrel,主要是使用Kafka來做消息傳輸中間件。Kafka解決了我們當時使用Kestrel遇到的最大的三個問題:吞吐量、數據量、一份數據...
http://blog.talkingdata.net/?p=3165
- 背景
當時的現狀:開始使用Kafka的時候,使用的版本是0.7.2,當時的目的是為了替代kestrel,主要是使用Kafka來做消息傳輸中間件。Kafka解決了我們當時使用Kestrel遇到的最大的三個問題:吞吐量、數據量、一份數據多次消費。
- 為什麼要升級
相比其它開源項目,Kafka的升級比較麻煩,其根本原因主要是作為消息傳輸中間件,涉及的系統多。既然升級麻煩,而且Kafka 0.7在這一年多來運行穩定,性能優異,那麼我們為什麼要升級呢?
其實之所以決定升級Kafka,主要有兩個原因,一是因為Kafka 0.7的scala版本是2.8,目前使用的多數開源框架(Spark、Play等)都是基於2.10的scala了,而scala 2.10和2.8兩個版本是不相容的。二,是因為之前曾發生過Kafka伺服器RAID卡損壞的故障,期待Kafka 0.8的Replication功能。
- 升級中遇到的問題及解決方案
1. 配置問題:消費者找不到Broker
2. 問題描述:消費者在消費數據的時候,連不上Broker
3. 問題原因:Broker在正常啟動之後會在zookeeper中註冊自己的信息。消費者會根據這裡面的host和port去連接broker,host是在server.properties配置host.name配置的值,這個值如果不配置,那麼在Zookeeper中存放的就是Kafka這台伺服器的主機名而不是ip,所以消費者才會連不上。而在Kafka0.7中,類似的配置叫hostname,這個值如果不配置,它會調用InetAddress.getLocalHost()去獲取,獲取的值不一定是你想要的,在我們當時它獲取恰好是Kafka伺服器的ip。
4. 解決方案(其中一種)
修改server.properties中host.name的配置,把它改成ip。
修改消費者所在機器的hosts文件,加入Kafka主機名與ip的映射。
使用DNS(推薦)
- 數據去哪了
1. 問題描述:
生產者生產10000條數據後停止,這時啟動消費者,發現消費者多不到任何東西,而且zookeeper中的offset居然和生產者生產了的offset一樣。如果這時啟動生產者繼續發送數據,消費者從第10001條數據開始讀取。之前的10000條數據都不見了。
2. 問題原因:Kafka的官方wiki的原話:
In 0.8, wehave moved to logical offsets from physical offsets. This means that theoffsets are not compatible. When you try to consume using the 0.7 offsets, youwould hit “OffsetOutOfRangeException”. The default behavior of theconsumer when this happens is based on the config value of “auto.offset.reset”.If it is set to “smallest”, the consumer will start consuming fromthe beginning. If it is set to “largest”, the consumer will startconsuming from the end.
3. 解決方案:給消費者增加auto.offset.reset配置,auto.offset.reset=smallest
- 給”消息”減減肥
1. 問題描述:生產者或消費者拋出MessageSizeTooLargeException異常
2. 問題原因:這個異常的命名還是很直白的,消息太大了,去官網找找配置就解決了,比較鬱悶的就是同樣的消息大小,在Kafka0.7沒有配置相應的參數也不報錯。
3. 解決方案:
如果是生產者報錯,修改Kafka Broker的配置,在server.properties中配置message.max.bytes,預設是1M(約)。
如果是消費者報錯,修改消費者中增加fetch.message.max.bytes的配置,這個配置的值要大於Broker的message.max.bytes配置。
- 性能問題:ACK參數配置
1. 問題描述:生產者上線後,吞吐量下降了1倍。
2. 問題原因:
首先檢查Kafka Broker,發現的不管是網路、IO、CPU等都沒有出現瓶頸,並且增加生產者線程或者生產者實例可以解決問題,假設生產者寫Kafka的速度是10000條每秒,那麼再部署一個生產者,兩者寫入速度均可以達到10000條,遂斷定問題出在生產者本身,通過jstack可以發現,線程都在做寫Kafka的操作,那麼寫Kafka究竟和0.7有什麼不一樣呢?
Kafka0.8有Replication功能,消息寫入Kafka中後,Followers會創建副本,生產者有個配置叫request.required.acks,當時配置的是1,生產者會等至少1個Followers創建完副本之後才算發送成功,平均響應時間變長,所以速度變慢。
3. 解決方案(其中一種)
增加生產者線程數或者生產者實例,系統的相應時間增加,但是系統的併發數並沒有到達上限,並且Kafka Broker可以平行擴展。設置request.required.acks=0,這樣做會有丟失數據的風險。
- Producer鎖
1. 問題描述:生產者有很多的線程狀態都是BLOCKED,導致系統性能大幅度下降。
2. 問題原因:根據源代碼可以看出,生產者發送時是有鎖的,但這個鎖每個Producer對象各自持有各自的。
3. 解決方案:對於不同線程使其持有不同的producer對象。
- 坑:文件分段大小配置有bug
1. 問題描述:
系統上線後我們遇到了文件句柄數過多的問題,如果配置的分段文件大小一樣,0.8會比0.7多4倍的文件數目,所以我們當時決定增加分段文件的大小,必須是一個Int,於是改成Int.Max,結果數據整個offset錯亂,文件損壞,集群不可用了。
2. 問題原因:
private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
if (segment.size > config.segmentSize – messagesSize ||
segment.size > 0 && time.milliseconds – segment.created > config.segmentMs – segment.rollJitterMs ||
segment.index.isFull) {
……
roll()
} else {
segment
}
}
3. 解決方案:配小一些或者升級到0.8.2
- 神一般的錯誤提示
1. 問題描述:Consumer消費時出現Iterator is in failed state的錯誤提示,錯誤量很多。
2. 問題原因:
這個錯誤並不是真正的錯誤,是因為MessageSizeTooLargeException導致的,發生MessageSizeTooLargeException異常會導致Iterator is in failed state錯誤的發生,但是MessageSizeTooLargeException只會列印一次,而那個錯誤會隨著讀取方法的調用不停的打,完全被帶跑偏了。
3. 解決方案:解決MessageSizeTooLargeException即可。