Spring Cloud Stream Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、S ...
Spring Cloud Stream
Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進行封裝(整合)和擴展,下麵我們實現兩個服務之間的通訊來演示Spring Cloud Stream的使用方法。
整體概述
服務要想與其他服務通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用於發送消息,輸入通道用於接收消息,每個通道都會有個名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的複雜性和不一致性,綁定器會用通道的名字在消息中間件中定義主題,一個主題內的消息生產者來自多個服務,一個主題內消息的消費者也是多個服務,也就是說消息的發佈和消費是通過主題進行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實現,在Kafka中主題使用Topic實現。
準備環境
創建兩個項目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實現通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實現通訊。
兩個項目的POM文件依賴都是:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring-cloud-stream-binder-rabbit是指綁定器的實現使用RabbitMQ。
項目配置內容application.properties:
spring.application.name=spring-cloud-stream-a
server.port=9010
#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011
#設置預設綁定器
spring.cloud.stream.defaultBinder = rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
啟動一個rabbitmq:
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
編寫A項目代碼
在A項目中定義一個輸入通道一個輸出通道,定義通道在介面中使用@Input和@Output註解定義,程式啟動的時候Spring Cloud Stream會根據介面定義將實現類自動註入(Spring Cloud Stream自動實現該介面不需要寫代碼)。
A服務輸入通道,通道名稱ChatExchanges-A-Input,介面定義輸入通道必須返回SubscribableChannel:
public interface ChatInput {
String INPUT = "ChatExchanges-A-Input";
@Input(ChatInput.INPUT)
SubscribableChannel input();
}
A服務輸出通道,通道名稱ChatExchanges-A-Output,輸出通道必須返回MessageChannel:
public interface ChatOutput {
String OUTPUT = "ChatExchanges-A-Output";
@Output(ChatOutput.OUTPUT)
MessageChannel output();
}
定義消息實體類:
public class ChatMessage implements Serializable {
private String name;
private String message;
private Date chatDate;
//沒有無參數的構造函數並行化會出錯
private ChatMessage(){}
public ChatMessage(String name,String message,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = chatDate;
}
public String getName(){
return this.name;
}
public String getMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
}
}
在業務處理類上用@EnableBinding註解綁定輸入通道和輸出通道,這個綁定動作其實就是創建並註冊輸入和輸出通道的實現類到Bean中,所以可以直接是使用@Autowired進行註入使用,另外消息的串列化預設使用application/json格式(com.fastexml.jackson),最後用@StreamListener註解進行指定通道消息的監聽:
//ChatInput.class的輸入通道不在這裡綁定,監聽到數據會找不到AClient類的引用。
//Input和Output通道定義的名字不能一樣,否則程式啟動會拋異常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {
private static Logger logger = LoggerFactory.getLogger(AClient.class);
@Autowired
private ChatOutput chatOutput;
//StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
@StreamListener(ChatInput.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());
chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
}
}
到此A項目代碼編寫完成。
編寫B項目代碼
B項目使用Spring Integration實現消息的發佈和消費,定義通道時我們要交換輸入通道和輸出通道的名稱:
public interface ChatProcessor {
String OUTPUT = "ChatExchanges-A-Input";
String INPUT = "ChatExchanges-A-Output";
@Input(ChatProcessor.INPUT)
SubscribableChannel input();
@Output(ChatProcessor.OUTPUT)
MessageChannel output();
}
消息實體類:
public class ChatMessage {
private String name;
private String message;
private Date chatDate;
//沒有無參數的構造函數並行化會出錯
private ChatMessage(){}
public ChatMessage(String name,String message,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = chatDate;
}
public String getName(){
return this.name;
}
public String getMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
}
}
業務處理類用@ServiceActivator註解代替@StreamListener,用@InboundChannelAdapter註解發佈消息:
@EnableBinding(ChatProcessor.class)
public class BClient {
private static Logger logger = LoggerFactory.getLogger(BClient.class);
//@ServiceActivator沒有Json轉對象的能力需要藉助@Transformer註解
@ServiceActivator(inputChannel=ChatProcessor.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
}
@Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
public ChatMessage transform(String message) throws Exception{
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(message,ChatMessage.class);
}
//每秒發出一個消息給A
@Bean
@InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
public GenericMessage<ChatMessage> SendChatMessage(){
ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
return gm;
}
}
運行程式
啟動A項目和B項目:
消費組和消息分區
- 消費組:服務的部署一般是同一個服務會部署多份,如果希望一條消息只執行一次,就將這些相同服務的不同部署實例設置成一個消費組,消費組內的消息只會被一個實例消費。
- 消息分區:在一個消費組內除了要保證只有一個實例消費外,還要保證具備相同特征的消息被同一個實例進行消費。
消費組的設定比較簡單,在消息的消費方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.group={分組名}
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
在消息的產生方配置文件中增加:
spring.cloud.stream.bindings.{通道名稱}.destination={主題名}
spring-cloud-stream-a配置內容:
#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.group=A.group
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput
spring-cloud-stream-b配置內容:
#設置消費組(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.group=B.group
spring.cloud.stream.bindings.ChatExchanges-A-Output.destination=AOutput
#設置消費組(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.destination=AInput
消息分區首先在消息消費方開啟消息分區並配置消費者數量和當前消費者索引,然後在消息生產者配置分區鍵表達式和分區數量(因為是測試我們都將數量設置為1):
spring-cloud-stream-a配置內容:
#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Output.producer.partitionCount=1
spring-cloud-stream-b配置內容:
#設置分區(消費方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Output.consumer.partitioned=true
spring.cloud.stream.instance-count=1
spring.cloud.stream.instance-index=0
#設置分區(生產方設置)
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionKeyExpression=headers.router
spring.cloud.stream.bindings.ChatExchanges-A-Input.producer.partitionCount=1
修改spring-cloud-stream-a和spring-cloud-stream-b的發送消息代碼:
spring-cloud-stream-a:
//StreamListener自帶了Json轉對象的能力,收到B的消息列印並回覆B一個新的消息。
@StreamListener(ChatInput.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());
//這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
int feature = 1;
Map<String, Object> headers = new HashMap<>();
headers.put("router", feature);
GenericMessage<ChatMessage> genericMessage = new GenericMessage<>(replyMessage,headers);
chatOutput.output().send(MessageBuilder.fromMessage(genericMessage).build());
}
spring-cloud-stream-b:
//每秒發出一個消息給A
@Bean
@InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
public GenericMessage<ChatMessage> SendChatMessage(){
ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
//這裡只是測試實際業務根據需要設計特征值的範圍,這個和消費組內有多少實例有關,然後把特征值放在消息頭router屬性中
int feature = 1;
Map<String, Object> headers = new HashMap<>();
headers.put("router", feature);
return new GenericMessage<>(message,headers);
}
運行結果:
源碼
Github倉庫:https://github.com/sunweisheng/spring-cloud-example