Spring Cloud Stream 進行服務之間的通訊

来源:https://www.cnblogs.com/bluersw/archive/2019/10/15/11675139.html
-Advertisement-
Play Games

Spring Cloud Stream Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、S ...


Spring Cloud Stream

Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進行封裝(整合)和擴展,下麵我們實現兩個服務之間的通訊來演示Spring Cloud Stream的使用方法。

整體概述

Alt text
服務要想與其他服務通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用於發送消息,輸入通道用於接收消息,每個通道都會有個名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的複雜性和不一致性,綁定器會用通道的名字在消息中間件中定義主題,一個主題內的消息生產者來自多個服務,一個主題內消息的消費者也是多個服務,也就是說消息的發佈和消費是通過主題進行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實現,在Kafka中主題使用Topic實現。

準備環境

創建兩個項目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實現通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實現通訊。
兩個項目的POM文件依賴都是:

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

spring-cloud-stream-binder-rabbit是指綁定器的實現使用RabbitMQ。

項目配置內容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

啟動一個rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

編寫A項目代碼

在A項目中定義一個輸入通道一個輸出通道,定義通道在介面中使用@Input和@Output註解定義,程式啟動的時候Spring Cloud Stream會根據介面定義將實現類自動註入(Spring Cloud Stream自動實現該介面不需要寫代碼)。
A服務輸入通道,通道名稱ChatExchanges-A-Input,介面定義輸入通道必須返回SubscribableChannel:

public interface ChatInput {

    String INPUT = "ChatExchanges-A-Input";

    @Input(ChatInput.INPUT)
    SubscribableChannel input();
}

A服務輸出通道,通道名稱ChatExchanges-A-Output,輸出通道必須返回MessageChannel:

public interface ChatOutput {

    String OUTPUT = "ChatExchanges-A-Output";

    @Output(ChatOutput.OUTPUT)
    MessageChannel output();
}

定義消息實體類:

public class ChatMessage implements Serializable {

    private String name;
    private String message;
    private Date chatDate;

    //沒有無參數的構造函數並行化會出錯
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
    }
}

在業務處理類上用@EnableBinding註解綁定輸入通道和輸出通道,這個綁定動作其實就是創建並註冊輸入和輸出通道的實現類到Bean中,所以可以直接是使用@Autowired進行註入使用,另外消息的串列化預設使用application/json格式(com.fastexml.jackson),最後用@StreamListener註解進行指定通道消息的監聽:

//ChatInput.class的輸入通道不在這裡綁定,監聽到數據會找不到AClient類的引用。
//Input和Output通道定義的名字不能一樣,否則程式啟動會拋異常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

    private static Logger logger = LoggerFactory.getLogger(AClient.class);

    @Autowired
    private ChatOutput chatOutput;

    //StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
    @StreamListener(ChatInput.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());

        ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

        chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
    }
}

到此A項目代碼編寫完成。

編寫B項目代碼

B項目使用Spring Integration實現消息的發佈和消費,定義通道時我們要交換輸入通道和輸出通道的名稱:

public interface ChatProcessor {

    String OUTPUT = "ChatExchanges-A-Input";
    String INPUT  = "ChatExchanges-A-Output";

    @Input(ChatProcessor.INPUT)
    SubscribableChannel input();

    @Output(ChatProcessor.OUTPUT)
    MessageChannel output();
}

消息實體類:

public class ChatMessage {
    private String name;
    private String message;
    private Date chatDate;

    //沒有無參數的構造函數並行化會出錯
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
    }
}

業務處理類用@ServiceActivator註解代替@StreamListener,用@InboundChannelAdapter註解發佈消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

    private static Logger logger = LoggerFactory.getLogger(BClient.class);

    //@ServiceActivator沒有Json轉對象的能力需要藉助@Transformer註解
    @ServiceActivator(inputChannel=ChatProcessor.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());
    }

    @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
    public ChatMessage transform(String message) throws Exception{
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(message,ChatMessage.class);
    }

    //每秒發出一個消息給A
    @Bean
    @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
    public GenericMessage<ChatMessage> SendChatMessage(){
        ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
        GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
        return gm;
    }
}

運行程式

啟動A項目和B項目:
Alt text
Alt text

消費組和消息分區

  • 消費組:服務的部署一般是同一個服務會部署多份,如果希望一條消息只執行一次,就將這些相同服務的不同部署實例設置成一個消費組,消費組內的消息只會被一個實例消費。
  • 消息分區:在一個消費組內除了要保證只有一個實例消費外,還要保證具備相同特征的消息被同一個實例進行消費。

消費組的設定比較簡單,在消息的消費方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.group={分組名}
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
在消息的產生方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
spring-cloud-stream-a配置內容:

#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.group=A.group
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput

spring-cloud-stream-b配置內容:

#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.group=B.group
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput

消息分區首先在消息消費方開啟消息分區並配置消費者數量和當前消費者索引,然後在消息生產者配置分區鍵表達式和分區數量(因為是測試我們都將數量設置為1):
spring-cloud-stream-a配置內容:

#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionCount=1

spring-cloud-stream-b配置內容:

#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionCount=1

修改spring-cloud-stream-a和spring-cloud-stream-b的發送消息代碼:
spring-cloud-stream-a:

    //StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
    @StreamListener(ChatInput.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());

        ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

        //這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
        int feature = 1;
        Map<String, Object> headers = new HashMap<>();
        headers.put("router", feature);

        GenericMessage<ChatMessage> genericMessage = new GenericMessage<>(replyMessage,headers);

        chatOutput.output().send(MessageBuilder.fromMessage(genericMessage).build());
    }

spring-cloud-stream-b:

    //每秒發出一個消息給A
    @Bean
    @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
    public GenericMessage<ChatMessage> SendChatMessage(){
        ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());

        //這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
        int feature = 1;
        Map<String, Object> headers = new HashMap<>();
        headers.put("router", feature);

        return  new GenericMessage<>(message,headers);
    }

運行結果:
Alt text
Alt text
Alt text
Alt text
Alt text

源碼

Github倉庫:https://github.com/sunweisheng/spring-cloud-example


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 場景 SpringCloud-服務註冊與實現-Eureka創建服務註冊中心(附源碼下載): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/102535957 上面已經搭建好服務註冊中心,開始創建服務提供者。 當 Client 向 ...
  • 一、代理模式的作用 將主要業務與次要業務進行松耦合的組裝 二、代理模式本質 監控行為的特征 例子: <input type="button" onclick="處理函數"> 三、生活案例 案例:飯前便後要洗手 分析: 1.分析出主要業務和次要業務 【主要業務】:吃飯,上廁所 【次要業務】:洗手 2. ...
  • 在高併發業務場景下,消息隊列在流量削峰、解耦上有不可替代的作用。當前使用較多的消息隊列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等。 消息隊列這麼多,到底該選擇哪款消息隊列呢? 選擇消息隊列的基本標準 雖然這些消息隊列在功能和特性方面各有優劣, ...
  • 1. 概述 logstash把格式化的數據發送到elasticsearch以後,elasticsearch負責存儲搜索日誌數據 elasticsearch的搜索介面還是很強大的,這邊不詳細展開,因為kibana會去調用el的介面; 本文將講解elasticsearch的相關配置和遇到的問題,至於el ...
  • Top12原則: 開發各階段流程及規範 需求、架構、設計、開發、測試等階段流程及規範 需求是基石 總流程 :戰略規劃 3-4年->產品立項-> 用戶調研+競品分析->業務信息 -> 需求分析 -> 指導 開發 與測試規範 -> 標尺【功能清單 + 4 要素: 界面 + 邏輯 + 交互 + 數據】-> ...
  • 'XML添加 Public Sub Add(ID As String, RFSerialnumber As String, Mood As Integer) If reatch(RFSerialnumber) = 1 Then Return End If Dim xmlDoc As New XmlD ...
  • 再用dubbo作為項目架構的時候,給consumer消費者用nginx提供了負載均衡策略和集群的實現, 但是想了下,consumer再多,但是提供者還是一個,最後還不都是落到了這一個provider上面? 舉個列子: 一個飯店有1個後廚在做飯, 前臺有100個點菜的服務員, 100個顧客來點餐,每個 ...
  • 前言 今天我們講的是狀態模式【State Pattern】、這個名字咋一看不好理解,但是仔細一想還是比較容易的。狀態模式重點關註的是狀態。狀態又牽扯著什麼呢?房屋的狀態暫且可以分為出租、簽訂合同、退房。那麼出租對應的是什麼呢?出租狀態代表可以租房。可以租房是一個行為了。所以不難理解的是狀態模式關註的 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...