SpringBoot-Learning系列之Kafka整合

来源:https://www.cnblogs.com/andreby/archive/2023/09/11/17694051.html
-Advertisement-
Play Games

SpringBoot-Learning系列之Kafka整合 本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。 消息系統 主要應用場景 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦) 非同步處理、順序處理 實時數據 ...


SpringBoot-Learning系列之Kafka整合

本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。

file

  • 消息系統

    • 主要應用場景
      • 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦)
      • 非同步處理、順序處理
      • 實時數據傳輸管道
      • 異構語言架構系統之間的通信
        • 如 C語言的CS客戶端的HIS系統與java語言開發的互聯網線上診療系統的交互
  • Kafka是什麼

    kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。是java領域常用的消息隊列。

    核心概念:

    • 生產者(Producer) 生產者應用向主題隊列中投送消息數據
    • 消費者 (Consumer) 消費者應用從訂閱的Kafka的主題隊列中獲取數據、處理數據等後續操作
    • 主題 (Topic) 可以理解為生產者與消費者交互的橋梁
    • 分區 (Partition) 預設一個主題有一個分區,用戶可以設置多個分區。每個分區可以有多個副本(Replica)。分區的作用是,將數據劃分為多個小塊,提高併發性和可擴展性。每個分區都有一個唯一的標識符,稱為分區號。消息按照鍵(key)來進行分區,相同鍵的消息會被分配到同一個分區中。分區可以有不同的消費者同時消費。副本的作用是提供數據的冗餘和故障恢復。每個分區可以有多個副本,其中一個被稱為領導者(leader),其他副本被稱為追隨者(follower)。領導者負責處理讀寫請求,而追隨者只負責複製領導者的數據。如果領導者宕機或不可用,某個追隨者會被選舉為新的領導者,保證數據的可用性。
  • windows 安裝kafka

    本地環境DockerDeskTop+WSL2,基於Docker方式安裝Kafka

    2.8.0後不需要依賴zk了

    • 拉取鏡像

      docker pull wurstmeister/zookeeper
      
      docker pull wurstmeister/kafka
      
    • 創建網路

      docker network create kafka-net --driver bridge
      
    • 安裝zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安裝kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 測試

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依賴

      								```
      										<?xml version="1.0" encoding="UTF-8"?>
      										<project xmlns="http://maven.apache.org/POM/4.0.0"
      														 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      														 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      												<modelVersion>4.0.0</modelVersion>
      												<parent>
      														<groupId>org.springframework.boot</groupId>
      														<artifactId>spring-boot-starter-parent</artifactId>
      														<version>3.1.0</version>
      														<relativePath/> <!-- lookup parent from repository -->
      												</parent>
      												<groupId>io.github.vino42</groupId>
      												<artifactId>springboot-kafka</artifactId>
      												<version>1.0-SNAPSHOT</version>
      
      												<properties>
      														<java.version>17</java.version>
      														<maven.compiler.source>17</maven.compiler.source>
      														<maven.compiler.target>17</maven.compiler.target>
      														<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      												</properties>
      
      
      												<dependencies>
      														<dependency>
      																<groupId>org.projectlombok</groupId>
      																<artifactId>lombok</artifactId>
      																<optional>true</optional>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-test</artifactId>
      																<scope>test</scope>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-web</artifactId>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-log4j2</artifactId>
      														</dependency>
      														<!--kafka-->
      														<dependency>
      																<groupId>org.springframework.kafka</groupId>
      																<artifactId>spring-kafka</artifactId>
      																<exclusions>
      																		<!--排除掉 自行添加最新的官方clients依賴-->
      																		<exclusion>
      																				<groupId>org.apache.kafka</groupId>
      																				<artifactId>kafka-clients</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.apache.kafka</groupId>
      																<artifactId>kafka-clients</artifactId>
      																<version>3.5.1</version>
      														</dependency>
      														<dependency>
      																<groupId>com.google.code.gson</groupId>
      																<artifactId>gson</artifactId>
      																<version>2.10.1</version>
      														</dependency>
      														<dependency>
      																<groupId>cn.hutool</groupId>
      																<artifactId>hutool-all</artifactId>
      																<version>5.8.21</version>
      														</dependency>
      
      												</dependencies>
      												<build>
      														<plugins>
      																<plugin>
      																		<groupId>org.springframework.boot</groupId>
      																		<artifactId>spring-boot-maven-plugin</artifactId>
      																		<version>3.1.0</version>
      																</plugin>
      														</plugins>
      												</build>
      										</project>
      						```
      
    • 配置

      spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發送消息的數量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,處理完業務後,手動調用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交
            #      MANUAL_IMMEDIATE	每處理完業務手動調用Acknowledgment.acknowledge()後立即提交
            #      RECORD	當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
            #      BATCH	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
            #      TIME	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
            #      COUNT	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
            #      COUNT_TIME	TIME或COUNT滿足其中一個時提交
            ack-mode: manual_immediate
          consumer:
            group-id: test
            # 是否自動提交
            enable-auto-commit: false
            max-poll-records: 100
            #      用於指定消費者在啟動時、重置消費偏移量時的行為。
            #      earliest:消費者會將消費偏移量重置為最早的可用偏移量,也就是從最早的消息開始消費。
            #      latest:消費者會將消費偏移量重置為最新的可用偏移量,也就是只消費最新發送的消息。
            #      none:如果找不到已保存的消費偏移量,消費者會拋出一個異常
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      server:
        port: 8888spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發送消息的數量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
            ack-mode: manual_immediate
          consumer:
            group-id: test
            enable-auto-commit: false
            max-poll-records: 100
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      
    • 生產者代碼示例

      package io.github.vino42.publiser;
      
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      /**
       * =====================================================================================
       *
       * @Created :   2023/8/30 21:29
       * @Compiler :  jdk 17
       * @Author :    VINO
       * @Copyright : VINO
       * @Decription : kafak 消息生產者
       * =====================================================================================
       */
      @Component
      public class KafkaPublishService {
          @Autowired
          KafkaTemplate kafkaTemplate;
      
          /**
           * 這裡為了簡單 直接發送json字元串
           *
           * @param json
           */
          public void send(String topic, String json) {
              kafkaTemplate.send(topic, json);
          }
      }
      
      
          @RequestMapping("/send")
          public String send() {
              IntStream.range(0, 10000).forEach(d -> {
                  kafkaPublishService.send("test", RandomUtil.randomString(16));
              });
              return "ok";
          }
      
      
    • 消費者

      @Component
      @Slf4j
      public class CustomKafkaListener {
      
          @org.springframework.kafka.annotation.KafkaListener(topics = "test")
          public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {
              try {
                  String key = String.valueOf(record.key());
                  String body = record.value();
                  log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);
                  log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //手動ack
                  acknowledgment.acknowledge();
              }
          }
      }
      

SpringBoot Learning系列 是筆者總結整理的一個SpringBoot學習集合。可以說算是一個SpringBoot學習的大集合。歡迎Star關註。謝謝觀看。

file
關註公眾號不迷路


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近系統學習了vue.js 的官方腳手架create-vue的源碼,深入分析了裡面的技術實現細節,本文是我整理的源碼學習文章。 ...
  • 很早之前,就寫過一篇與原生嵌套相關的文章 -- CSS 即將支持嵌套,SASS/LESS 等預處理器已無用武之地?,彼時 CSS 原生嵌套還處於工作草案 Working Draft (WD) 階段,而今天(2023-09-02),CSS 原生嵌套 Nesting 終於成為了既定的規範! CSS 原生 ...
  • 淺拷貝 當我們想要複製一段數據的時候嗎,我們就會用到拷貝;拷貝數據又分為了淺拷貝和深拷貝,淺拷貝指複製對象或數組的頂層結構,如果對象或數組中有引用類型的屬性值,複製的是引用(地址)而非值;而深拷貝則是遞歸複製完整的對象或數組,包括嵌套的子對象或子數組,生成一個全新的對象,新對象和原對象的引用地址不同 ...
  • 前言 大家好,我是 god23bin,今天我們來聊一聊 Spring 框架中的 Bean 作用域(Scope)。 什麼是 Bean 的作用域? 我們在以 XML 作為配置元數據的情況下,進行 Bean 的定義,是這樣的: <bean id="vehicle" class="cn.god23bin.d ...
  • 1 ★★★ 例1 : 判斷集合是否為空: 2 CollectionUtils.isEmpty(null); //控制台列印:true 3 CollectionUtils.isEmpty(new ArrayList());//控制台列印:true 4 CollectionUtils.isEmpty({ ...
  • 使用<property>標簽的value屬性配置原始數據類型和ref屬性配置對象引用的方式來定義Bean配置文件。這兩種情況都涉及將單一值傳遞給Bean。那麼如果您想傳遞多個值,例如Java集合類型,如List、Set、Map和Properties怎麼辦?為了處理這種情況,Spring提供了四種類型 ...
  • 裝飾器 裝飾器的簡易版本 import time def index(): time.sleep(3) print('from index') def home(): print('from home') def func(): print('from func') def outer(func_n ...
  • 數據來源:House Prices - Advanced Regression Techniques 參考文獻: Comprehensive data exploration with Python 1. 導入數據 import pandas as pd import warnings warnin ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...