RabbitMQ 可靠投遞

来源:https://www.cnblogs.com/wangiqngpei557/archive/2018/07/28/9381478.html
-Advertisement-
Play Games

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。 rabbitmq 整個消息投遞的路徑為: producer->rabbitmq broker cluster->exchange->que... ...


RabbitMQ 可靠投遞

標簽: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投遞


  • 背景
  • confirmCallback 確認模式
  • returnCallback 未投遞到 queue 退回模式
  • shovel-plugin 跨機房可靠投遞

背景

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。

rabbitmq 整個消息投遞的路徑為:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 則會返回一個 confirmCallback
messageexchange->queue 投遞失敗則會返回一個 returnCallback 。我們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。

confirmCallback 確認模式

在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息發送失敗!" + cause + data.toString());
        } else {
            log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我們來看下 ConfirmCallback 介面。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重點是 CorrelationData 對象,每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。

發送的時候創建一個 CorrelationData 對象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

這裡將 user ID 設置為當前消息 CorrelationData id 。當然這裡是純粹 demo,真實場景是需要做業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對賬。

消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用 confirmCallback

broker 接收到只能表示 message 已經到達伺服器,並不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback

returnCallback 未投遞到queue退回模式

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到 return 退回模式。

同樣創建 ConnectionFactory 到時候需要設置 PublisherReturns(true) 選項。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啟return模式
rabbitTemplate.setMandatory(true);//開啟強制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

這樣如果未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。

shovel-plugin 跨機房可靠投遞

RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受機房之間的網路斷開、機器下線等不穩定因素。

這裡有兩個 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我們希望將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。我們先開啟 rabbit_node1shovel-plugin

先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,如果有的話直接開啟。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然後就可以在 Admin 面板里看到這個設置選項,怎麼設置這裡就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost

如果配置沒有問題的話,應該是這樣的一個狀態,說明已經順利連接到 rabbit_node2 broker


我們來看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 創建了兩個 tcp 連接,埠 39544 連接是用來消費 plen.queue 里的消息,埠 55706 連接是用來推送消息給 rabbit_node2

我們來看下 rabbit_node1 tcp 連接狀態:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 連接狀態:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

為了驗證 shovel-plugin 穩定性,我們將 rabbit_node2 下線。

然後再發送消息,發現消息會現在 rabbit_node1 plen.queue 里待著,一旦 shovel-plugin 連接恢復將消費 rabbit_node1 plen.queue 消息,然後投遞給 rabbit_node2 plen.queue

作者:王清培 (滬江集團資深JAVA架構師)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在表單提交中,設置input不可編輯,但是可以向後臺傳輸數據,的設置方法: $('#input').attr("readonly",true); ...
  • 做簡單相冊,點擊小圖片,下麵的圖片進行放大 佈局為上下分別為兩個div 上面一個div內的圖片用a標簽包含 頁面效果為點擊上面div的圖片下麵的圖片換成對應的圖片 js思路為: 首先分別找到上面 <!DOCTYPE html> <html lang="en"> <head> <meta charse ...
  • heigh:100%失效 解決: 1. html, body {height: 100%;} 2. div { height: 100%; position: absolute;} 非定位元素的寬高百分比計算不會將 padding 計算在內,而定位元素會計算在內。 利用這個特性可以實現圖片左右半區點 ...
  • float 特性 包裹性; 塊狀化並格式化上下文; 破壞文檔流,使父元素高度塌陷,在父元素中浮動元素高度不計在內; 沒有任何 margin 合併 使浮動元素後的非浮動元素的文本環繞著浮動元素 浮動之間的排列 元素設為浮動後會變為包裹性,若沒有明確寬度,寬度則是元素里的內容寬度。浮動元素會生成一個塊級 ...
  • GitHub地址: https://github.com/jeromeetienne/jquery-qrcode ...
  • java的(PO,VO,TO,BO,DAO,POJO)解釋 O/R Mapping 是 Object Relational Mapping(對象關係映射)的縮寫。通俗點講,就是將對象與關係資料庫綁定,用對象來表示關係數據。在O/R Mapping的世界里,有兩個基本的也是重要的東東需要瞭解,即VO, ...
  • 載入靜態文件 在一個網頁中,不僅僅只有一個 html 骨架,還需要 css 樣式文件, js 執行文件以及一些圖片 等。因此在 DTL 中載入靜態文件是一個必須要解決的問題。在 DTL 中,使用 static 標簽來載入 靜態文件。要使用 static 標簽,首先需要 {% load static ...
  • 人應該自信點,因為在某個方面,你無人可取代。做事,做人都要有底線,一件事的底線是什麼,做人的底線是什麼,做事的底線要符合做人的底線。這些事都要清楚。 工作要努力,對你最直接的回饋,就是努力工作所應得的報酬。做人要積極上進,(欲望驅使,興趣驅使,職業規劃,人生態度,生活態度驅使等等) 我今年的計劃,是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...