Stream 簡介 Spring Cloud Stream 是用於構建消息驅動的微服務應用程式的框架,提供了多種中間件的合理配置 Spring Cloud Stream 包含以下核心概念: Destination Binders:目標綁定器,目標指的是 Kafka 或者 RabbitMQ,綁定器就是 ...
Stream 簡介
Spring Cloud Stream 是用於構建消息驅動的微服務應用程式的框架,提供了多種中間件的合理配置
Spring Cloud Stream 包含以下核心概念:
- Destination Binders:目標綁定器,目標指的是 Kafka 或者 RabbitMQ,綁定器就是封裝了目標中間件的包,如果操作的是 Kafka,就使用 Kafka Binder,如果操作的是 RabbitMQ,就使用 RabbitMO Binder
- Bindings:外部消息傳遞系統和應用程式之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
- Message:一種規範化的數據結構,生產者和消費者基於這個數據結構通過外部消息系統與目標綁定器和其他應用程式通信
應用程式通過 inputs 或者 outpus 與 Spring Cloud Stream 的 Binder 交互,Binder 層負責和中間件的通信,通過配置來 binding。通過定義 Binder 作為中間層,實現了應用程式與消息中間件細節之間的隔離,應用程式不需要再考慮各種不同的消息中間件實現。當需要升級消息中間件或是更換其他消息中間件產品時,只需要更換對應的 Binder 綁定器
Stream 整合 kafka
以 Kafka 為例,確保全裝 Kafka 並啟動
分別創建生產者和消費者項目,分別添加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1. 創建生產者
開發 MqSource 介面
public interface MqSource {
@Output("test-topic")
MessageChannel testTopic();
@Output("test-topic-2")
MessageChannel testTopic2();
}
通過 @Output
、@Input
註解定義消息輸入和輸出通道的名稱定義,輸出通道需要返回 MessageChannel 介面對象,它定義了向消息通道發送消息的方法。預設情況下,通道的名稱就是註解的方法的名稱,也能自己定義通道名稱,只需要給 @Input
和 @Output
註解傳入 String 類型參數通道名稱即可,這裡指定兩個通道分別為 test-topic
和 test-topic-2
開發 MsgProducer 類
@Slf4j
@EnableBinding(MqSource.class)
public class MsgProducer {
@Autowired
private MqSource mqSource;
public void sendTestTopicMessage(String msg) {
try {
mqSource.testTopic().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
log.error("sendTestTopicMessage error", e);
}
}
public void sendTestTopic2Message(String msg) {
try {
mqSource.testTopic2().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
log.error("sendTestTopic2Message error", e);
}
}
}
使用 @EnableBinding
創建和綁定通道,綁定通道是指將通道和 Binder 進行綁定,比如 Kafka、RabbiMQ 等。如果類路徑下只有一種 Binder,那麼 Spring Cloud Stream 會找到並綁定它,不需要進行配置。如果有多個就需要明確配置
調用 MqSource 介面方法獲取輸出通道對象,接著調用 send 方法發送數據。send 方法接收一個 Message 對象,這個對象不能直接新建,需要使用 MessageBuilder 獲取
2. 創建消費者
public interface MqSink {
@Input("test-topic")
MessageChannel testTopic();
@Input("test-topic-2")
MessageChannel testTopic2();
}
與生產者的 MqSource
同理
開發 MsgReceiver 類,@StreamLisiener
接收的參數是要處理的通道名,所註解的方法就是處理從通道獲取數據的方法,方法的參數就是獲取到的數據
@Slf4j
@EnableBinding(MqSink.class)
public class MsgReceiver {
@StreamListener("test-topic")
public void testTopicMessageListen(String msg) {
log.info("testTopicMessageListen: {}", msg);
}
@StreamListener("test-topic-2")
public void testTopic2MessageListen(String msg) {
log.info("testTopic2MessageListen: {}", msg);
}
}