##### 延遲和死信隊列的配置 - 延遲隊列有效期一分鐘,後進入死信隊列,如果異常就進入異常隊列 ``` @Configuration @Data public class RabbitMQConfig { /** * 交換機 */ private String orderEventExchang ...
延遲和死信隊列的配置
- 延遲隊列有效期一分鐘,後進入死信隊列,如果異常就進入異常隊列
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交換機
*/
private String orderEventExchange="order.event.exchange";
/**
* 延遲隊列, 不能被監聽消費
*/
private String orderCloseDelayQueue="order.close.delay.queue";
/**
* 關單隊列, 延遲隊列的消息過期後轉發的隊列,被消費者監聽
*/
private String orderCloseQueue="order.close.queue";
/**
* 進入延遲隊列的路由key
*/
private String orderCloseDelayRoutingKey="order.close.delay.routing.key";
/**
* 進入死信隊列的路由key,消息過期進入死信隊列的key
*/
private String orderCloseRoutingKey="order.close.routing.key";
/**
* 過期時間 毫秒,臨時改為1分鐘定時關單
*/
private Integer ttl=1000*60;
/**
* 消息轉換器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 創建交換機 Topic類型,也可以用dirct路由
* 一般一個微服務一個交換機
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange(orderEventExchange,true,false);
}
/**
* 延遲隊列
*/
@Bean
public Queue orderCloseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",orderEventExchange);
args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
args.put("x-message-ttl",ttl);
return new Queue(orderCloseDelayQueue,true,false,false,args);
}
/**
* 死信隊列,普通隊列,用於被監聽
*/
@Bean
public Queue orderCloseQueue(){
return new Queue(orderCloseQueue,true,false,false);
}
/**
* 第一個隊列,即延遲隊列的綁定關係建立
* @return
*/
@Bean
public Binding orderCloseDelayBinding(){
return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
}
/**
* 死信隊列綁定關係建立
* @return
*/
@Bean
public Binding orderCloseBinding(){
return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
}
}
異常隊列配置類
public class RabbitMQErrorConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 異常交換機
*/
private String orderErrorExchange = "order.error.exchange";
/**
* 異常隊列
*/
private String orderErrorQueue = "order.error.queue";
/**
* 異常routing.key
*/
private String orderErrorRoutingKey = "order.error.routing.key";
/**
* 異常交換機
* @return
*/
@Bean
public TopicExchange errorTopicExchange(){
return new TopicExchange(orderErrorExchange,true,false);
}
/**
* 異常隊列
* @return
*/
@Bean
public Queue errorQueue(){
return new Queue(orderErrorQueue,true);
}
/**
* 隊列交換機進行綁定
* @param errorQueue
* @return
*/
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey);
}
/**
* 配置 RepublishMessageRecoverer
* 用途:消息重試一定次數後,用特定的routingKey轉發到指定的交換機中,方便後續排查和告警
*
* 頂層是 MessageRecoverer介面,多個實現類
*
* @return
*/
@Bean
public MessageRecoverer messageRecoverer(){
return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey);
}
}