SpringBoot Kafka 整合 實例 源碼

来源:https://www.cnblogs.com/songlu/archive/2018/10/31/9885892.html
-Advertisement-
Play Games

1、使用IDEA新建工程引導方式,創建消息生產工程 springboot-kafka-producer。 工程POM文件代碼如下: 註釋部分為手動添加的 gson、lombok 依賴。 2、創建消息實體類 3、創建消息生產類 4、編輯資源配置文件 application.properties 5、啟 ...


 

1、使用IDEA新建工程引導方式,創建消息生產工程 springboot-kafka-producer。

工程POM文件代碼如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5 
 6     <groupId>com.miniooc</groupId>
 7     <artifactId>springboot-kafka-producer</artifactId>
 8     <version>1.0.0-SNAPSHOT</version>
 9     <packaging>jar</packaging>
10 
11     <name>springboot-kafka-producer</name>
12     <description>Demo project for Spring Boot</description>
13 
14     <parent>
15         <groupId>org.springframework.boot</groupId>
16         <artifactId>spring-boot-starter-parent</artifactId>
17         <version>2.0.3.RELEASE</version>
18         <relativePath/>
19     </parent>
20 
21     <properties>
22         <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
23     </properties>
24 
25     <dependencies>
26         <dependency>
27             <groupId>org.springframework.boot</groupId>
28             <artifactId>spring-boot-starter-web</artifactId>
29         </dependency>
30         <dependency>
31             <groupId>org.springframework.boot</groupId>
32             <artifactId>spring-boot-starter-actuator</artifactId>
33         </dependency>
34         <dependency>
35             <groupId>org.springframework.kafka</groupId>
36             <artifactId>spring-kafka</artifactId>
37         </dependency>
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-starter-test</artifactId>
41             <scope>test</scope>
42         </dependency>
43 
44         <!-- 添加 gson 依賴 -->
45         <dependency>
46             <groupId>com.google.code.gson</groupId>
47             <artifactId>gson</artifactId>
48             <version>2.8.5</version>
49         </dependency>
50         <!-- 添加 lombok 依賴 -->
51         <dependency>
52             <groupId>org.projectlombok</groupId>
53             <artifactId>lombok</artifactId>
54             <version>1.16.22</version>
55             <scope>provided</scope>
56         </dependency>
57     </dependencies>
58 
59     <dependencyManagement>
60         <dependencies>
61             <dependency>
62                 <groupId>org.springframework.cloud</groupId>
63                 <artifactId>spring-cloud-dependencies</artifactId>
64                 <version>${spring-cloud.version}</version>
65                 <type>pom</type>
66                 <scope>import</scope>
67             </dependency>
68         </dependencies>
69     </dependencyManagement>
70 
71     <build>
72         <plugins>
73             <plugin>
74                 <groupId>org.springframework.boot</groupId>
75                 <artifactId>spring-boot-maven-plugin</artifactId>
76             </plugin>
77         </plugins>
78     </build>
79 
80 
81 </project>

註釋部分為手動添加的 gson、lombok 依賴。

2、創建消息實體類

 1 package com.miniooc.kafka.message;
 2 
 3 import lombok.Data;
 4 
 5 import java.io.Serializable;
 6 import java.util.Date;
 7 import java.util.List;
 8 
 9 @Data
10 public class OrderBasic implements Serializable {
11 
12     /**
13      * 訂單ID
14      */
15     private String orderId;
16     /**
17      * 訂單編號
18      */
19     private String orderNumber;
20     /**
21      * 訂單日期
22      */
23     private Date date;
24     /**
25      * 訂單信息
26      */
27     private List<String> desc;
28 
29 }

3、創建消息生產類

 1 /**
 2  *
 3  */
 4 package com.miniooc.kafka.producer;
 5 
 6 import com.google.gson.GsonBuilder;
 7 import com.miniooc.kafka.message.OrderBasic;
 8 import lombok.extern.java.Log;
 9 import org.springframework.beans.factory.annotation.Value;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.stereotype.Component;
12 
13 import javax.annotation.Resource;
14 
15 /**
16  * Kafka消息生產類
17  */
18 @Log
19 @Component
20 public class KafkaProducer {
21 
22     @Resource
23     private KafkaTemplate<String, String> kafkaTemplate;
24 
25     @Value("${kafka.topic.order}")
26     private String topicOrder;
27 
28     /**
29      * 發送訂單消息
30      *
31      * @param orderBasic 訂單信息
32      */
33     public void sendOrderMessage(OrderBasic orderBasic) {
34         GsonBuilder builder = new GsonBuilder();
35         builder.setPrettyPrinting();
36         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
37         String message = builder.create().toJson(orderBasic);
38         kafkaTemplate.send(topicOrder, message);
39         log.info("\n" + message);
40     }
41 }

 4、編輯資源配置文件 application.properties

1 server.port=9526
2 spring.application.name=kafka-producer
3 kafka.bootstrap.servers=localhost:9092
4 kafka.topic.order=topic-order
5 kafka.group.id=group-order

5、啟動 zookeeper

D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

6、啟動 kafka

D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

7、運行工程,通過控制器調用消息生產類,創建一條消息到kafka

看到紅框內容,說明消息發送成功。

8、再使用IDEA新建工程引導方式,創建消息消費工程 springboot-kafka-producer。

9、創建消息消費類,並監聽topic。

 1 package com.miniooc.kafka.consumer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.google.gson.reflect.TypeToken;
 6 import com.miniooc.kafka.message.OrderBasic;
 7 import lombok.extern.java.Log;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11 
12 @Log
13 @Component
14 public class KafkaConsumer {
15 
16     @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
17     public void consume(@Payload String message) {
18         GsonBuilder builder = new GsonBuilder();
19         builder.setPrettyPrinting();
20         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
21         Gson gson = builder.create();
22         OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
23         }.getType());
24         String json = gson.toJson(orderBasic);
25         log.info("\n接受並消費消息\n" + json);
26     }
27 }

10、運行工程。

看到紅框內容,說明消息消費成功。

SpringBoot Kafka 整合完成! 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 多線程 std::lock 當要同時操作2個對象時,就需要同時鎖定這2個對象,而不是先鎖定一個,然後再鎖定另一個。同時鎖定多個對象的方法:std::lock(對象1.鎖,對象2.鎖...) 額外說明:lock_guard\ lock_a(d1.m, std::adopt_lock); 上面這句是為了 ...
  • 1、遞歸與迭代: 遞歸和迭代都是迴圈的一種。簡單地說,遞歸是重覆調用函數自身實現迴圈。迭代是函數內某段代碼實現迴圈,而迭代與普通迴圈的區別是:迴圈代碼中參與運算的變數同時是保存結果的變數,當前保存的結果作為下一次迴圈計算的初始值。 遞歸迴圈中,遇到滿足終止條件的情況時逐層返回來結束。迭代則使用計數器 ...
  • 問題: 由於公司業務擴大,各個子系統陸續遷移和部署在不同的數據源上,這樣方便擴容,但是因此引出了一些問題。 舉個例子:在查詢"訂單"(位於訂單子系統)列表時,同時需要查詢出所關聯的"用戶"(位於賬戶子系統)的姓名,而這時由於數據存儲在不同的數據源上,沒有辦法通過一條連表的sql獲取到全部的數據,而是 ...
  • laravel里所謂的provider服務提供者,其實是對某一類功能進行整合,與做一些使用前的初始化引導工作。laravel里的服務提供者也分為,系統核心服務提供者、與一般系統服務提供者。例如上一篇博文里介紹的,最早在application中進行註冊的event、log、routing這些就是系統的 ...
  • 開篇:做了這麼多年的軟體,第一次使用博客的方式記錄學習過程,之前都是筆記本(都有一摞了),因為之前一直從事的都是.NET的開發工作,對C++知之甚少,但一直想瞭解C++這門鼻祖級的語言,現在終於下定決心、騰出時間,系統的學習一下,因為有了豐富的編程經驗,所以不再記錄安裝編程環境之類的事項直接進行編程 ...
  • 認識進程與線程(python) 一段時間沒有更新博客了,今天和大家講講關於 python 進程和線程的知識點。(個人心得,多多指教!) 階段一:併發與並行的深入理解 ​ 並行一定是併發,但併發不一定是並行。 ​ 並行是相對的,並行是絕對的。 問題一: 電腦是如何執行程式指令的? 問題二: 電腦如 ...
  • 1. fgetss函數php官網的解釋是: (PHP 4, PHP 5, PHP 7) fgetss — 從文件指針中讀取一行並過濾掉 HTML 標記 2. 測試後出現的問題是: 當文本中有一行數據出現 < 左尖括弧字元時,會把下麵的數據全部替換成空白行 ,每行讀取到的數據都是空白 ...
  • 前面提到條件語句的標準格式為“if (條件) { /* 條件成立時的操作代碼 */ } else { /* 條件不成立時的操作代碼 */ }”,乍看之下仿佛只有兩個分支,一個是條件成立時的分支,另一個是條件不成立時的分支。很明顯僅僅兩個分支是不能滿足複雜的業務需求的,自然Java代碼也不會這麼傻瓜到 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...