# 【RabbitMQ】當隊列中消息數量超過最大長度的淘汰策略 ## 說明 最近在研究RabbitMQ如何實現延時隊列時發現消息進入死信隊列的情況之一就是當消息數量超過隊列設置的最大長度時會被丟入死信隊列,看到這時我就產生了一個疑問,到底是最後插入的消息還是最早插入的消息會被丟入死信隊列呢?遺憾的是 ...
【RabbitMQ】當隊列中消息數量超過最大長度的淘汰策略
說明
最近在研究RabbitMQ如何實現延時隊列時發現消息進入死信隊列的情況之一就是當消息數量超過隊列設置的最大長度時會被丟入死信隊列,看到這時我就產生了一個疑問,到底是最後插入的消息還是最早插入的消息會被丟入死信隊列呢?遺憾的是看了幾篇博客都是模棱兩可的答案,還有的說超過數量後該消息會被放入死信隊列,看完之後還是對這個問題將信將疑。所以我決定去探究一下正確答案
答案
遇事不決肯定是先看官方文檔最靠譜啦,在官網中扒拉了半天終於找到說明這個問題的頁面了,就是上面引用的鏈接,重點如下:
翻譯過來就是:在RabbitMQ中,當消息的數量或大小達到限制後,預設的操作是刪除最舊的消息或將最舊的消息放入死信隊列,這取決於該隊列是否配置了死信隊列。 我們可以通過使用overflow
配置指定的處理策略,如果overflow
被設置為reject-publish
或reject-publish-dlx
,那麼會將最新插入的消息丟棄。如果該隊列開啟了confirm機制,發佈者會收到nack的信息,如果一個消息被路由到多個隊列,只要其中一個隊列拒絕發佈者就會收到nack消息,但是沒被拒絕的隊列可以正確接收到消息。reject-publish
和reject-publish-dlx
的區別是後者還會將拒絕的消息放入死信隊列。
驗證
下麵我們使用demo來驗證一下各個策略的現象
預設策略(drop-head)
- application.yml配置如下:
rabbitmq:
host: 127.0.0.1
port: 5672
username: username
password: password
virtual-host: /test
publisher-confirm-type: correlated # 配置啟用confirm機制
- 使用RabbitMQConfig創建業務隊列和對應死信隊列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.business.exchange";
public static final String DELAY_QUEUE_NAME = "delay.business.queue";
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.business.queue.routingKey";
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routingKey";
// 聲明延遲隊列交換機
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 聲明死信隊列交換機
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 聲明延時隊列
@Bean("delayQueue")
public Queue delayQueue(){
HashMap<String, Object> map = new HashMap<>();
// x-dead-letter-exchange 這裡聲明當前隊列綁定的死信交換機
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// x-dead-letter-routing-key 這裡聲明當前隊列的死信路由key
map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
// 設置該隊列最大消息數
map.put("x-max-length", 10);
map.put("x-overflow", "reject-publish");
return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(map).build();
}
// 聲明死信隊列
@Bean("deadLetterQueue")
public Queue deadLetterQueue(){
return new Queue(DEAD_LETTER_QUEUE_NAME);
}
// 聲明延時隊列的綁定關係
@Bean
public Binding delayBinding(@Qualifier("delayExchange") DirectExchange directExchange,
@Qualifier("delayQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with(DELAY_QUEUE_ROUTING_KEY);
}
// 聲明死信隊列的綁定關係
@Bean
public Binding deadLetterBinding(@Qualifier("deadLetterExchange") DirectExchange directExchange,
@Qualifier("deadLetterQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);
}
}
註意,這裡我們並沒有設置overflow
參數,所以採用的是預設配置
3. 創建消費者監聽死信隊列
import com.rabbitmq.client.Channel;
import com.whs.edws.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class MaxLengthConsumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
public void receive(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
log.info("死信隊列消費者接收到消息:" + s);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
- 創建測試方法發送消息
@Test
void maxLengthTestPublish(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相關配置信息
* @param ack 消息隊列是否成功收到消息
* @param cause 錯誤原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息發送成功:" + correlationData.getId());
} else {
logger.error("消息發送失敗:" + correlationData.getId());
logger.error("錯誤原因:" + cause);
}
}
});
for (int i = 0; i < 11; i++) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(String.valueOf(i));
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUTING_KEY, String.valueOf(i), correlationData);
}
}
- 運行結果
看上面的代碼可知,我們設置了隊列大小為10,但是我們向隊列發送了11條消息,最後日誌列印如下:
2023-07-18 02:37:52.941 INFO 24308 --- [ntContainer#1-1] com.edws.rabbitmq.MaxLengthConsumer : 死信隊列消費者接收到消息:0
和官方文檔說的一樣,預設最舊的一條消息被放入死信隊列
reject-publish策略
reject-publish策略的驗證代碼只需在預設策略的基礎上加上配置即可,我們在定義隊列的時候加上配置
// 指定超過隊列長度後的策略
map.put("x-overflow", "reject-publish");
執行方法,列印的結果為:
2023-07-18 02:45:07.242 INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 消息發送失敗:10
2023-07-18 02:45:07.242 INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 錯誤原因:null
通過日誌可以看到,最新插入的消息被丟棄了。至於cause為什麼是null,我沒找到原因,如果瞭解的朋友可以在評論里討論一下
reject-publish-dlx策略
reject-publish-dlx
策略的代碼也是只需要在預設代碼中加一行配置即可
// 指定超過隊列長度後的策略
map.put("x-overflow", "reject-publish-dlx");
列印結果如下:
2023-07-18 02:49:13.246 INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 消息發送失敗:10
2023-07-18 02:49:13.246 INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 錯誤原因:null
2023-07-18 02:49:13.252 INFO 10488 --- [ntContainer#1-1] com.whs.edws.rabbitmq.MaxLengthConsumer : 死信隊列消費者接收到消息:10
通過日誌可以看出,最新的一條消息被拒絕且被放入死信隊列