# 系統架構 **主題topic和分區partition** - topic Kafka中存儲數據的邏輯分類;你可以理解為資料庫中“表”的概念;比如,將app端日誌、微信小程式端日誌、業務庫訂單表數據分別放入不同的topic - partition分區(提升kafka吞吐量) topic中數據的具體 ...
系統架構
主題topic和分區partition
- topic
Kafka中存儲數據的邏輯分類;你可以理解為資料庫中“表”的概念;比如,將app端日誌、微信小程式端日誌、業務庫訂單表數據分別放入不同的topic - partition分區(提升kafka吞吐量)
topic中數據的具體管理單元; - 每個partition由一個kafka broker伺服器管理;
- 每個topic 可以劃分為多個partition,分佈到多個broker上管理;
- 每個partition都可以有多個副本;保證數據安全
分區對於 kafka 集群的好處是:實現topic數據的負載均衡。提高寫入、讀出的併發度,提高吞吐量。 - 分區副本replica
每個topic的每個partition都可以配置多個副本(replica),以提高數據的可靠性;
每個partition的所有副本中,必有一個leader副本,其他的就是follower副本(observer副本);follower定期找leader同步最新的數據;對外提供服務只有leader; - 分區follower
partition replica中的一個角色,它通過心跳通信不斷從leader中拉取、複製數據(只負責備份)。
如果leader所在節點宕機,follower中會選舉出新的leader; - 消息偏移量offset
partition內部每條消息都會被分配一個遞增id(offset);通過offset可以快速定位到消息的存儲位置;
kafka 只保證按一個partition中的消息的順序,不保證一個 topic的整體(多個partition 間)的順序。
哪一個topic的哪一個分區的哪一個偏移量,數據只能追加,不能被修改
自我推導設計:
- kafka是用來存數據的;
- 現實世界數據有分類,所以存儲系統也應有數據分類管理功能,如mysql的表;kafka有topic;
- 如一個topic的數據全部交給一臺server存儲和管理, 則讀寫吞吐量有限, 所以, 一個topic的數據應該可以分成多個部分(partition)分別交給多台server存儲和管理;
- 如一臺server宕機,這台server負責的partition將不可用,所以,一個partition應有多個副本;
- 一個partition有多個副本,則副本間的數據一致性難以保證,因此要有一個leader統領讀寫;
- 一個leader萬一掛掉,則該partition又不可用,因此還要有leader的動態選舉機制;
- 集群有哪些topic,topic有哪幾個分區,server線上情況,等等元信息和狀態信息需要在集群內部及客戶端之間共用,則引入了zookeeper;
- 客戶端在讀取數據時,往往需要知道自己所讀取到的位置,因而要引入消息偏移量維護機制;
broker伺服器
一臺 kafka伺服器就是一個broker。一個kafka集群由多個 broker 組成。
生產者producer
消息生產者,就是向kafka broker發消息的客戶端。
消費者consumer
- consumer :消費者,從kafka broker 取消息的客戶端。
- consumer group:消費組,單個或多個consumer可以組成一個消費組。消費組是用來實現消息的廣播(發給所有的 consumer)和單播(發給任意一個 consumer)的手段。
消費者可以對消費到的消息位置(消息偏移量)進行記錄;
老版本是記錄在zookeeper中;新版本是記錄在kafka中一個內置的topic中(__consumer_offsets)
數據存儲結構
在Kafka根目錄下的config/server.properties
文件中指定log.dirs=存儲日誌文件的目錄
物理存儲目錄結構: __consumer_offset
存儲目錄 名稱規範: topic名稱-分區號
生產者生產的消息會不斷追加到log文件末尾,為防止log文件過大導致數據定位效率低下,Kafka採取了分片和索引機制
- 每個partition的數據將分為多個segment存儲
- 每個segment對應兩個文件:“.index"文件和“.log"文件。index和log文件以當前segment的第一條消息的offset命名。
index索引文件中的數據為: 消息offset -> log文件中該消息的物理偏移量位置;
Kafka 中的索引文件以 稀疏索引(sparse index) 的方式構造消息的索引,它並不保證每個消息在索引文件中都有對應的索引;每當寫入一定量(由 broker 端參數 log.index.interval.bytes 指定,預設值為 4096 ,即 4KB )的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,增大或減小 log.index.interval.bytes的值,對應地可以縮小或增加索引項的密度;
查詢指定偏移量時,使用二分查找法來快速定位偏移量的位置。
消息message存儲結構
在客戶端編程代碼中,消息的封裝類有兩種:ProducerRecord、ConsumerRecord;
簡單來說,kafka中的每個massage由一對key-value構成;
Kafka中的message格式經歷了3個版本的變化了:v0 、 v1 、 v2
各個欄位的含義介紹如下:
- crc:占用4個位元組,主要用於校驗消息的內容;
- magic:這個占用1個位元組,主要用於標識日誌格式版本號,此版本的magic值為1
- attributes:占用1個位元組,這裡面存儲了消息壓縮使用的編碼以及Timestamp類型。目前Kafka 支持 gzip、snappy 以及 lz4(0.8.2引入) 三種壓縮格式;[0,1,2]三位bit表示壓縮類型。[3]位表示時間戳類型(0,create time;1,append time),[4,5,6,7]位保留;
- key length:占用4個位元組。主要標識 Key的內容的長度;
- key:占用 N個位元組,存儲的是 key 的具體內容;
- value length:占用4個位元組。主要標識 value 的內容的長度;
- value:value即是消息的真實內容,在 Kafka 中這個也叫做payload。
API開發
準備: 創建項目並添加依賴
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
<!-- 依賴下載國內鏡像庫 -->
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<layout>default</layout>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
</repository>
</repositories>
<!-- maven插件下載國內鏡像庫 -->
<pluginRepositories>
<pluginRepository>
<id>ali-plugin</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
</pluginRepository>
</pluginRepositories>
<build>
<plugins>
<!-- 指定編譯java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- 指定編譯scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- 把依賴jar中的用到的類,提取到自己的jar中 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<!--下麵是為了使用 mvn package命令,如果不加則使用mvn assembly-->
<executions>
<execution>
<id>make-assemble</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
生產者api
一個正常的生產邏輯需要具備以下幾個步驟
- 配置生產者參數及創建相應的生產者實例
- 構建待發送的消息
- 發送消息
- 關閉生產者實例
採用預設分區方式將消息散列的發送到各個分區當中
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) throws InterruptedException {
/**
* 1.構建一個kafka的客戶端
* 2.創建一些待發送的消息,構建成kafka所需要的格式
* 3.調用kafka的api去發送消息
* 4.關閉kafka生產實例
*/
//1.創建kafka的對象,配置一些kafka的配置文件
//它裡面有一個泛型k,v
//要發送數據的key
//要發送的數據value
//他有一個隱含之意,就是kafka發送的消息,是一個key,value類型的數據,但是不是必須得,其實只需要發送value的值就可以了
Properties pros = new Properties();
//指定kafka集群的地址
pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
//指定key的序列化方式
pros.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//指定value的序列化方式
pros.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//ack模式,取值有0,1,-1(all),all是最慢但最安全的 伺服器應答生產者成功的策略
pros.put("acks", "all");
//這是kafka發送數據失敗的重試次數,這個可能會造成發送數據的亂序問題
pros.setProperty("retries", "3");
//數據發送批次的大小 單位是位元組
pros.setProperty("batch.size", "10000");
//一次數據發送請求所能發送的最大數據量
pros.setProperty("max.request.size", "102400");
//消息在緩衝區保留的時間,超過設置的值就會被提交到服務端
pros.put("linger.ms", 10000);
//整個Producer用到總記憶體的大小,如果緩衝區滿了會提交數據到服務端
//buffer.memory要大於batch.size,否則會報申請記憶體不足的錯誤
pros.put("buffer.memory", 10240);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pros);
for (int i = 0; i < 1000; i++) {
//key value 0 --> doit32+-->+0
//key value 1 --> doit32+-->+1
//key value 2 --> doit32+-->+2
//2.創建一些待發送的消息,構建成kafka所需要的格式
ProducerRecord<String, String> record = new ProducerRecord<>("test01", i + "", "doit32-->" + i);
//3.調用kafka的api去發送消息
kafkaProducer.send(record);
Thread.sleep(100);
}
kafkaProducer.flush();
kafkaProducer.close();
}
}
對於properties配置的第二種寫法,相對來說不會出錯,簡單舉例:
public static void main(String[] args) {
Properties pros = new Properties();
pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
pros.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pros.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}
消費者Api
一個正常的消費邏輯需要具備以下幾個步驟:
- 配置消費者客戶端參數及創建相應的消費者實例;
- 訂閱主題topic;
- 拉取消息並消費;
- 定期向__consumer_offsets主題提交消費位移offset;
- 關閉消費者實例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
//1.創建kafka的消費者對象,附帶著把配置文件搞定
Properties props = new Properties();
//props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
//props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 定義kakfa 服務的地址,不需要將所有broker指定上
// props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
// 制定consumer group
props.put("group.id", "g3");
// 是否自動提交offset __consumer_offset 有多少分區 50
props.put("enable.auto.commit", "true");
// 自動提交offset的時間間隔 -- 這玩意設置的大小怎麼控制
props.put("auto.commit.interval.ms", "5000"); //50000 1000
// key的反序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的反序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 如果沒有消費偏移量記錄,則自動重設為起始offset:latest, earliest, none
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.訂閱主題(確定需要消費哪一個或者多個主題)
consumer.subscribe(Arrays.asList("test02"));
//3.開始從topic中獲取數據
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
for (ConsumerRecord<String, String> record : records) {
//這是數據所屬的哪一個topic
String topic = record.topic();
//該條數據的偏移量
long offset = record.offset();
//這條數據是哪一個分區的
int partition = record.partition();
//這條數據記錄的時間戳,但是這個時間戳有兩個類型
long timestamp = record.timestamp();
//上面時間戳的類型,這個類型有兩個,一個是CreateTime(這條數據創建的時間), LogAppendTime(這條數據往日誌裡面追加的時間)
TimestampType timestampType = record.timestampType();
//這個數據key的值
String key = record.key();
//這條數據value的值
String value = record.value();
//分區leader的紀元
Optional<Integer> integer = record.leaderEpoch();
//key的長度
int keySize = record.serializedKeySize();
//value的長度
int valueSize = record.serializedValueSize();
//數據的頭部信息
Headers headers = record.headers();
// for (Header header : headers) {
// String hKey = header.key();
// byte[] hValue = header.value();
// String valueString = new String(hValue);
// System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
// }
System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的紀元 = %d , key序列化的長度 = %d ,value 序列化的長度 = %d \r\n" ,
topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
}
}
//4.關閉消費者對象
// consumer.close();
}
}
subscribe訂閱主題
// subscribe有如下重載方法:
public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
// 1.指定集合方式訂閱主題
consumer.subscribe(Arrays.asList(topicl ));
// 2.正則方式訂閱主題
// 如果消費者採用的是正則表達式的方式(subscribe(Pattern))訂閱, 在之後的過程中,如果有人又創建了新的主題,並且主題名字與正表達式相匹配,那麼這個消費者就可以消費到新添加的主題中的消息。
// 如果應用程式需要消費多個主題,並且可以處理不同的類型,那麼這種訂閱方式就很有效。
// 正則表達式的方式訂閱
consumer.subscribe(Pattern.compile ("topic.*" ));
// 利用正則表達式訂閱主題,可實現動態訂閱
assign訂閱主題
消費者不僅可以通過 KafkaConsumer.subscribe() 方法訂閱主題,還可直接訂閱某些主題的指定分區;
在 KafkaConsumer 中提供了 assign() 方法來實現這些功能,此方法的具體定義如下:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
public class ConsumerDemo1 {
public static void main(String[] args) {
//1.創建kafka的消費者對象,附帶著把配置文件搞定
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"doit01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.訂閱主題(確定需要消費哪一個或者多個主題)
// consumer.subscribe(Arrays.asList("test03"));
// consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
// //我現在想手動指定,我需要從哪邊開始消費
// //如果用subscribe去訂閱主題的時候,他內部會給這個消費者組來一個自動再均衡
// consumer.seek(new TopicPartition("test03",0),2);
TopicPartition tp01 = new TopicPartition("test03", 0);
//他就是手動去訂閱主題和partition,有了這個就不需要再去訂閱subscribe主題了,手動指定以後,他的內部就不會再來自動均衡了
consumer.assign(Arrays.asList(tp01)); // 手動訂閱指定主題的指定分區的指定位置
consumer.seek(new TopicPartition("test03",0),2);
//3.開始從topic中獲取數據
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
for (ConsumerRecord<String, String> record : records) {
//這是數據所屬的哪一個topic
String topic = record.topic();
//該條數據的偏移量
long offset = record.offset();
//這條數據是哪一個分區的
int partition = record.partition();
//這條數據記錄的時間戳,但是這個時間戳有兩個類型
long timestamp = record.timestamp();
//上面時間戳的類型,這個類型有兩個,一個是CreateTime(這條數據創建的時間), LogAppendTime(這條數據往日誌裡面追加的時間)
TimestampType timestampType = record.timestampType();
//這個數據key的值
String key = record.key();
//這條數據value的值
String value = record.value();
//分區leader的紀元
Optional<Integer> integer = record.leaderEpoch();
//key的長度
int keySize = record.serializedKeySize();
//value的長度
int valueSize = record.serializedValueSize();
//數據的頭部信息
Headers headers = record.headers();
// for (Header header : headers) {
// String hKey = header.key();
// byte[] hValue = header.value();
// String valueString = new String(hValue);
// System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
// }
System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的紀元 = %d , key序列化的長度 = %d ,value 序列化的長度 = %d \r\n" ,
topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
}
}
//4.關閉消費者對象
// consumer.close();
}
}
這個方法只接受參數partitions,用來指定需要訂閱的分區集合。示例如下:
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;
subscribe與assign的區別
- 通過subscribe()方法訂閱主題具有消費者自動再均衡功能 ;
在多個消費者的情況下可以根據分區分配策略來自動分配各個消費者與分區的關係。當消費組的消費者增加或減少時,分區分配關係會自動調整,以實現消費負載均衡及故障自動轉移。
- assign() 方法訂閱分區時,是不具備消費者自動均衡的功能的;
其實這一點從assign方法參數可以看出端倪,兩種類型subscribe()都有ConsumerRebalanceListener類型參數的方法,而assign()方法卻沒有。
取消訂閱
可以使用KafkaConsumer中的unsubscribe()方法採取消主題的訂閱,這個方法既可以取消通過 subscribe( Collection)方式實現的訂閱;
也可以取消通過subscribe(Pattem)方式實現的訂閱,還可以取消通過assign( Collection)方式實現的訂閱。
consumer.unsubscribe();
// 如果將subscribe(Collection )或assign(Collection)集合參數設置為空集合,作用與unsubscribe()方法相同
// 如下示例中三行代碼的效果相同:
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());
消息的消費模式
Kafka中的消費是基於拉取模式的。
消息的消費一般有兩種模式:推送模式和拉取模式。推模式是服務端主動將消息推送給消費者,而拉模式是消費者主動向服務端發起請求來拉取消息。
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
}
- topic partition 這兩個屬性分別代表消息所屬主題的名稱和所在分區的編號。
- offset 表示消息在所屬分區的偏移量。
- timestamp 表示時間戳,與此對應的timestampType 表示時間戳的類型。
- timestampType 有兩種類型 CreateTime 和LogAppendTime ,分別代表消息創建的時間戳和消息追加到日誌的時間戳。
- headers 表示消息的頭部內容。
- key value 分別表示消息的鍵和消息的值,一般業務應用要讀取的就是value ;
- serializedKeySize、serializedValueSize分別表示key、value 經過序列化之後的大小,如果 key 為空,則 serializedKeySize 值為 -1,同樣,如果value為空,則serializedValueSize 的值也會為 -1;
- checksum 是CRC32的校驗值。
/**
* 訂閱與消費方式2
*/
TopicPartition tp1 = new TopicPartition("x", 0);
TopicPartition tp2 = new TopicPartition("y", 0);
TopicPartition tp3 = new TopicPartition("z", 0);
List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3);
consumer.assign(tps);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : tps) {
List<ConsumerRecord<String, String>> rList = records.records(tp);
for (ConsumerRecord<String, String> r : rList) {
r.topic();
r.partition();
r.offset();
r.value();
//do something to process record.
}
}
}
指定位移消費
有些時候,我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取消息,而 KafkaConsumer 中的seek()方法正好提供了這個功能,讓我們可以追前消費或回溯消費。
seek()方法的具體定義如下:
// seek都是和assign這個方法一起用 指定消費位置
public void seek(TopicPartiton partition,long offset)
代碼示例:
public class ConsumerDemo3指定偏移量消費 {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g002");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// 是否自動提交消費位移
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 限制一次poll拉取到的數據量的最大值
props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// assign方式訂閱doit27-1的兩個分區
TopicPartition tp0 = new TopicPartition("doit27-1", 0);
TopicPartition tp1 = new TopicPartition("doit27-1", 1);
consumer.assign(Arrays.asList(tp0,tp1));
// 指定分區0,從offset:800開始消費 ; 分區1,從offset:650開始消費
consumer.seek(tp0,200);
consumer.seek(tp1,250);
// 開始拉取消息
while(true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(3000));
for (ConsumerRecord<String, String> rec : poll) {
System.out.println(rec.partition()+","+rec.key()+","+rec.value()+","+rec.offset());
}
}
}
}
自動提交消費者偏移量
Kafka中預設的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable.auto.commit
配置,預設值為 true 。當然這個預設的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數 auto.commit.interval.ms
配置,預設值為5秒,此參數生效的前提是 enable. auto.commit
參數為 true。
在預設的方式下,消費者每隔5秒會將拉取到的每個分區中最大的消息位移進行提交。自動位移提交的動作是在 poll() 方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。
Kafka 消費的編程邏輯中位移提交是一大難點,自動提交消費位移的方式非常簡便,它免去了複雜的位移提交邏輯,讓編碼更簡潔。但隨之而來的是重覆消費和消息丟失的問題。
- 重覆消費
假設剛剛提交完一次消費位移,然後拉取一批消息進行消費,在下一次自動提交消費位移之前,消費者崩潰了,那麼又得從上一次位移提交的地方重新開始消費,這樣便發生了重覆消費的現象(對於再均衡的情況同樣適用)。我們可以通過減小位移提交的時間間隔來減小重覆消息的視窗大小,但這樣並不能避免重覆消費的發送,而且也會使位移提交更加頻繁。
- 丟失消息
按照一般思維邏輯而言,自動提交是延時提交,重覆消費可以理解,那麼消息丟失又是在什麼情形下會發生的呢?
拉取線程不斷地拉取消息並存入本地緩存,比如在BlockingQueue 中,另一個處理線程從緩存中讀取消息併進行相應的邏輯處理。設目前進行到了第 y+l 次拉取,以及第m次位移提交的時候,也就是 x+6 之前的位移己經確認提交了,處理線程卻還正在處理x+3的消息;此時如果處理線程發生了異常,待其恢復之後會從第m次位移提交處,也就是 x+6 的位置開始拉取消息,那麼 x+3至x+6 之間的消息就沒有得到相應的處理,這樣便發生消息丟失的現象。
手動提交消費者偏移量(調用kafka api)
自動位移提交的方式在正常情況下不會發生消息丟失或重覆消費的現象,但是在編程的世界里異常無可避免;同時,自動位移提交也無法做到精確的位移管理。在 Kafka中還提供了手動位移提交的方式,這樣可以使得開發人員對消費位移的管理控制更加靈活。
很多時候並不是說拉取到消息就算消費完成,而是需要將消息寫入資料庫、寫入本地緩存,或者是更加複雜的業務處理。在這些場景下,所有的業務處理完成才能認為消息被成功消費;
手動的提交方式可以讓開發人員根據程式的邏輯在合適的時機進行位移提交。開啟手動提交功能的前提是消費者客戶端參數 enable.auto.commit
配置為false ,示例如下:
props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false);
手動提交可以細分為同步提交和非同步提交,對應於 KafkaConsumer 中的 commitSync()和
commitAsync()兩種類型的方法。
同步提交的方式
commitSync()方法的定義如下:
/**
* 手動提交offset
*/
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
//do something to process record.
}
consumer.commitSync();
}
對於採用 commitSync()的無參方法,它提交消費位移的頻率和拉取批次消息、處理批次消息的頻率是一樣的,如果想尋求更細粒度的、更精準的提交,那麼就需要使用commitSync()的另一個有參方法,具體定義如下:
public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)
示例代碼如下:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
long offset = r.offset();
//do something to process record.
TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1)));
}
}
提交的偏移量 = 消費完的record的偏移量 + 1
因為,__consumer_offsets中記錄的消費偏移量,代表的是,消費者下一次要讀取的位置!!!
非同步提交方式
非同步提交的方式( commitAsync())在執行的時候消費者線程不會被阻塞;可能在提交消費位移的結果還未返回之前就開始了新一次的拉取。非同步提交可以讓消費者的性能得到一定的增強。 commitAsync方法有一個不同的重載方法,具體定義如下
/**
* 非同步提交offset
*/
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
long offset = r.offset();
//do something to process record.
TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1)));
consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e == null ){
System.out.println(map);
}else{
System.out.println("error commit offset");
}
}
});
}
}
手動提交位移(時機的選擇)
- 數據處理完成之前先提交偏移量
可能會發生漏處理的現象(數據丟失)反過來說,這種方式實現了: at most once的數據處理(傳遞)語義
- 數據處理完成之後再提交偏移量
可能會發生重覆處理的現象(數據重覆)反過來說,這種方式實現了: at least once的數據處理(傳遞)語義當然,數據處理(傳遞)的理想語義是: exactly once(精確一次)Kafka也能做到exactly once(基於kafka的事務機制)
代碼示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
public class CommitOffsetByMyself {
public static void main(String[] args) throws SQLException {
//獲取mysql的連接對象
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/football", "root", "123456");
connection.setAutoCommit(false);
PreparedStatement pps = connection.prepareStatement("insert into user values (?,?,?)");
PreparedStatement pps_offset = connection.prepareStatement("insert into offset values (?,?) on duplicate key update offset = ?");
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//設置手動提交偏移量參數,需要將自動提交給關掉
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//設置從哪裡開始消費
// props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//設置組id
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group001");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//訂閱主題
consumer.subscribe(Arrays.asList("kafka2mysql"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for (TopicPartition topicPartition : collection) {
try {
PreparedStatement get_offset = connection.prepareStatement("select offset from offset where topic_partition = ?");
String topic = topicPartition.topic();
int partition = topicPartition.partition();
get_offset.setString(1, topic + "_" + partition);
ResultSet resultSet = get_offset.executeQuery();
if (resultSet.next()){
int offset = resultSet.getInt(1);
System.out.println("發生了再均衡,被分配了分區消費權,並且查到了目標分區的偏移量"+partition+" , "+offset);
//拿到了offset後就可以定位消費了
consumer.seek(new TopicPartition(topic, partition), offset);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
});
//拉去數據後寫入到mysql
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
for (ConsumerRecord<String, String> record : records) {
String data = record.value();
String[] arr = data.split(",");
String id = arr[0];
String name = arr[1];
String age = arr[2];
pps.setInt(1, Integer.parseInt(id));
pps.setString(2, name);
pps.setInt(3, Integer.parseInt(age));
pps.execute();
//埋個異常,看看是不是真的是這樣
// if (Integer.parseInt(id) == 5) {
// throw new SQLException();
// }
long offset = record.offset();
int partition = record.partition();
String topic = record.topic();
pps_offset.setString(1, topic + "_" + partition);
pps_offset.setInt(2, (int) offset + 1);
pps_offset.setInt(3, (int) offset + 1);
pps_offset.execute();
//提交jdbc事務
connection.commit();
}
} catch (Exception e) {
connection.rollback();
}
}
}
}
消費者提交偏移量方式的總結
consumer的消費位移提交方式:
- 全自動
- auto.offset.commit = true
- 定時提交到consumer_offsets
- 半自動
- auto.offset.commit = false;
- 然後手動觸發提交 consumer.commitSync();
- 提交到consumer_offsets
- 全手動
- auto.offset.commit = false;
- 寫自己的代碼去把消費位移保存到你自己的地方mysql/zk/redis/
- 提交到自己所涉及的存儲;初始化時也需要自己去從自定義存儲中查詢到消費位移