SpringBoot Kafka 整合 實例 源碼

来源:https://www.cnblogs.com/songlu/archive/2018/10/31/9885892.html
-Advertisement-
Play Games

1、使用IDEA新建工程引導方式,創建消息生產工程 springboot-kafka-producer。 工程POM文件代碼如下: 註釋部分為手動添加的 gson、lombok 依賴。 2、創建消息實體類 3、創建消息生產類 4、編輯資源配置文件 application.properties 5、啟 ...


 

1、使用IDEA新建工程引導方式,創建消息生產工程 springboot-kafka-producer。

工程POM文件代碼如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5 
 6     <groupId>com.miniooc</groupId>
 7     <artifactId>springboot-kafka-producer</artifactId>
 8     <version>1.0.0-SNAPSHOT</version>
 9     <packaging>jar</packaging>
10 
11     <name>springboot-kafka-producer</name>
12     <description>Demo project for Spring Boot</description>
13 
14     <parent>
15         <groupId>org.springframework.boot</groupId>
16         <artifactId>spring-boot-starter-parent</artifactId>
17         <version>2.0.3.RELEASE</version>
18         <relativePath/>
19     </parent>
20 
21     <properties>
22         <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
23     </properties>
24 
25     <dependencies>
26         <dependency>
27             <groupId>org.springframework.boot</groupId>
28             <artifactId>spring-boot-starter-web</artifactId>
29         </dependency>
30         <dependency>
31             <groupId>org.springframework.boot</groupId>
32             <artifactId>spring-boot-starter-actuator</artifactId>
33         </dependency>
34         <dependency>
35             <groupId>org.springframework.kafka</groupId>
36             <artifactId>spring-kafka</artifactId>
37         </dependency>
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-starter-test</artifactId>
41             <scope>test</scope>
42         </dependency>
43 
44         <!-- 添加 gson 依賴 -->
45         <dependency>
46             <groupId>com.google.code.gson</groupId>
47             <artifactId>gson</artifactId>
48             <version>2.8.5</version>
49         </dependency>
50         <!-- 添加 lombok 依賴 -->
51         <dependency>
52             <groupId>org.projectlombok</groupId>
53             <artifactId>lombok</artifactId>
54             <version>1.16.22</version>
55             <scope>provided</scope>
56         </dependency>
57     </dependencies>
58 
59     <dependencyManagement>
60         <dependencies>
61             <dependency>
62                 <groupId>org.springframework.cloud</groupId>
63                 <artifactId>spring-cloud-dependencies</artifactId>
64                 <version>${spring-cloud.version}</version>
65                 <type>pom</type>
66                 <scope>import</scope>
67             </dependency>
68         </dependencies>
69     </dependencyManagement>
70 
71     <build>
72         <plugins>
73             <plugin>
74                 <groupId>org.springframework.boot</groupId>
75                 <artifactId>spring-boot-maven-plugin</artifactId>
76             </plugin>
77         </plugins>
78     </build>
79 
80 
81 </project>

註釋部分為手動添加的 gson、lombok 依賴。

2、創建消息實體類

 1 package com.miniooc.kafka.message;
 2 
 3 import lombok.Data;
 4 
 5 import java.io.Serializable;
 6 import java.util.Date;
 7 import java.util.List;
 8 
 9 @Data
10 public class OrderBasic implements Serializable {
11 
12     /**
13      * 訂單ID
14      */
15     private String orderId;
16     /**
17      * 訂單編號
18      */
19     private String orderNumber;
20     /**
21      * 訂單日期
22      */
23     private Date date;
24     /**
25      * 訂單信息
26      */
27     private List<String> desc;
28 
29 }

3、創建消息生產類

 1 /**
 2  *
 3  */
 4 package com.miniooc.kafka.producer;
 5 
 6 import com.google.gson.GsonBuilder;
 7 import com.miniooc.kafka.message.OrderBasic;
 8 import lombok.extern.java.Log;
 9 import org.springframework.beans.factory.annotation.Value;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.stereotype.Component;
12 
13 import javax.annotation.Resource;
14 
15 /**
16  * Kafka消息生產類
17  */
18 @Log
19 @Component
20 public class KafkaProducer {
21 
22     @Resource
23     private KafkaTemplate<String, String> kafkaTemplate;
24 
25     @Value("${kafka.topic.order}")
26     private String topicOrder;
27 
28     /**
29      * 發送訂單消息
30      *
31      * @param orderBasic 訂單信息
32      */
33     public void sendOrderMessage(OrderBasic orderBasic) {
34         GsonBuilder builder = new GsonBuilder();
35         builder.setPrettyPrinting();
36         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
37         String message = builder.create().toJson(orderBasic);
38         kafkaTemplate.send(topicOrder, message);
39         log.info("\n" + message);
40     }
41 }

 4、編輯資源配置文件 application.properties

1 server.port=9526
2 spring.application.name=kafka-producer
3 kafka.bootstrap.servers=localhost:9092
4 kafka.topic.order=topic-order
5 kafka.group.id=group-order

5、啟動 zookeeper

D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

6、啟動 kafka

D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

7、運行工程,通過控制器調用消息生產類,創建一條消息到kafka

看到紅框內容,說明消息發送成功。

8、再使用IDEA新建工程引導方式,創建消息消費工程 springboot-kafka-producer。

9、創建消息消費類,並監聽topic。

 1 package com.miniooc.kafka.consumer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.google.gson.reflect.TypeToken;
 6 import com.miniooc.kafka.message.OrderBasic;
 7 import lombok.extern.java.Log;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11 
12 @Log
13 @Component
14 public class KafkaConsumer {
15 
16     @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
17     public void consume(@Payload String message) {
18         GsonBuilder builder = new GsonBuilder();
19         builder.setPrettyPrinting();
20         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
21         Gson gson = builder.create();
22         OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
23         }.getType());
24         String json = gson.toJson(orderBasic);
25         log.info("\n接受並消費消息\n" + json);
26     }
27 }

10、運行工程。

看到紅框內容,說明消息消費成功。

SpringBoot Kafka 整合完成! 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 多線程 std::lock 當要同時操作2個對象時,就需要同時鎖定這2個對象,而不是先鎖定一個,然後再鎖定另一個。同時鎖定多個對象的方法:std::lock(對象1.鎖,對象2.鎖...) 額外說明:lock_guard\ lock_a(d1.m, std::adopt_lock); 上面這句是為了 ...
  • 1、遞歸與迭代: 遞歸和迭代都是迴圈的一種。簡單地說,遞歸是重覆調用函數自身實現迴圈。迭代是函數內某段代碼實現迴圈,而迭代與普通迴圈的區別是:迴圈代碼中參與運算的變數同時是保存結果的變數,當前保存的結果作為下一次迴圈計算的初始值。 遞歸迴圈中,遇到滿足終止條件的情況時逐層返回來結束。迭代則使用計數器 ...
  • 問題: 由於公司業務擴大,各個子系統陸續遷移和部署在不同的數據源上,這樣方便擴容,但是因此引出了一些問題。 舉個例子:在查詢"訂單"(位於訂單子系統)列表時,同時需要查詢出所關聯的"用戶"(位於賬戶子系統)的姓名,而這時由於數據存儲在不同的數據源上,沒有辦法通過一條連表的sql獲取到全部的數據,而是 ...
  • laravel里所謂的provider服務提供者,其實是對某一類功能進行整合,與做一些使用前的初始化引導工作。laravel里的服務提供者也分為,系統核心服務提供者、與一般系統服務提供者。例如上一篇博文里介紹的,最早在application中進行註冊的event、log、routing這些就是系統的 ...
  • 開篇:做了這麼多年的軟體,第一次使用博客的方式記錄學習過程,之前都是筆記本(都有一摞了),因為之前一直從事的都是.NET的開發工作,對C++知之甚少,但一直想瞭解C++這門鼻祖級的語言,現在終於下定決心、騰出時間,系統的學習一下,因為有了豐富的編程經驗,所以不再記錄安裝編程環境之類的事項直接進行編程 ...
  • 認識進程與線程(python) 一段時間沒有更新博客了,今天和大家講講關於 python 進程和線程的知識點。(個人心得,多多指教!) 階段一:併發與並行的深入理解 ​ 並行一定是併發,但併發不一定是並行。 ​ 並行是相對的,並行是絕對的。 問題一: 電腦是如何執行程式指令的? 問題二: 電腦如 ...
  • 1. fgetss函數php官網的解釋是: (PHP 4, PHP 5, PHP 7) fgetss — 從文件指針中讀取一行並過濾掉 HTML 標記 2. 測試後出現的問題是: 當文本中有一行數據出現 < 左尖括弧字元時,會把下麵的數據全部替換成空白行 ,每行讀取到的數據都是空白 ...
  • 前面提到條件語句的標準格式為“if (條件) { /* 條件成立時的操作代碼 */ } else { /* 條件不成立時的操作代碼 */ }”,乍看之下仿佛只有兩個分支,一個是條件成立時的分支,另一個是條件不成立時的分支。很明顯僅僅兩個分支是不能滿足複雜的業務需求的,自然Java代碼也不會這麼傻瓜到 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...