背景 近年來隨著國際形勢的變化,信創產業成為我國國家戰略的一部分。一直以來,一直以來,全球 ICT 產業底層標準、架構、產品、生態等要素均由國外公司或機構制定和控制,使我國 ICT 產業乃至廣大用戶面臨被卡脖子、數據泄露、信息安全等諸多風險,尤其是 2018年以來,中興、華為等公司的遭遇成為鮮活的實 ...
背景
某些時候,kafka上游生產者生產的消息有錯誤,或者下游消費者並不需要消費某部分的數據,這時候,通常有兩個解決方案,一種是對數據做不解析處理,直接略過。另一種就是暫時關掉kafka的消費者組,等到生產者正常後再進行消費,但由於kafka本身是預設斷點續傳的,此時就需要我們先重置kafka中當前kafka組的offset。
解決方案
更改消費者組
由於kafka對某topic中offset的管理是以組的形式來進行的,因此,在新建或更改消費者組後,對於offset的管理也會重新開始,策略取決於配置的auto.offset.reset參數
在重啟動時指定起始offset
在再次啟動時,通過配置指定要消費topic中分區的offset
@KafkaListener(groupId = "topic_group_test",topicPartitions = { @TopicPartition(topic = "topic_test",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "9830")) })
java springboot版本
通過kafka服務端腳本指定重置
kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 \ --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
具體支持8種操作
--to-earliest
--to-latest
--to-current
--to-offset
--shift-by N: 把位移調整到當前位移+N 處,N可以為負數
--to-datetime
--by-duration
--from-file
通過API代碼來指定
consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo-> new TopicPartition(topic,partitionInfo.partition())) .collect(Collectors.toList()) );
void seek(TopicPartition partition, long offset);
Void seek(TopicPartition partition,OffsetAndMetadata offsetAndMetadata);
Void seekToBeginning(Collection
Void seekToEnd(Collection
註意
以上所有操作都需要在消費者組處於未激活的情況下進行
使用代碼方式時,需要指定所有分區的消費策略