RabbitMQ 可靠投遞

来源:https://www.cnblogs.com/wangiqngpei557/archive/2018/07/28/9381478.html
-Advertisement-
Play Games

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。 rabbitmq 整個消息投遞的路徑為: producer->rabbitmq broker cluster->exchange->que... ...


RabbitMQ 可靠投遞

標簽: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投遞


  • 背景
  • confirmCallback 確認模式
  • returnCallback 未投遞到 queue 退回模式
  • shovel-plugin 跨機房可靠投遞

背景

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。

rabbitmq 整個消息投遞的路徑為:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 則會返回一個 confirmCallback
messageexchange->queue 投遞失敗則會返回一個 returnCallback 。我們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。

confirmCallback 確認模式

在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息發送失敗!" + cause + data.toString());
        } else {
            log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我們來看下 ConfirmCallback 介面。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重點是 CorrelationData 對象,每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。

發送的時候創建一個 CorrelationData 對象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

這裡將 user ID 設置為當前消息 CorrelationData id 。當然這裡是純粹 demo,真實場景是需要做業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對賬。

消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用 confirmCallback

broker 接收到只能表示 message 已經到達伺服器,並不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback

returnCallback 未投遞到queue退回模式

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到 return 退回模式。

同樣創建 ConnectionFactory 到時候需要設置 PublisherReturns(true) 選項。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啟return模式
rabbitTemplate.setMandatory(true);//開啟強制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

這樣如果未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。

shovel-plugin 跨機房可靠投遞

RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受機房之間的網路斷開、機器下線等不穩定因素。

這裡有兩個 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我們希望將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。我們先開啟 rabbit_node1shovel-plugin

先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,如果有的話直接開啟。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然後就可以在 Admin 面板里看到這個設置選項,怎麼設置這裡就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost

如果配置沒有問題的話,應該是這樣的一個狀態,說明已經順利連接到 rabbit_node2 broker


我們來看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 創建了兩個 tcp 連接,埠 39544 連接是用來消費 plen.queue 里的消息,埠 55706 連接是用來推送消息給 rabbit_node2

我們來看下 rabbit_node1 tcp 連接狀態:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 連接狀態:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

為了驗證 shovel-plugin 穩定性,我們將 rabbit_node2 下線。

然後再發送消息,發現消息會現在 rabbit_node1 plen.queue 里待著,一旦 shovel-plugin 連接恢復將消費 rabbit_node1 plen.queue 消息,然後投遞給 rabbit_node2 plen.queue

作者:王清培 (滬江集團資深JAVA架構師)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在表單提交中,設置input不可編輯,但是可以向後臺傳輸數據,的設置方法: $('#input').attr("readonly",true); ...
  • 做簡單相冊,點擊小圖片,下麵的圖片進行放大 佈局為上下分別為兩個div 上面一個div內的圖片用a標簽包含 頁面效果為點擊上面div的圖片下麵的圖片換成對應的圖片 js思路為: 首先分別找到上面 <!DOCTYPE html> <html lang="en"> <head> <meta charse ...
  • heigh:100%失效 解決: 1. html, body {height: 100%;} 2. div { height: 100%; position: absolute;} 非定位元素的寬高百分比計算不會將 padding 計算在內,而定位元素會計算在內。 利用這個特性可以實現圖片左右半區點 ...
  • float 特性 包裹性; 塊狀化並格式化上下文; 破壞文檔流,使父元素高度塌陷,在父元素中浮動元素高度不計在內; 沒有任何 margin 合併 使浮動元素後的非浮動元素的文本環繞著浮動元素 浮動之間的排列 元素設為浮動後會變為包裹性,若沒有明確寬度,寬度則是元素里的內容寬度。浮動元素會生成一個塊級 ...
  • GitHub地址: https://github.com/jeromeetienne/jquery-qrcode ...
  • java的(PO,VO,TO,BO,DAO,POJO)解釋 O/R Mapping 是 Object Relational Mapping(對象關係映射)的縮寫。通俗點講,就是將對象與關係資料庫綁定,用對象來表示關係數據。在O/R Mapping的世界里,有兩個基本的也是重要的東東需要瞭解,即VO, ...
  • 載入靜態文件 在一個網頁中,不僅僅只有一個 html 骨架,還需要 css 樣式文件, js 執行文件以及一些圖片 等。因此在 DTL 中載入靜態文件是一個必須要解決的問題。在 DTL 中,使用 static 標簽來載入 靜態文件。要使用 static 標簽,首先需要 {% load static ...
  • 人應該自信點,因為在某個方面,你無人可取代。做事,做人都要有底線,一件事的底線是什麼,做人的底線是什麼,做事的底線要符合做人的底線。這些事都要清楚。 工作要努力,對你最直接的回饋,就是努力工作所應得的報酬。做人要積極上進,(欲望驅使,興趣驅使,職業規劃,人生態度,生活態度驅使等等) 我今年的計劃,是 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...