Spring Cloud Stream 進行服務之間的通訊

来源:https://www.cnblogs.com/bluersw/archive/2019/10/15/11675139.html
-Advertisement-
Play Games

Spring Cloud Stream Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、S ...


Spring Cloud Stream

Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進行封裝(整合)和擴展,下麵我們實現兩個服務之間的通訊來演示Spring Cloud Stream的使用方法。

整體概述

Alt text
服務要想與其他服務通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用於發送消息,輸入通道用於接收消息,每個通道都會有個名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的複雜性和不一致性,綁定器會用通道的名字在消息中間件中定義主題,一個主題內的消息生產者來自多個服務,一個主題內消息的消費者也是多個服務,也就是說消息的發佈和消費是通過主題進行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實現,在Kafka中主題使用Topic實現。

準備環境

創建兩個項目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實現通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實現通訊。
兩個項目的POM文件依賴都是:

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

spring-cloud-stream-binder-rabbit是指綁定器的實現使用RabbitMQ。

項目配置內容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

啟動一個rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

編寫A項目代碼

在A項目中定義一個輸入通道一個輸出通道,定義通道在介面中使用@Input和@Output註解定義,程式啟動的時候Spring Cloud Stream會根據介面定義將實現類自動註入(Spring Cloud Stream自動實現該介面不需要寫代碼)。
A服務輸入通道,通道名稱ChatExchanges-A-Input,介面定義輸入通道必須返回SubscribableChannel:

public interface ChatInput {

    String INPUT = "ChatExchanges-A-Input";

    @Input(ChatInput.INPUT)
    SubscribableChannel input();
}

A服務輸出通道,通道名稱ChatExchanges-A-Output,輸出通道必須返回MessageChannel:

public interface ChatOutput {

    String OUTPUT = "ChatExchanges-A-Output";

    @Output(ChatOutput.OUTPUT)
    MessageChannel output();
}

定義消息實體類:

public class ChatMessage implements Serializable {

    private String name;
    private String message;
    private Date chatDate;

    //沒有無參數的構造函數並行化會出錯
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
    }
}

在業務處理類上用@EnableBinding註解綁定輸入通道和輸出通道,這個綁定動作其實就是創建並註冊輸入和輸出通道的實現類到Bean中,所以可以直接是使用@Autowired進行註入使用,另外消息的串列化預設使用application/json格式(com.fastexml.jackson),最後用@StreamListener註解進行指定通道消息的監聽:

//ChatInput.class的輸入通道不在這裡綁定,監聽到數據會找不到AClient類的引用。
//Input和Output通道定義的名字不能一樣,否則程式啟動會拋異常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

    private static Logger logger = LoggerFactory.getLogger(AClient.class);

    @Autowired
    private ChatOutput chatOutput;

    //StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
    @StreamListener(ChatInput.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());

        ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

        chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
    }
}

到此A項目代碼編寫完成。

編寫B項目代碼

B項目使用Spring Integration實現消息的發佈和消費,定義通道時我們要交換輸入通道和輸出通道的名稱:

public interface ChatProcessor {

    String OUTPUT = "ChatExchanges-A-Input";
    String INPUT  = "ChatExchanges-A-Output";

    @Input(ChatProcessor.INPUT)
    SubscribableChannel input();

    @Output(ChatProcessor.OUTPUT)
    MessageChannel output();
}

消息實體類:

public class ChatMessage {
    private String name;
    private String message;
    private Date chatDate;

    //沒有無參數的構造函數並行化會出錯
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
    }
}

業務處理類用@ServiceActivator註解代替@StreamListener,用@InboundChannelAdapter註解發佈消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

    private static Logger logger = LoggerFactory.getLogger(BClient.class);

    //@ServiceActivator沒有Json轉對象的能力需要藉助@Transformer註解
    @ServiceActivator(inputChannel=ChatProcessor.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());
    }

    @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
    public ChatMessage transform(String message) throws Exception{
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(message,ChatMessage.class);
    }

    //每秒發出一個消息給A
    @Bean
    @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
    public GenericMessage<ChatMessage> SendChatMessage(){
        ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
        GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
        return gm;
    }
}

運行程式

啟動A項目和B項目:
Alt text
Alt text

消費組和消息分區

  • 消費組:服務的部署一般是同一個服務會部署多份,如果希望一條消息只執行一次,就將這些相同服務的不同部署實例設置成一個消費組,消費組內的消息只會被一個實例消費。
  • 消息分區:在一個消費組內除了要保證只有一個實例消費外,還要保證具備相同特征的消息被同一個實例進行消費。

消費組的設定比較簡單,在消息的消費方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.group={分組名}
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
在消息的產生方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
spring-cloud-stream-a配置內容:

#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.group=A.group
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput

spring-cloud-stream-b配置內容:

#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.group=B.group
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput

消息分區首先在消息消費方開啟消息分區並配置消費者數量和當前消費者索引,然後在消息生產者配置分區鍵表達式和分區數量(因為是測試我們都將數量設置為1):
spring-cloud-stream-a配置內容:

#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionCount=1

spring-cloud-stream-b配置內容:

#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionCount=1

修改spring-cloud-stream-a和spring-cloud-stream-b的發送消息代碼:
spring-cloud-stream-a:

    //StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
    @StreamListener(ChatInput.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());

        ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

        //這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
        int feature = 1;
        Map<String, Object> headers = new HashMap<>();
        headers.put("router", feature);

        GenericMessage<ChatMessage> genericMessage = new GenericMessage<>(replyMessage,headers);

        chatOutput.output().send(MessageBuilder.fromMessage(genericMessage).build());
    }

spring-cloud-stream-b:

    //每秒發出一個消息給A
    @Bean
    @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
    public GenericMessage<ChatMessage> SendChatMessage(){
        ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());

        //這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
        int feature = 1;
        Map<String, Object> headers = new HashMap<>();
        headers.put("router", feature);

        return  new GenericMessage<>(message,headers);
    }

運行結果:
Alt text
Alt text
Alt text
Alt text
Alt text

源碼

Github倉庫:https://github.com/sunweisheng/spring-cloud-example


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 場景 SpringCloud-服務註冊與實現-Eureka創建服務註冊中心(附源碼下載): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/102535957 上面已經搭建好服務註冊中心,開始創建服務提供者。 當 Client 向 ...
  • 一、代理模式的作用 將主要業務與次要業務進行松耦合的組裝 二、代理模式本質 監控行為的特征 例子: <input type="button" onclick="處理函數"> 三、生活案例 案例:飯前便後要洗手 分析: 1.分析出主要業務和次要業務 【主要業務】:吃飯,上廁所 【次要業務】:洗手 2. ...
  • 在高併發業務場景下,消息隊列在流量削峰、解耦上有不可替代的作用。當前使用較多的消息隊列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等。 消息隊列這麼多,到底該選擇哪款消息隊列呢? 選擇消息隊列的基本標準 雖然這些消息隊列在功能和特性方面各有優劣, ...
  • 1. 概述 logstash把格式化的數據發送到elasticsearch以後,elasticsearch負責存儲搜索日誌數據 elasticsearch的搜索介面還是很強大的,這邊不詳細展開,因為kibana會去調用el的介面; 本文將講解elasticsearch的相關配置和遇到的問題,至於el ...
  • Top12原則: 開發各階段流程及規範 需求、架構、設計、開發、測試等階段流程及規範 需求是基石 總流程 :戰略規劃 3-4年->產品立項-> 用戶調研+競品分析->業務信息 -> 需求分析 -> 指導 開發 與測試規範 -> 標尺【功能清單 + 4 要素: 界面 + 邏輯 + 交互 + 數據】-> ...
  • 'XML添加 Public Sub Add(ID As String, RFSerialnumber As String, Mood As Integer) If reatch(RFSerialnumber) = 1 Then Return End If Dim xmlDoc As New XmlD ...
  • 再用dubbo作為項目架構的時候,給consumer消費者用nginx提供了負載均衡策略和集群的實現, 但是想了下,consumer再多,但是提供者還是一個,最後還不都是落到了這一個provider上面? 舉個列子: 一個飯店有1個後廚在做飯, 前臺有100個點菜的服務員, 100個顧客來點餐,每個 ...
  • 前言 今天我們講的是狀態模式【State Pattern】、這個名字咋一看不好理解,但是仔細一想還是比較容易的。狀態模式重點關註的是狀態。狀態又牽扯著什麼呢?房屋的狀態暫且可以分為出租、簽訂合同、退房。那麼出租對應的是什麼呢?出租狀態代表可以租房。可以租房是一個行為了。所以不難理解的是狀態模式關註的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...