spring boot / cloud (九) 使用rabbitmq消息中間件 前言 rabbitmq介紹: RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。它可以用於大型軟體系統各個模塊之間的高效通信,支持高併發,支持可擴展。 amqp介紹: 即Advanced Message ...
spring boot / cloud (九) 使用rabbitmq消息中間件
前言
rabbitmq介紹:
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。它可以用於大型軟體系統各個模塊之間的高效通信,支持高併發,支持可擴展。
amqp介紹:
即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。
思路
基於spring boot的特性連接rabbitmq,並作出如下樣例:
配置
發佈方樣例
消費方樣例
實現
1.配置
引入maven依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
編寫config配置類(預設情況下是不用做任何配置的,這裡有配置是因為,它預設是用的二進位做的消息傳輸,這裡的配置是改為json傳輸)
@Configuration
public class RabbitMqConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryPlus(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
rabbitListenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter);
return rabbitListenerContainerFactory;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper xssObjectMapper) {
return new Jackson2JsonMessageConverter(xssObjectMapper);
}
}
編寫配置文件
spring.rabbitmq.host=192.168.134.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=dev_udf-sample
spring.rabbitmq.password=1qazxsw2
spring.rabbitmq.virtual-host=/dev_udf-sample
spring.rabbitmq.template.retry.enabled=true #發送方是否重試
spring.rabbitmq.listener.retry.enabled=true #消費方是否重試
定義公共的消息類
public class RabbitmqMessage<T> implements Serializable {
private static final long serialVersionUID = 1L;
//消息ID
private String id;
....其他自定義
}
2.發佈方樣例
創建Exchange,這裡使用的是DirectExchange,exchange主要是定義路由規則的,還有其他不同的路由規則實現,如:TopicExchange,他們都繼承至AbstractExchange
@Bean
public DirectExchange testExchange() {
return new DirectExchange("test_exchange");
}
使用AmqpTemplate發送非同步消息(RoutingKey則是指定消息的路由鍵,不同的路由鍵可被不同的消費方消費)
@Autowired
private AmqpTemplate amqpTemplate;
//然後調用發送方法發送消息
this.amqpTemplate.convertAndSend("test_exchange", "testRoutingKey", new RabbitmqMessage<String>("test"));
3.消費方樣例
創建消費隊列,死信隊列,以及與exchange的綁定關係
//消費隊列
@Bean
public Queue testConsume() {
//死信exchange與上面的定義方式一樣
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","test_exchange_dlx");
args.put("x-dead-letter-routing-key","testRoutingKey_dlx");
return new Queue("test_consume", true, false, false, args);
}
//死信消費隊列
@Bean
public Queue testConsumeDlx() {
return new Queue("test_consume_dlx");
}
//消費隊列綁定
@Bean
public Binding testConsumeBinding() {
return new Binding("test_consume", DestinationType.QUEUE,
"test_exchange","testRoutingKey", null);
}
//死信消費隊列綁定
@Bean
public Binding testConsumeDlxBinding() {
return new Binding("test_consume_dlx", DestinationType.QUEUE,
"test_exchange_dlx","testRoutingKey_dlx", null);
}
消費消息
@RabbitListener(queues = "test_consume")
public void process(Message<String> message) {
log.info(message);
}
代碼倉庫 (博客配套代碼)
結束
以上演示了rabbitmq在spring boot中的配置,以及發送方和消費方的樣例,在後續的章節中,會找機會介紹rabbitmq的搭建以及配置.
想獲得最快更新,請關註公眾號