SpringBoot3集成Kafka

来源:https://www.cnblogs.com/cicada-smile/archive/2023/08/18/17639820.html
-Advertisement-
Play Games

Kafka是一個開源的分散式事件流平臺,常被用於高性能數據管道、流分析、數據集成和關鍵任務應用,基於Zookeeper協調的處理平臺,也是一種消息系統,具有更好的吞吐量、內置分區、複製和容錯。 ...


目錄

標簽:Kafka3.Kafka-eagle3;

一、簡介

Kafka是一個開源的分散式事件流平臺,常被用於高性能數據管道、流分析、數據集成和關鍵任務應用,基於Zookeeper協調的處理平臺,也是一種消息系統,具有更好的吞吐量、內置分區、複製和容錯,這使得它成為大規模消息處理應用程式的一個很好的解決方案;

二、環境搭建

1、Kafka部署

1、下載安裝包:kafka_2.13-3.5.0.tgz

2、配置環境變數

open -e ~/.bash_profile

export KAFKA_HOME=/本地路徑/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

3、該目錄【kafka3.5/bin】啟動zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties

4、該目錄【kafka3.5/bin】啟動kafka
kafka-server-start.sh ../config/server.properties

2、Kafka測試

1、生產者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message

2、消費者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message

3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic

4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message

3、可視化工具

配置和部署

1、下載安裝包:kafka-eagle-bin-3.0.2.tar.gz

2、配置環境變數

open -e ~/.bash_profile

export KE_HOME=/本地路徑/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin

source ~/.bash_profile

3、修改配置文件:system-config.properties

efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle

4、本地新建資料庫:kafka-eagle,註意用戶名和密碼是否一致

5、啟動命令
efak-web-3.0.2/bin/ke.sh start
命令語法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}

6、本地訪問【localhost:8048】 username:admin password:123456

KSQL語句測試

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程結構

2、依賴管理

這裡關於依賴的管理就比較複雜了,首先spring-kafka組件選擇與boot框架中spring相同的依賴,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;

但是該版本使用的是kafka-clients組件的3.3.2版本,在Spring文檔的kafka模塊中,明確說明spring-boot:3.1要使用kafka-clients:3.4,所以從spring-kafka組件中排除掉,重新依賴kafka-clients組件;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-clients.version}</version>
</dependency>

3、配置文件

配置kafka連接地址,監聽器的消息應答機制,消費者的基礎模式;

spring:
  # kafka配置
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
      ack-mode: manual_immediate
    consumer:
      group-id: boot-kafka-group
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 3600000

四、基礎用法

1、消息生產

模板類KafkaTemplate用於執行高級的操作,封裝各種消息發送的方法,在該方法中,通過topickey以及消息主體,實現消息的生產;

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/msg")
    public String sendMsg (){
        try {
            // 構建消息主體
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));
            // 發送消息
            kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消費

編寫消息監聽類,通過KafkaListener註解控制監聽的具體信息,在實現消息生產和消費的方法測試後,使用可視化工具kafka-eagle查看topic和消息列表;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(topics = "boot-kafka-topic")
    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
        try {
            String key =  String.valueOf(record.key());
            String body = record.value();
            log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            acknowledgment.acknowledge();
        }
    }
}

五、參考源碼

文檔倉庫:
https://gitee.com/cicadasmile/butte-java-note

源碼倉庫:
https://gitee.com/cicadasmile/butte-spring-parent
Gitee主頁: https://gitee.com/cicadasmile/butte-java-note
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ![image](https://img2023.cnblogs.com/blog/2609621/202308/2609621-20230818091044358-21979402.png) ```html 點擊觸發頁面彈窗 ``` ```js alert('努力,奮鬥') ``` ![image ...
  • 翻了一下之前剛入職時候的學習筆記,發現之前在熟悉業務代碼的時候曾經專門學習並整理過過設計模式中的責任鏈模式,之前只是對其簡單瞭解過常用的設計模式有哪些,並未結合實例和源碼深入對其探究,利用熟悉代碼契機進行系統學習並整理文檔如下。 ...
  • 閱讀本文前,需要儲備的知識點如下,點擊鏈接直接跳轉。 [java線程詳解](https://www.cnblogs.com/star95/p/17583193.html) [Java不能操作記憶體?Unsafe瞭解一下](https://www.cnblogs.com/star95/p/1761943 ...
  • 在業務開發的時候,經常會遇到某一個介面不能對外暴露,只能內網服務間調用的實際需求。面對這樣的情況,我們該如何實現呢? 今天,我們就來理一理這個問題,從幾個可行的方案中,挑選一個來實現。 推薦一個開源免費的 Spring Boot 實戰項目: > [https://github.com/javasta ...
  • # 前言 最近公司一個新的項目,因為需要存儲的數據很少,單獨去部署一個資料庫去存儲該數據顯然是不划算的,所以想的是通過存入csv文件中來代替存入資料庫中。說乾就乾。 ## 什麼是csv文件 CSV代表逗號分隔值(Comma-Separated Values),是一種常見的文本文件格式,用於存儲表格數 ...
  • `google.golang.org/protobuf/encoding/protojson` 是 Go 語言中的一個庫,用於處理 Protocol Buffers(protobuf)和 JSON 之間的轉換,遵循[https://protobuf.dev/programming-guides/pr ...
  • 平時我一直用Notion來記錄內容為主,但也一直關註著其他開源產品。上周正好看到一款非常受歡迎的開源免費筆記,今天就推薦給大家:[**VNote**](https://blog.didispace.com/tj-opensource-vnote/)。 ![file](https://img2023. ...
  • 介紹創建Java單例對象的七種方式,重點掌握哪些創建方式是線程安全的,哪些方式是線程不安全的,並能夠在實際項目中靈活運用設計模式,編寫可維護的代碼。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...