創建項目 使用idea(版本2018.2),新建項目,使用spring initalizr,項目命名為名字為producer,選擇integration里的rabbitmq,確定。 配置rabbitmq服務 將預設生成的application.properties重命名為application.ym ...
創建項目
使用idea(版本2018.2),基於jdk1.8,springboot版本2.1,maven3.5.4
分別創建3個項目,group都為com.study.rabbitmq
項目1:framework,框架,存放共用的實體類
項目2:producer,生產者,存放消息發送服務
項目3:consumer,消費者,存放消息處理服務
1.framework項目,
創建package,命名為entity,在其下創建實體類Order,只為模擬,設置了兩個屬性,id與no
註意:需要實現Serializable介面,否則消息進行序列化時會報錯。
package com.study.rabbitmq.framework.entity; import lombok.Data; import java.io.Serializable; @Data public class Order implements Serializable { private String id; private String no; }
2.producer項目
a.pom中增加對framework的依賴
<dependency>
<groupId>com.study.rabbitmq</groupId>
<artifactId>framework</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
b.配置文件中增加rabbitmq連接信息
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
server:
port: 8080
c.創建package,命名為service,其下創建消息發送服務,命名為MessageSendService
package com.study.rabbitmq.producer.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import com.study.rabbitmq.framework.entity.Order; import org.springframework.stereotype.Service; import java.util.UUID; @Service @Slf4j public class MessageSendService { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrder(Order order) { String messageId=UUID.randomUUID().toString(); log.info(messageId); CorrelationData correlationData=new CorrelationData(); correlationData.setId(messageId); rabbitTemplate.convertAndSend("order-exchange","123",order,correlationData); } }
3.consumer項目
a.pom中增加對framework的依賴
<dependency>
<groupId>com.study.rabbitmq</groupId>
<artifactId>framework</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
b.配置文件中增加rabbitmq連接信息
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
server:
port: 8081
c.創建package,命名為service,其下創建消息處理服務,命名為MessageHandleService
package com.study.rabbitmq.consumer.service; import com.study.rabbitmq.framework.entity.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; @Service @Slf4j public class MessageHanlerService { @RabbitListener(bindings = @QueueBinding( value = @Queue("order-queue"), exchange =@Exchange("order-exchange"), key ="123" )) @RabbitHandler public void handleOrder(@Payload Order order) { log.info("開始處理訂單信息"); log.info("訂單編號為:"+order.getNo()); } }
配置rabbitmq服務
使用rabbitmq自帶的管理頁面,http://localhost:15672/#/
1.創建exchange,命名為order-exchange
2.創建queue,命名為order-queue
3.綁定二者,路由key設置為123
我們的應用向rabbitmq發送消息需要以上信息。
此處涉及一些rabbitmq的基本概念,交換機、隊列,請自行瞭解rabbitmq框架體系。
發送消息測試
為producer項目下的MessageSendService創建Junit測試
package com.study.rabbitmq.producer.service; import com.study.rabbitmq.framework.entity.Order; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class MessageSendServiceTest { @Autowired private MessageSendService orderService; @Test public void sendOrder() { Order order=new Order(); order.setId("1"); order.setNo("單號1247"); orderService.sendOrder(order); } }
運行測試
無報錯,進入rabbitmq管理頁面,http://localhost:15672/#/,進入queues的tab頁,可看到,已經收到一條消息處於ready狀態
接收消息
啟動運行consumer項目,查看控制台輸出的消息,確實消息已被接收及處理。