1. Use the IDEA new-project wizard to create the message producer project, springboot-kafka-producer.
The project's POM file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.miniooc</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-kafka-producer</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Add the gson dependency -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- Add the lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
The two dependencies marked with comments, gson and lombok, were added by hand.
2. Create the message entity class
package com.miniooc.kafka.message;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

@Data
public class OrderBasic implements Serializable {

    /**
     * Order ID
     */
    private String orderId;
    /**
     * Order number
     */
    private String orderNumber;
    /**
     * Order date
     */
    private Date date;
    /**
     * Order details
     */
    private List<String> desc;

}
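For readers new to Lombok, the sketch below shows roughly what @Data saves writing by hand: getters, setters, equals, hashCode and toString. It is trimmed to two of the four fields. The class name OrderBasicPlain is hypothetical and exists only for this illustration, and the code Lombok actually generates differs in detail.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical hand-written counterpart of a @Data class, trimmed to two fields.
class OrderBasicPlain implements Serializable {

    private String orderId;
    private String orderNumber;

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getOrderNumber() { return orderNumber; }
    public void setOrderNumber(String orderNumber) { this.orderNumber = orderNumber; }

    // Two instances are equal when all of their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OrderBasicPlain)) return false;
        OrderBasicPlain other = (OrderBasicPlain) o;
        return Objects.equals(orderId, other.orderId)
                && Objects.equals(orderNumber, other.orderNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(orderId, orderNumber);
    }

    @Override
    public String toString() {
        return "OrderBasicPlain(orderId=" + orderId + ", orderNumber=" + orderNumber + ")";
    }
}
```

With @Data on the class, none of these methods need to appear in the source file, which is why OrderBasic contains only fields.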
3. Create the message producer class
package com.miniooc.kafka.producer;

import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Kafka message producer
 */
@Log
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name, read from kafka.topic.order in application.properties
    @Value("${kafka.topic.order}")
    private String topicOrder;

    /**
     * Sends an order message
     *
     * @param orderBasic order information
     */
    public void sendOrderMessage(OrderBasic orderBasic) {
        // Serialize the order to pretty-printed JSON with a fixed date format
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = builder.create().toJson(orderBasic);
        // send() is asynchronous and returns a future; the log line records what was handed to the template
        kafkaTemplate.send(topicOrder, message);
        log.info("\n" + message);
    }
}
4. Edit the resource configuration file application.properties
server.port=9526
spring.application.name=kafka-producer
kafka.bootstrap.servers=localhost:9092
kafka.topic.order=topic-order
kafka.group.id=group-order
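Note that kafka.bootstrap.servers and kafka.group.id are custom key names. They are not the spring.kafka.* keys that Spring Boot's Kafka auto-configuration binds (spring.kafka.bootstrap-servers and spring.kafka.consumer.group-id), so some configuration code has to read them, and this tutorial does not show that code. As a minimal sketch, assuming the file is loaded as plain java.util.Properties, the custom keys could be mapped onto the Kafka client's own setting names (bootstrap.servers, group.id) like this. KafkaSettings and toClientConfig are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: maps the tutorial's custom keys onto Kafka client setting names.
final class KafkaSettings {

    static Properties toClientConfig(Properties app) {
        Properties client = new Properties();
        // Fall back to the broker address used in this tutorial if the key is absent.
        client.setProperty("bootstrap.servers",
                app.getProperty("kafka.bootstrap.servers", "localhost:9092"));
        // group.id only matters on the consumer side, so it is copied only when present.
        String groupId = app.getProperty("kafka.group.id");
        if (groupId != null) {
            client.setProperty("group.id", groupId);
        }
        return client;
    }

    public static void main(String[] args) throws IOException {
        Properties app = new Properties();
        app.load(new StringReader(
                "kafka.bootstrap.servers=localhost:9092\n"
              + "kafka.topic.order=topic-order\n"
              + "kafka.group.id=group-order\n"));
        System.out.println(toClientConfig(app));
    }
}
```

In a Spring project the same values would more likely be injected with @Value into a configuration class, or the keys renamed to their spring.kafka.* equivalents so that auto-configuration picks them up.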
5. Start ZooKeeper
D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
6. Start Kafka
D:\kafka>bin\windows\kafka-server-start.bat config\server.properties
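Before running the project, it can help to confirm that both servers are accepting connections. A minimal sketch, assuming ZooKeeper listens on its default client port 2181 (the clientPort value in the stock config\zookeeper.properties) and Kafka on localhost:9092 as set in application.properties. PortCheck and isListening are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

A successful connection only shows that the port is open. It does not prove that the broker has finished starting or registered with ZooKeeper.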
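7. Run the project and call the message producer class through a controller to send a message to Kafka.
When the console shows the content in the red box (the JSON message logged by KafkaProducer), the message was sent successfully.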
8. Use the IDEA new-project wizard again to create the message consumer project, springboot-kafka-consumer.
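9. Create the message consumer class and have it listen to the topic. The listener imports com.miniooc.kafka.message.OrderBasic and uses gson and lombok, so the consumer project needs the same entity class and dependencies as the producer.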
package com.miniooc.kafka.consumer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Log
@Component
public class KafkaConsumer {

    // Listens on the topic named by kafka.topic.order in application.properties
    @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload String message) {
        // Use the same date format as the producer so the date field parses correctly
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        Gson gson = builder.create();
        // OrderBasic is not generic, so the class literal is enough and no TypeToken is needed
        OrderBasic orderBasic = gson.fromJson(message, OrderBasic.class);
        String json = gson.toJson(orderBasic);
        log.info("\nReceived and consumed message\n" + json);
    }
}
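Both the producer and the consumer call setDateFormat with the same pattern, which Gson interprets as a java.text.SimpleDateFormat pattern. As a small sketch of what that pattern does to the date field, the code below uses SimpleDateFormat directly rather than Gson: a value is written with second precision and read back from the same text. DatePatternDemo, write and read are hypothetical names.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

final class DatePatternDemo {

    // The pattern passed to setDateFormat on both sides of the tutorial.
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Formats a date the way the producer side would write it.
    static String write(Date date) {
        return new SimpleDateFormat(PATTERN).format(date);
    }

    // Parses text the way the consumer side would read it.
    static Date read(String text) throws ParseException {
        return new SimpleDateFormat(PATTERN).parse(text);
    }

    public static void main(String[] args) throws ParseException {
        Date now = new Date();
        String text = write(now);
        Date back = read(text);
        System.out.println(text);
        // Milliseconds are not part of the pattern, so they are lost in the round trip.
        System.out.println((now.getTime() - back.getTime()) + " ms dropped");
    }
}
```

If the two projects ever use different patterns, the consumer's fromJson call will fail on the date field. The pattern also carries no time zone, so each JVM interprets the text in its own default zone.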
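10. Run the project.
When the console shows the content in the red box (the log entry written by KafkaConsumer, followed by the JSON message), the message was consumed successfully.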
The Spring Boot and Kafka integration is complete!