在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。 rabbitmq 整個消息投遞的路徑為: producer->rabbitmq broker cluster->exchange->que... ...
RabbitMQ 可靠投遞
標簽: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投遞
- 背景
- confirmCallback 確認模式
- returnCallback 未投遞到 queue 退回模式
- shovel-plugin 跨機房可靠投遞
背景
在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。
rabbitmq 整個消息投遞的路徑為:
producer->rabbitmq broker cluster->exchange->queue->consumer
message 從 producer 到 rabbitmq broker cluster 則會返回一個 confirmCallback 。
message 從 exchange->queue 投遞失敗則會返回一個 returnCallback 。我們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。
confirmCallback 確認模式
在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟 confirmcallback 。
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
if (!ack) {
log.error("消息發送失敗!" + cause + data.toString());
} else {
log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null));
}
});
我們來看下 ConfirmCallback 介面。
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(CorrelationData correlationData, boolean ack, String cause);
}
重點是 CorrelationData 對象,每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。
發送的時候創建一個 CorrelationData 對象。
User user = new User();
user.setID(1010101L);
user.setUserName("plen");
rabbitTemplate.convertAndSend(exchange, routing, user,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
},
new CorrelationData(user.getID().toString()));
這裡將 user ID 設置為當前消息 CorrelationData id 。當然這裡是純粹 demo,真實場景是需要做業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對賬。
消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用 confirmCallback。
被 broker 接收到只能表示 message 已經到達伺服器,並不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback 。
returnCallback 未投遞到queue退回模式
confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到 return 退回模式。
同樣創建 ConnectionFactory 到時候需要設置 PublisherReturns(true) 選項。
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啟return模式
rabbitTemplate.setMandatory(true);//開啟強制委托模式
rabbitTemplate.setReturnCallback((message, replyCode, replyText,
exchange, routingKey) ->
log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));
這樣如果未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。
shovel-plugin 跨機房可靠投遞
RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受機房之間的網路斷開、機器下線等不穩定因素。
這裡有兩個 broker :
10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2
我們希望將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。我們先開啟 rabbit_node1 的 shovel-plugin。
先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,如果有的話直接開啟。
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
然後就可以在 Admin 面板里看到這個設置選項,怎麼設置這裡就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost 。
如果配置沒有問題的話,應該是這樣的一個狀態,說明已經順利連接到 rabbit_node2 broker 。
我們來看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。
rabbit_node1(10.211.55.3):
rabbit_node2(10.211.55.4):
RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 創建了兩個 tcp 連接,埠 39544 連接是用來消費 plen.queue 里的消息,埠 55706 連接是用來推送消息給 rabbit_node2 。
我們來看下 rabbit_node1 tcp 連接狀態:
tcp6 0 0 10.211.55.3:5672 10.211.55.3:39544 ESTABLISHED
tcp 0 0 10.211.55.3:55706 10.211.55.4:5672 ESTABLISHED
rabbit_node2 tcp 連接狀態:
tcp6 0 0 10.211.55.4:5672 10.211.55.3:55706 ESTABLISHED
為了驗證 shovel-plugin 穩定性,我們將 rabbit_node2 下線。
然後再發送消息,發現消息會現在 rabbit_node1 plen.queue 里待著,一旦 shovel-plugin 連接恢復將消費 rabbit_node1 plen.queue 消息,然後投遞給 rabbit_node2 plen.queue 。
作者:王清培 (滬江集團資深JAVA架構師)