SpringBoot-Learning系列之Kafka整合 本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。 消息系統 主要應用場景 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦) 非同步處理、順序處理 實時數據 ...
SpringBoot-Learning系列之Kafka整合
本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。
-
消息系統
- 主要應用場景
- 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦)
- 非同步處理、順序處理
- 實時數據傳輸管道
- 異構語言架構系統之間的通信
- 如 C語言的CS客戶端的HIS系統與java語言開發的互聯網線上診療系統的交互
- 主要應用場景
-
Kafka是什麼
kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。是java領域常用的消息隊列。
核心概念:
- 生產者(Producer) 生產者應用向主題隊列中投送消息數據
- 消費者 (Consumer) 消費者應用從訂閱的Kafka的主題隊列中獲取數據、處理數據等後續操作
- 主題 (Topic) 可以理解為生產者與消費者交互的橋梁
- 分區 (Partition) 預設一個主題有一個分區,用戶可以設置多個分區。每個分區可以有多個副本(Replica)。分區的作用是,將數據劃分為多個小塊,提高併發性和可擴展性。每個分區都有一個唯一的標識符,稱為分區號。消息按照鍵(key)來進行分區,相同鍵的消息會被分配到同一個分區中。分區可以有不同的消費者同時消費。副本的作用是提供數據的冗餘和故障恢復。每個分區可以有多個副本,其中一個被稱為領導者(leader),其他副本被稱為追隨者(follower)。領導者負責處理讀寫請求,而追隨者只負責複製領導者的數據。如果領導者宕機或不可用,某個追隨者會被選舉為新的領導者,保證數據的可用性。
-
windows 安裝kafka
本地環境DockerDeskTop+WSL2,基於Docker方式安裝Kafka
2.8.0後不需要依賴zk了
-
拉取鏡像
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka
-
創建網路
docker network create kafka-net --driver bridge
-
安裝zk
docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
-
安裝kafka
docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:latest
-
測試
telnet localhost:9092
-
-
SpringBoot集成
SpringBoot3.1.0+jdk17
-
pom依賴
``` <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>io.github.vino42</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>17</java.version> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <!--排除掉 自行添加最新的官方clients依賴--> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.21</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>3.1.0</version> </plugin> </plugins> </build> </project> ```
-
配置
spring: kafka: bootstrap-servers: 172.31.192.1:9092 producer: retries: 0 # 每次批量發送消息的數量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false # MANUAL poll()拉取一批消息,處理完業務後,手動調用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交 # MANUAL_IMMEDIATE 每處理完業務手動調用Acknowledgment.acknowledge()後立即提交 # RECORD 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交 # BATCH 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交 # TIME 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交 # COUNT 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交 # COUNT_TIME TIME或COUNT滿足其中一個時提交 ack-mode: manual_immediate consumer: group-id: test # 是否自動提交 enable-auto-commit: false max-poll-records: 100 # 用於指定消費者在啟動時、重置消費偏移量時的行為。 # earliest:消費者會將消費偏移量重置為最早的可用偏移量,也就是從最早的消息開始消費。 # latest:消費者會將消費偏移量重置為最新的可用偏移量,也就是只消費最新發送的消息。 # none:如果找不到已保存的消費偏移量,消費者會拋出一個異常 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息體的編解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000 server: port: 8888spring: kafka: bootstrap-servers: 172.31.192.1:9092 producer: retries: 0 # 每次批量發送消息的數量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false ack-mode: manual_immediate consumer: group-id: test enable-auto-commit: false max-poll-records: 100 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息體的編解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000
-
生產者代碼示例
package io.github.vino42.publiser; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * ===================================================================================== * * @Created : 2023/8/30 21:29 * @Compiler : jdk 17 * @Author : VINO * @Copyright : VINO * @Decription : kafak 消息生產者 * ===================================================================================== */ @Component public class KafkaPublishService { @Autowired KafkaTemplate kafkaTemplate; /** * 這裡為了簡單 直接發送json字元串 * * @param json */ public void send(String topic, String json) { kafkaTemplate.send(topic, json); } }
@RequestMapping("/send") public String send() { IntStream.range(0, 10000).forEach(d -> { kafkaPublishService.send("test", RandomUtil.randomString(16)); }); return "ok"; }
-
消費者
@Component @Slf4j public class CustomKafkaListener { @org.springframework.kafka.annotation.KafkaListener(topics = "test") public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) { try { String key = String.valueOf(record.key()); String body = record.value(); log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body); log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body); } catch (Exception e) { e.printStackTrace(); } finally { //手動ack acknowledgment.acknowledge(); } } }
-
SpringBoot Learning系列 是筆者總結整理的一個SpringBoot學習集合。可以說算是一個SpringBoot學習的大集合。歡迎Star關註。謝謝觀看。
關註公眾號不迷路