SpringCloud之Spring Cloud Stream:消息驅動

来源:https://www.cnblogs.com/gdjlc/archive/2019/11/23/11920463.html
-Advertisement-
Play Games

Spring Cloud Stream 是一個構建消息驅動微服務的框架,該框架在Spring Boot的基礎上整合了Spring Integrationg來連接消息代理中間件(RabbitMQ, Kafka等),提供了個性化的自動化配置實現,並引入了發佈-訂閱、消費組、分區這三個核心概念。 應用程... ...


Spring Cloud Stream 是一個構建消息驅動微服務的框架,該框架在Spring Boot的基礎上整合了Spring Integrationg來連接消息代理中間件(RabbitMQ, Kafka等),提供了個性化的自動化配置實現,並引入了發佈-訂閱、消費組、分區這三個核心概念。
應用程式通過input通道或者output通道來與Spring Cloud Stream中binder(綁定器)交互,通過配置來binding. 而Spring Cloud Stream的binder負責與中間件交互。

開發工具:IntelliJ IDEA 2019.2.3

一、伺服器端

1、創建項目

IDEA中創建一個新的SpringBoot項目,名稱為“spring-server”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Spring Cloud Discovery -> Eureka Server。
pom.xml完整內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

修改埠號為8761;取消將自己信息註冊到Eureka伺服器,不從Eureka伺服器抓取註冊信息。

server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

3、修改啟動類代碼

增加註解@EnableEurekaServer

package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }

}
View Code

二、消息生產者

1、創建項目
IDEA中創建一個新的SpringBoot項目,名稱為“spring-producer”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴spring-cloud-starter-stream-rabbit,會自動引入spring-cloud-stream和spring-cloud-stream-binder。
pom.xml完整內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

pom.xml使用RabbitMQ,預設情況下,連接本地的5672埠。下麵這段rabbitmq也可省略。

server:
  port: 8081
spring:
  application:
    name: spring-producer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、編寫發送服務

方法sendOrder使用@Output("myInput")註解表示創建myInput的消息通道。調用該方法後,會向myInput通道投遞消息。
如果不使用參數myInput,則使用方法名作為通道名稱。

package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {
    @Output("myInput")
    SubscribableChannel sendOrder();
}

4、修改啟動類代碼

加入註解@EnableBinding以開啟Spring容器的綁定功能,以SendService.class為參數,Spring容器啟動時,會自動綁定SendService介面中定義的通道。

package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

}

5、添加一個控制器類

調用SendService的發送方法,往伺服器發送消息。

package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value="/send",method= RequestMethod.GET)
    public String sendRequest(){
        //創建消息
        Message msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        //發送消息
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}

三、消息消費者

1、創建項目

IDEA中創建一個新的SpringBoot項目,名稱為“spring-consumer”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、縮寫接受消息的通道介面

package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {
    @Input("myInput")
    SubscribableChannel myInput();
}

4、修改啟動類代碼

同樣綁定消息通道

package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    //訂閱myInput通道的消息
    @StreamListener("myInput")
    public void receive(byte[] msg){
        System.out.println("接收到的消息:" + new String(msg));
    }
}

5、測試

(1)檢查服務裡面的RabbitMQ是否有啟動(預設啟動);

(2)啟動spring-server(8761埠);

(3)啟動spring-producer(8081埠);

(4)啟動spring-consumer(8080埠);

(5)瀏覽器訪問http://localhost:8081/send,spring-consumer項目的控制台輸出:

接收到的消息:hello world

說明消費者已經可以從消息代理中獲取到消息。

四、更換綁定器

上面使用了RabbitMQ作為消息代理,如果使用Kafka,可以更換Maven依賴實現。
在生產者和消費者的pom.xml中,將spring-cloud-starter-stream-rabbit修改為spring-cloud-starter-stream-kafka。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • https://dvteclipse.com/documentation/svlinter/How_to_use_special_characters_in_XML.3F.html Because XML syntax uses some characters for tags and attrib ...
  • 生成條形碼 <body> <div> <img id="ma"/> </div> </body> </html> <script src="jquery-1.8.1.min.js"></script> <script type="text/javascript" src="https://cdn.b ...
  • 閉包(closure)是Javascript語言的一個難點,也是它的特色,很多高級應用都要依靠閉包實現。 下麵就是我的學習筆記,對於Javascript初學者應該是很有用的。 一、變數的作用域 要理解閉包,首先必須理解Javascript特殊的變數作用域。 變數的作用域無非就是兩種:全局變數和局部變 ...
  • 12-Factor與雲原生雲原生應用今天先到這兒,希望對技術領導力, 企業管理,系統架構設計與評估,團隊管理, 項目管理, 產品管理,團隊建設 有參考作用 , 您可能感興趣的文章: 精益IT組織與分享式領導領導人怎樣帶領好團隊構建創業公司突擊小團隊國際化環境下系統架構演化微服務架構設計視頻直播平臺的... ...
  • 1 介紹 MongoDB中文社區(mongoing.com)是大中華區獲得官方認可的中文社區,11月23日下午,在廣州舉辦了線下用戶大會,帶給大家一手乾貨和實踐。 2 大會議程 大會組織者對時間的把控做得非常好,沒有拖沓,基本是按時既定流程走的。具體流程如下: 3 一些個人收穫 3.1 MongoD ...
  • [TOC] Java程式在記憶體中運行詳解 Java語言是一門編譯型語言,需要將編寫的源代碼(.java文件)編譯之後(.class位元組碼文件),通過 jvm 才能正常的執行,下麵的內容記錄了一個程式從編寫到執行整個過程在記憶體中是怎麼一個變的。 一、JVM的記憶體分佈 先瞭解下 JVM 的記憶體分佈,因為 ...
  • 本文適合JAVA新人,想瞭解RabbitMQ又不想去看官網文檔的人(英語水看的頭疼(◎﹏◎),但建議有能力還是去看官網文檔)。 消息隊列MQ(一) MQ全稱為Message Queue,消息隊列是應用程式和應用程式之間的通信方法。 先引入一下常見的通訊方案。 為什麼使用MQ? 在項目中,可將一些無需 ...
  • 前言 "《【源碼解析】憑什麼?spring boot 一個 jar 就能開發 web 項目》 " 中有讀者反應: 部署後運維很不方便,比較修改一個 IP 配置,需要重新打包。 這一點我是深有體會,17 年自學,並很大膽的直接在生產環境用的時候,我都是讓產品經理(此時他充當我們的運維,嘿嘿)用壓縮軟體 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...