近日Kafka發佈了最新版本 2.5.0,增加了很多新功能: 下載地址:https://kafka.apache.org/downloads 2.5.0 對TLS 1.3的支持(預設為1.2) 引入用於 Kafka Streams 的 Co groups 用於 Kafka Consumer 的增量 ...
近日Kafka發佈了最新版本 2.5.0,增加了很多新功能:
下載地址:https://kafka.apache.org/downloads#2.5.0
-
對TLS 1.3的支持(預設為1.2)
-
引入用於 Kafka Streams 的 Co-groups
-
用於 Kafka Consumer 的增量 rebalance 機制
-
為更好的監控操作增加了新的指標
-
升級Zookeeper至 3.5.7
-
取消了對Scala 2.1.1的支持
下麵詳細說明本次更新:
一、新功能
1、Kafka Streams: Add Cogroup in the DSL
當多個流聚集在一起以形成單個較大的對象時(例如,購物網站可能具有購物車流,心愿單流和購買流。它們共同構成一個客戶),將其在Kafka Streams DSL中使用非常困難。
通常需要您將所有流分組並聚合到KTables,然後進行多個外部聯接調用,最後得到具有所需對象的KTable。這將為每個流和一長串ValueJoiners創建一個狀態存儲,每個新記錄都必須經過此連接才能到達最終對象。
創建使用單個狀態存儲的Cogroup 方法將:
-
減少從狀態存儲獲取的數量。對於多個聯接,當新值進入任何流時,都會發生連鎖反應,聯接處理器將繼續調用ValueGetters,直到我們訪問了所有狀態存儲。
-
性能略有提高。如上所述,所有ValueGetters都被調用,還導致所有ValueJoiners被調用,從而強制重新計算所有其他流的當前聯接值,從而影響性能。
2、Add support for TLS 1.3
Java 11添加了對TLS 1.3的支持。添加對Java 11的支持後,我們應該對此提供支持。
3、不再支持Scala 2.11
為什麼不再支持?
我們目前為3個Scala版本構建Kafka:2.11、2.12和最近發佈的2.13。由於我們必須在每個受支持的版本上編譯和運行測試,因此從開發和測試的角度來看,這是一筆不小的成本。
Scala 2.11.0於2014年4月發佈,對2.11.x的支持於2017年11月結束(到發佈Kafka 2.5時將超過2年)。Scala 2.12.0於2016年11月發佈,Scala 2.13.0於2019年6月發佈。基於此,現在該放棄對Scala 2.11的支持了,以便我們使測試矩陣易於管理(最近的kafka-trunk-jdk8占用了將近10個小時,它將使用3個Scala版本構建並運行單元測試和集成測試。此外,Scala 2.12和更高版本還改進了與Java 8功能介面的互操作性(Scala 2.12中首次引入)。更具體地說,Scala 2.12中的lambda可以與Java 8代碼相同的方式與Java 8功能介面一起使用。
在我們的下載頁面中,我們推薦自Kafka 2.1.0起使用Scala 2.12構建的Kafka二進位文件。我們切換到Scala 2.12作為Kafka 2.2.0中源tarball,構建和系統測試的預設Scala版本。
二、改進與修複
- 當輸入 topic 事務時,Kafka Streams lag 不為 0
- Kafka-streams 可配置內部 topics message.timestamp.type=CreateTime
- 將 KStream#toTable 添加到 Streams DSL
- 將 Commit/List Offsets 選項添加到 AdminClient
- 將 VoidSerde 添加到 Serdes
- 改進 Sensor Retrieval
[KAFKA-3061] 修複Guava依賴問題
[KAFKA-4203] Java生產者預設的最大消息大小不再與broker預設一致
[KAFKA-5868] kafka消費者reblance時間過長問題
三、其他版本升級至2.5.0指南
如果要從2.1.x之前的版本升級,請參閱以下註釋,以瞭解用於存儲偏移量的架構的更改。將inter.broker.protocol.version更改為最新版本後,將無法降級到2.1之前的版本。
在所有Broker上更新server.properties並添加以下屬性。CURRENT_KAFKA_VERSION指的是您要升級的版本。CURRENT_MESSAGE_FORMAT_VERSION是指當前使用的消息格式版本。如果以前覆蓋了消息格式版本,則應保留其當前值。或者,如果要從0.11.0.x之前的版本升級,則應將CURRENT_MESSAGE_FORMAT_VERSION設置為與CURRENT_KAFKA_VERSION相匹配。
- inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.10.0、0.11.0、1.0、2.0、2.2)。
- log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION
- 如果要從0.11.0.x或更高版本升級,並且尚未覆蓋消息格式,則只需要覆蓋Broker間協議版本。
- inter.broker.protocol.version = CURRENT_KAFKA_VERSION(0.11.0,1.0,1.1,2.0,2.1,2.2,2.3)。
- 一次升級一個Broker:關閉Broker,更新代碼,然後重新啟動。完成此操作後,Broker將運行最新版本,並且您可以驗證集群的行為和性能是否符合預期。如果有任何問題,此時仍可以降級。
- 驗證群集的行為和性能後,通過編輯
inter.broker.protocol.version
並將其設置為2.5來提高協議版本 。 - 逐一重新啟動Broker,以使新協議版本生效。Broker開始使用最新協議版本後,將無法再將群集降級到較舊版本。
- 如果您已按照上述說明覆蓋了消息格式版本,則需要再次滾動重啟以將其升級到最新版本。一旦所有(或大多數)使用者均已升級到0.11.0或更高版本,則在每個Broker上將log.message.format.version更改為2.5,然後逐一重新啟動它們。請註意,不再維護的較舊的Scala客戶端不支持0.11中引入的消息格式,因此,為避免轉換成本,必須使用較新的Java客戶端。
2.5.0主要的變化,可能產生的升級影響
- 當
RebalanceProtocol#COOPERATIVE
使用時,Consumer#poll
仍然可以返回數據,此外,Consumer#commitSync
現在可以拋出RebalanceInProgressException來通知用戶此類事件,CommitFailedException
並允許用戶完成正在進行的Reblance,然後重新嘗試為那些仍然擁有的分區提交偏移量。 - 為了提高典型網路環境中的彈性,預設值
zookeeper.session.timeout.ms
已從6s增加到18s,replica.lag.time.max.ms
從10s增加到30s。 cogroup()
添加了新的DSL運營商,用於一次將多個流聚合在一起。- 添加了新的
KStream.toTable()
API,可將輸入事件流轉換為KTable。 - 添加了新的Serde類型
Void
以表示輸入主題中的空鍵或空值。 - 棄用
UsePreviousTimeOnInvalidTimestamp
並替換為UsePartitionTimeOnInvalidTimeStamp
。 - 通過添加掛起的偏移防護機制和更強大的事務提交一致性檢查,改進了一次精確語義,這大大簡化了可伸縮的一次精確應用程式的實現。
- 棄用
KafkaStreams.store(String, QueryableStoreType)
並替換為KafkaStreams.store(StoreQueryParameters)
。 - 不再支持Scala 2.11。
- 軟體包中的所有Scala類
kafka.security.auth
均已棄用。請註意,在2.4.0中已棄用kafka.security.auth.Authorizer
和kafka.security.auth.SimpleAclAuthorizer
。 - 預設情況下,TLSv1和TLSv1.1已被禁用,因為它們具有已知的安全漏洞。現在預設情況下僅啟用TLSv1.2。您可以通過在配置選項
ssl.protocol
和中明確啟用它們來繼續使用TLSv1和TLSv1.1ssl.enabled.protocols
。 - ZooKeeper已升級到3.5.7,並且如果3.4數據目錄中沒有快照文件,則ZooKeeper從3.4.X升級到3.5.7可能會失敗。這通常發生在測試升級中,其中ZooKeeper 3.5.7嘗試載入沒有創建快照文件的現有3.4數據目錄。有關問題請參考:https://issues.apache.org/jira/browse/ZOOKEEPER-3056
- ZooKeeper 3.5.7版支持有或沒有客戶端證書的TLS加密的到ZooKeeper的連接,並且可以使用其他Kafka配置來利用此功能。