SpringBoot-Learning系列之Kafka整合

来源:https://www.cnblogs.com/andreby/archive/2023/09/11/17694051.html
-Advertisement-
Play Games

SpringBoot-Learning系列之Kafka整合 本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。 消息系統 主要應用場景 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦) 非同步處理、順序處理 實時數據 ...


SpringBoot-Learning系列之Kafka整合

本系列是一個獨立的SpringBoot學習系列,本著 What Why How 的思想去整合Java開發領域各種組件。

file

  • 消息系統

    • 主要應用場景
      • 流量消峰(秒殺 搶購)、應用解耦(核心業務與非核心業務之間的解耦)
      • 非同步處理、順序處理
      • 實時數據傳輸管道
      • 異構語言架構系統之間的通信
        • 如 C語言的CS客戶端的HIS系統與java語言開發的互聯網線上診療系統的交互
  • Kafka是什麼

    kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。是java領域常用的消息隊列。

    核心概念:

    • 生產者(Producer) 生產者應用向主題隊列中投送消息數據
    • 消費者 (Consumer) 消費者應用從訂閱的Kafka的主題隊列中獲取數據、處理數據等後續操作
    • 主題 (Topic) 可以理解為生產者與消費者交互的橋梁
    • 分區 (Partition) 預設一個主題有一個分區,用戶可以設置多個分區。每個分區可以有多個副本(Replica)。分區的作用是,將數據劃分為多個小塊,提高併發性和可擴展性。每個分區都有一個唯一的標識符,稱為分區號。消息按照鍵(key)來進行分區,相同鍵的消息會被分配到同一個分區中。分區可以有不同的消費者同時消費。副本的作用是提供數據的冗餘和故障恢復。每個分區可以有多個副本,其中一個被稱為領導者(leader),其他副本被稱為追隨者(follower)。領導者負責處理讀寫請求,而追隨者只負責複製領導者的數據。如果領導者宕機或不可用,某個追隨者會被選舉為新的領導者,保證數據的可用性。
  • windows 安裝kafka

    本地環境DockerDeskTop+WSL2,基於Docker方式安裝Kafka

    2.8.0後不需要依賴zk了

    • 拉取鏡像

      docker pull wurstmeister/zookeeper
      
      docker pull wurstmeister/kafka
      
    • 創建網路

      docker network create kafka-net --driver bridge
      
    • 安裝zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安裝kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 測試

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依賴

      								```
      										<?xml version="1.0" encoding="UTF-8"?>
      										<project xmlns="http://maven.apache.org/POM/4.0.0"
      														 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      														 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      												<modelVersion>4.0.0</modelVersion>
      												<parent>
      														<groupId>org.springframework.boot</groupId>
      														<artifactId>spring-boot-starter-parent</artifactId>
      														<version>3.1.0</version>
      														<relativePath/> <!-- lookup parent from repository -->
      												</parent>
      												<groupId>io.github.vino42</groupId>
      												<artifactId>springboot-kafka</artifactId>
      												<version>1.0-SNAPSHOT</version>
      
      												<properties>
      														<java.version>17</java.version>
      														<maven.compiler.source>17</maven.compiler.source>
      														<maven.compiler.target>17</maven.compiler.target>
      														<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      												</properties>
      
      
      												<dependencies>
      														<dependency>
      																<groupId>org.projectlombok</groupId>
      																<artifactId>lombok</artifactId>
      																<optional>true</optional>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-test</artifactId>
      																<scope>test</scope>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-web</artifactId>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-log4j2</artifactId>
      														</dependency>
      														<!--kafka-->
      														<dependency>
      																<groupId>org.springframework.kafka</groupId>
      																<artifactId>spring-kafka</artifactId>
      																<exclusions>
      																		<!--排除掉 自行添加最新的官方clients依賴-->
      																		<exclusion>
      																				<groupId>org.apache.kafka</groupId>
      																				<artifactId>kafka-clients</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.apache.kafka</groupId>
      																<artifactId>kafka-clients</artifactId>
      																<version>3.5.1</version>
      														</dependency>
      														<dependency>
      																<groupId>com.google.code.gson</groupId>
      																<artifactId>gson</artifactId>
      																<version>2.10.1</version>
      														</dependency>
      														<dependency>
      																<groupId>cn.hutool</groupId>
      																<artifactId>hutool-all</artifactId>
      																<version>5.8.21</version>
      														</dependency>
      
      												</dependencies>
      												<build>
      														<plugins>
      																<plugin>
      																		<groupId>org.springframework.boot</groupId>
      																		<artifactId>spring-boot-maven-plugin</artifactId>
      																		<version>3.1.0</version>
      																</plugin>
      														</plugins>
      												</build>
      										</project>
      						```
      
    • 配置

      spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發送消息的數量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,處理完業務後,手動調用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交
            #      MANUAL_IMMEDIATE	每處理完業務手動調用Acknowledgment.acknowledge()後立即提交
            #      RECORD	當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
            #      BATCH	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
            #      TIME	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
            #      COUNT	當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
            #      COUNT_TIME	TIME或COUNT滿足其中一個時提交
            ack-mode: manual_immediate
          consumer:
            group-id: test
            # 是否自動提交
            enable-auto-commit: false
            max-poll-records: 100
            #      用於指定消費者在啟動時、重置消費偏移量時的行為。
            #      earliest:消費者會將消費偏移量重置為最早的可用偏移量,也就是從最早的消息開始消費。
            #      latest:消費者會將消費偏移量重置為最新的可用偏移量,也就是只消費最新發送的消息。
            #      none:如果找不到已保存的消費偏移量,消費者會拋出一個異常
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      server:
        port: 8888spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量發送消息的數量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
            ack-mode: manual_immediate
          consumer:
            group-id: test
            enable-auto-commit: false
            max-poll-records: 100
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息體的編解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      
    • 生產者代碼示例

      package io.github.vino42.publiser;
      
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      /**
       * =====================================================================================
       *
       * @Created :   2023/8/30 21:29
       * @Compiler :  jdk 17
       * @Author :    VINO
       * @Copyright : VINO
       * @Decription : kafak 消息生產者
       * =====================================================================================
       */
      @Component
      public class KafkaPublishService {
          @Autowired
          KafkaTemplate kafkaTemplate;
      
          /**
           * 這裡為了簡單 直接發送json字元串
           *
           * @param json
           */
          public void send(String topic, String json) {
              kafkaTemplate.send(topic, json);
          }
      }
      
      
          @RequestMapping("/send")
          public String send() {
              IntStream.range(0, 10000).forEach(d -> {
                  kafkaPublishService.send("test", RandomUtil.randomString(16));
              });
              return "ok";
          }
      
      
    • 消費者

      @Component
      @Slf4j
      public class CustomKafkaListener {
      
          @org.springframework.kafka.annotation.KafkaListener(topics = "test")
          public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {
              try {
                  String key = String.valueOf(record.key());
                  String body = record.value();
                  log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);
                  log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //手動ack
                  acknowledgment.acknowledge();
              }
          }
      }
      

SpringBoot Learning系列 是筆者總結整理的一個SpringBoot學習集合。可以說算是一個SpringBoot學習的大集合。歡迎Star關註。謝謝觀看。

file
關註公眾號不迷路


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近系統學習了vue.js 的官方腳手架create-vue的源碼,深入分析了裡面的技術實現細節,本文是我整理的源碼學習文章。 ...
  • 很早之前,就寫過一篇與原生嵌套相關的文章 -- CSS 即將支持嵌套,SASS/LESS 等預處理器已無用武之地?,彼時 CSS 原生嵌套還處於工作草案 Working Draft (WD) 階段,而今天(2023-09-02),CSS 原生嵌套 Nesting 終於成為了既定的規範! CSS 原生 ...
  • 淺拷貝 當我們想要複製一段數據的時候嗎,我們就會用到拷貝;拷貝數據又分為了淺拷貝和深拷貝,淺拷貝指複製對象或數組的頂層結構,如果對象或數組中有引用類型的屬性值,複製的是引用(地址)而非值;而深拷貝則是遞歸複製完整的對象或數組,包括嵌套的子對象或子數組,生成一個全新的對象,新對象和原對象的引用地址不同 ...
  • 前言 大家好,我是 god23bin,今天我們來聊一聊 Spring 框架中的 Bean 作用域(Scope)。 什麼是 Bean 的作用域? 我們在以 XML 作為配置元數據的情況下,進行 Bean 的定義,是這樣的: <bean id="vehicle" class="cn.god23bin.d ...
  • 1 ★★★ 例1 : 判斷集合是否為空: 2 CollectionUtils.isEmpty(null); //控制台列印:true 3 CollectionUtils.isEmpty(new ArrayList());//控制台列印:true 4 CollectionUtils.isEmpty({ ...
  • 使用<property>標簽的value屬性配置原始數據類型和ref屬性配置對象引用的方式來定義Bean配置文件。這兩種情況都涉及將單一值傳遞給Bean。那麼如果您想傳遞多個值,例如Java集合類型,如List、Set、Map和Properties怎麼辦?為了處理這種情況,Spring提供了四種類型 ...
  • 裝飾器 裝飾器的簡易版本 import time def index(): time.sleep(3) print('from index') def home(): print('from home') def func(): print('from func') def outer(func_n ...
  • 數據來源:House Prices - Advanced Regression Techniques 參考文獻: Comprehensive data exploration with Python 1. 導入數據 import pandas as pd import warnings warnin ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...