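Spring Cloud Stream is a framework for building message-driven microservices. Built on Spring Boot, it integrates Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, and others), provides opinionated auto-configuration, and introduces three core concepts: publish-subscribe, consumer groups, and partitions.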
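To make the three concepts concrete, here is a minimal in-memory sketch in plain Java. It is not Spring Cloud Stream code: the ToyBroker class, its methods, and the group names used with it are hypothetical, and hashing the key is just one simple way a partition could be chosen.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only (hypothetical names, no real broker involved).
class ToyBroker {
    // group name -> one inbox per consumer instance in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();

    // Register a consumer group with a fixed number of instances.
    void addGroup(String group, int instances) {
        if (instances < 1) {
            throw new IllegalArgumentException("a group needs at least one instance");
        }
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            inboxes.add(new ArrayList<>());
        }
        groups.put(group, inboxes);
    }

    // Publish-subscribe: every registered group sees the message.
    // Consumer group: only one instance inside each group receives it.
    // Partitioning: the instance is derived from the key, so messages
    // with the same key always land on the same instance of a group.
    void publish(String key, String payload) {
        for (List<List<String>> inboxes : groups.values()) {
            int index = Math.floorMod(key.hashCode(), inboxes.size());
            inboxes.get(index).add(payload);
        }
    }

    // Messages delivered so far to one instance of a group.
    List<String> inbox(String group, int instance) {
        return groups.get(group).get(instance);
    }
}
```

With a two-instance group and a one-instance group registered, publishing two messages under the same key delivers both to each group, and within the two-instance group both end up in the same inbox.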
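An application interacts with a Spring Cloud Stream binder through input or output channels, and configuration determines how those channels are bound. The binder, in turn, is responsible for talking to the middleware.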
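The division of labour described above can be sketched in plain Java. The types below (ToyBinder, InMemoryBinder, ToyApp) are hypothetical, simplified stand-ins and not the real Spring Cloud Stream interfaces; the point is only that application code talks to channels while a binder implementation decides how messages actually travel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a binder: it hands out channels for a destination.
interface ToyBinder {
    // Returns a sender (an "output channel") bound to the destination.
    Consumer<String> bindOutput(String destination);

    // Registers a handler (an "input channel" listener) for the destination.
    void bindInput(String destination, Consumer<String> handler);
}

// One possible binder: delivers in memory instead of talking to a broker.
class InMemoryBinder implements ToyBinder {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    @Override
    public Consumer<String> bindOutput(String destination) {
        return payload -> {
            List<Consumer<String>> targets =
                    handlers.getOrDefault(destination, new ArrayList<>());
            for (Consumer<String> handler : targets) {
                handler.accept(payload);
            }
        };
    }

    @Override
    public void bindInput(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, key -> new ArrayList<>()).add(handler);
    }
}

// Application code depends only on ToyBinder, never on a concrete broker.
class ToyApp {
    static List<String> run(ToyBinder binder) {
        List<String> received = new ArrayList<>();
        binder.bindInput("myInput", received::add);
        binder.bindOutput("myInput").accept("hello world");
        return received;
    }
}
```

Because ToyApp only sees the ToyBinder interface, a different implementation could be passed in without touching the application code, which mirrors why switching between RabbitMQ and Kafka is mostly a dependency change.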
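Development tool: IntelliJ IDEA 2019.2.3
I. Server side
1. Create the project
In IDEA, create a new Spring Boot project named "spring-server", select Spring Boot version 2.1.10, and on the Dependencies screen check Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows: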
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
Set the port to 8761, and disable both registering this instance with the Eureka server and fetching the registry from it.
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
3. Modify the main class
Add the @EnableEurekaServer annotation.
package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }
}
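II. Message producer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-producer", select Spring Boot version 2.1.10, and on the Dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which transitively brings in spring-cloud-stream and the RabbitMQ binder (spring-cloud-stream-binder-rabbit).
The complete pom.xml is as follows: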
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
The project uses the RabbitMQ binder, which by default connects to port 5672 on localhost, so the rabbitmq section below can be omitted.
server:
  port: 8081
spring:
  application:
    name: spring-producer
  # RabbitMQ connection settings (these match the defaults)
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
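3. Write the sending service
The sendOrder method is annotated with @Output("myInput"), which creates a message channel named myInput. Messages sent through the channel returned by this method are delivered to myInput.
If the annotation value is omitted, the method name is used as the channel name.
package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SendService {

    // Declares an output channel named "myInput"
    @Output("myInput")
    MessageChannel sendOrder();
}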
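The naming rule (explicit annotation value first, method name as the fallback) can be illustrated with a small reflection sketch in plain Java. The @ToyOutput annotation, the ToySendService interface, and the ChannelNames helper are hypothetical and exist only for this illustration; they are not part of Spring Cloud Stream, and the framework's own resolution logic may differ in detail.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation that mimics the shape of a channel declaration.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyOutput {
    String value() default "";
}

// Hypothetical service with one named and one unnamed channel.
interface ToySendService {
    @ToyOutput("myInput")
    Object sendOrder();

    @ToyOutput
    Object sendInvoice();
}

class ChannelNames {
    // Returns the annotation value if present, otherwise the method name.
    static String resolve(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
        ToyOutput output = method.getAnnotation(ToyOutput.class);
        if (output == null) {
            throw new IllegalArgumentException(methodName + " declares no channel");
        }
        return output.value().isEmpty() ? method.getName() : output.value();
    }
}
```

Here ChannelNames.resolve(ToySendService.class, "sendOrder") yields "myInput", while resolving "sendInvoice" falls back to the method name "sendInvoice".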
4. Modify the main class
Add the @EnableBinding annotation to turn on channel binding, passing SendService.class as its argument. When the Spring container starts, it automatically binds the channels declared in the SendService interface.
package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }
}
5. Add a controller class
The controller calls the send method of SendService to publish a message to the message broker.
package com.example.springproducer;

import java.nio.charset.StandardCharsets;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Build the message with a byte[] payload
        Message<byte[]> msg = MessageBuilder
                .withPayload("hello world".getBytes(StandardCharsets.UTF_8))
                .build();
        // Send the message through the bound output channel
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}
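III. Message consumer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-consumer", select Spring Boot version 2.1.10, and on the Dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: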
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  # RabbitMQ connection settings (these match the defaults)
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
3. Write the channel interface for receiving messages
package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

    // Declares an input channel named "myInput"
    @Input("myInput")
    SubscribableChannel myInput();
}
4. Modify the main class
Bind the message channel here as well.
package com.example.springconsumer;

import java.nio.charset.StandardCharsets;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages on the myInput channel
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println("Received message: " + new String(msg, StandardCharsets.UTF_8));
    }
}
5. Test
(1) Check that the RabbitMQ service is running (it starts automatically by default);
(2) Start spring-server (port 8761);
(3) Start spring-producer (port 8081);
(4) Start spring-consumer (port 8080);
(5) Open http://localhost:8081/send in a browser. The console of the spring-consumer project prints:
Received message: hello world
This shows that the consumer can now receive messages from the message broker.
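IV. Switching the binder
The example above uses RabbitMQ as the message broker. To use Kafka instead, it is enough to swap the Maven dependency.
In the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.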