RabbitMQ 發佈訂閱持久化及持久化方式

来源:https://www.cnblogs.com/jiagoushi/archive/2018/03/30/8678871.html
-Advertisement-
Play Games

RabbitMQ是一種重要的消息隊列中間件,在生產環境中,穩定是第一考慮。RabbitMQ廠家也深知開發者的聲音,穩定、可靠是第一考慮,為了消息傳輸的可靠性傳輸,RabbitMQ提供了多種途徑的消息持久化保證:Exchange持久化、Queue持久化及Message的持久化等。以保證RabbitMQ ...


RabbitMQ是一種重要的消息隊列中間件,在生產環境中,穩定是第一考慮。RabbitMQ廠家也深知開發者的聲音,穩定、可靠是第一考慮,為了消息傳輸的可靠性傳輸,RabbitMQ提供了多種途徑的消息持久化保證:Exchange持久化、Queue持久化及Message的持久化等。以保證RabbitMQ在重啟或Crash等異常情況下,消息不會丟失。RabbitMQ提供了簡單的參數配置來實現持久化操作。

簡單說明一下各種持久化方式:(描述代碼採用的是RabbitMQ.Client  SDK,  C#代碼)

Queue持久化:隊列是我們使用RabbitMQ進行數據傳輸的最多使用的方式,是進行點對點消息傳遞使用最多的方式。隊列的持久化是通過durable=true 來實現。

var connFactory = new ConnectionFactory();
Conn = connFactory.CreateConnection();
Model = Conn.CreateModel();
Model.QueueDeclare(q, false, false, false, null);  

 

其中,QueueDeclare的定義:

/// <summary>(Spec method) Declare a queue.</summary>
        [AmqpMethodDoNotImplement(null)]
        QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
            bool autoDelete, IDictionary<string, object> arguments);  

 

參數說明:queue:隊列名稱。durable:設置是否執行持久化。如果設置為true,即durable=true,持久化實現的重要參數

exclusive:指示隊列是否是排他性。如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,併在連接斷開時自動刪除。需要註意:1. 排他隊列是基於連接可見的,同一連接的不同通道Channel是可以同時訪問同一連接創建的排他隊列;2.“首次”,如果一個連接已經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。

autoDelete:是否自動刪除。如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於發佈訂閱方式創建的臨時隊列。

 

消息的持久化:如果將一個隊列設置為持久化,那麼會創建一個持久化的隊列,但並不意味著隊列中的消息也會持久化存儲。因此如果要保證消息在RabbitMQ出現異常時不會丟失,需要設定消息的持久化。

簡要說明一下消息持久化和隊列持久化的聯繫:

隊列設置為持久化,那麼在RabbitMQ重啟之後,持久化的隊列也會存在,並會保持和重啟前一致的隊列參數。

消息設置為持久化,在RabbitMQ重啟之後,持久化的消息也會存在。

那麼就會出現一些矛盾的地方:

1、因為消息必須依附於隊列存在才有意義,那麼如果隊列設置為非持久化,而消息設置為持久化。在RabbitMQ重啟之後,持久化的消息是否還存在呢?因為非持久化的隊列可能並不存在。

2、如果設置消息持久化為true,但隊列設置成排他性隊列,那麼在RabbitMQ重啟之後,消息是否仍然存在。請自行查找分析,下次分析該問題。

 1              var sf = new ConnectionFactory();
 2             using (IConnection conn = cf.CreateConnection())
 3             {
 4                 IModel ch = conn.CreateModel();
                   Model = Conn.CreateModel();
                   Model.QueueDeclare(queueName, true, false, false, null); 
string message = "Hello C# SSL Client World"; 11 byte[] msgBytes = System.Text.Encoding.UTF8.GetBytes(message);
//發送消息
12 ch.BasicPublish("", queueName, null, msgBytes); 13 14 bool noAck = false; 15 BasicGetResult result = ch.BasicGet(qName, noAck); 16 byte[] body = result.Body; 17 string resultMessage = System.Text.Encoding.UTF8.GetString(body); 18 19 Assert.AreEqual(message, resultMessage); 20 }

通過RabbitMQ SDK發送消息至MQ非常簡單,通過BasicPublish即可。

 BasicPublish 的定義:

1   /// <summary>
2         /// (Spec method) Convenience overload of BasicPublish.
3         /// </summary>
4         /// <remarks>
5         /// The publication occurs with mandatory=false
6         /// </remarks>
7         [AmqpMethodDoNotImplement(null)]
8         void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);

設置消息持久化,需要設置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).

設置了隊列和消息持久化後,當服務重啟之後,消息仍然存在。只設置隊列持久化,不設置消息持久化,重啟之後消息會丟失;只設置消息持久化,不設置隊列持久化,在服務重啟後,隊列會消失,從而依附於隊列的消息也會丟失。只設置消息持久化而不設置隊列的持久化,毫無意義。

 

Exchange持久化:

為了實現一對多的消息發送,我們一般會採用發佈訂閱模式,通過一個發送端、多個訂閱端來實現消息的分發。

發佈訂閱模式存在一些問題:

1、如果消費者由於網路或其他原因,與RabbitMQ的連接斷開,那麼RabbitMQ會自動將與其對應的隊列刪除,當消息程式重新連接以後,無法獲取斷開前未來得及消費的消息。

2、如果RabbitMQ出現故障或Crash,那麼在RabbitMQ  服務重啟之後,消費端未及時消費的消息也會丟失,並且如果Exchange 不設置成持久化,那麼在MQ服務重啟之後,Exchange也不會存在。

1   /// <summary>(Spec method) Declare an exchange.</summary>
2         /// <remarks>
3         /// The exchange is declared non-passive and non-internal.
4         /// The "nowait" option is not exercised.
5         /// </remarks>
6         [AmqpMethodDoNotImplement(null)]
7         void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete,
8             IDictionary<string, object> arguments);

參數說明:exchange:RabbitMQ中定義的Exchange名稱,type:類型,包含fanout、topic、direct、headers,durable:持久化設置。設置成true,就可以設定exchange持久化存儲,autodelete:是否自動刪除。

exchange是實現發佈訂閱的基礎,其類型包含fanout、headers、direct、、topic。我們本次僅討論類型為topic。

發佈訂閱模式執行消息發送的流程:

 

總結:

RabbitMQ要實現發佈訂閱持久化,按照消息的傳輸流程,可以分成三類:

Exchange 持久化:如果不設定Exchange持久化,那麼在RabbitMQ由於某些異常等原因重啟之後,Exchange會丟失。Exchange丟失, 會影響發送端發送消息到RabbitMQ。

Queue持久化:發送端將消息發送至Exchange,Exchange將消息轉發至關聯的Queue。如果Queue不設置持久化,那麼在RabbitMQ重啟之後,Queue信息會丟失。導致消息發送至Exchange,但Exchange不知道需要將該消息發送至哪些具體的隊列。

Message持久化:發送端將消息發送至Exchange,Exchange將消息轉發至關聯的Queue,消息存儲於具體的Queue中。如果RabbitMQ重啟之後,由於Message未設置持久化,那麼消息會在重啟之後丟失。

為了保證發佈訂閱的持久化,必須設置Exchange、Queue、Message的持久化,才可以保證消息最終不會丟失。

雖然持久化會造成性能損耗,但為了生產環境的數據一致性,這是我們必須做出的選擇。但我們可以通過設置消息過期時間、降低發送消息大小等其他方式來儘可能的降低MQ性能的降低。

擴展閱讀:

1、Exchange type:topic、fanout、direct、headers的不同。

2、消息的確認機制。

3、將Exchange、Queue、Message都設置持久化,能保證消息100%會被成功消費嗎?

答案肯定是否,天下沒有絕對的事情,尤其是複雜的MQ。

原因簡單介紹,一、如果消息的自動確認為true,那麼在消息被接收以後,RabbitMQ就會刪除該消息,假如消費端此時宕機,那麼消息就會丟失。因此需要將消息設置為手動確認。

二、設置手動確認會出現另一個問題,如果消息已被成功處理,但在消息確認過程中出現問題,那麼在消費端重啟後,消息會重新被消費。

三、發送端為了保證消息會成功投遞,一般會設定重試。如果消息發送至RabbitMQ之後,在RabbitMQ回覆已成功接收消息過程中出現異常,那麼發送端會重新發送該消息,從而造成消息重覆發送。

四、RabbitMQ的消息磁碟寫入,如果出現問題,也會造成消息丟失。

五、。。。。。

 

下期熱點問題:

1、Exchange type的不同

2、消息的確認與拒絕機制

3、優先順序機制

 

RabbitMQ發佈訂閱持久化方式:Exchange、Queue、Message持久化,隊列設定手動確認、AutoDelete=false。可以最大程度的保證消息不丟失。

附RabbitMQ發佈訂閱持久化具體實現方式,參考代碼:

 

 1 MQ SDK新增介面:
 2 IMQSession新增方法:
 3 /// <summary>
 4         /// 創建消息消費者
 5         /// </summary>
 6         /// <param name="topicName">主題名稱</param> 
 7         /// <param name="customTopicQueueName">自定義Topic關聯隊列名稱</param>
 8         /// <param name="isPersistence">是否持久化</param>
 9         /// <returns>消息消費者</returns>
10         IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false);
11 調用方式:消費端需要明確指定需要消費的發佈訂閱關聯隊列。例如配置中心熱部署,每個配置中心實例都需要指定唯一的關聯隊列名。
12 這樣就可以和正常的MAC隊列消費一樣,消費指定隊列消息。
13 
14 實現方式,四個步驟:
15 1.創建持久化Topic(即持久化Exchange):
16   var service = MQServiceProvider.GetDefaultMQService();
17             var messageText = "abc";
18             ///創建Topic
19             using (var connection = service.CreateConnection())
20             {
21                 var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
22                 var messageCreator = service.GetMessageCreator();
23                 var message = messageCreator.CreateMessage(messageText);
24                 message.IsPersistent = true;
25                 var producer = session.CreateProducer();
26                 var topic = session.DeclareTopic(topicName, true);
27             }
28 2.定義消費者Consumer:
29 List<string> queueList = new List<string>() {
30                 "guozhiqi1",
31                 "guozhiqi2",
32                 "guozhiqi3",
33                 "guozhiqi4",
34                 "guozhiqi5",
35                 "guozhiqi6",
36                 "guozhiqi7",
37                 "guozhiqi8",
38                 "guozhiqi9",
39             };
40             //var service = MQServiceProvider.GetDefaultMQService();
41             //var messageText = "abc" + DateTime.Now.ToShortTimeString();
42             //定義消費者
43             using (var connection1 = service.CreateConnection())
44             {
45                 var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
46                 foreach (var item in queueList)
47                 {
48                     session1.DeclareQueue(item, true);
49                     var consumer = session1.CreateTopicConsumer(topicName, item, true);
50                 }
51             }
52 3.發送消息到Topic
53  //發送消息
54             for (int i = 0; i <= 100; i++)
55             {
56                 using (var connection = service.CreateConnection())
57                 {
58                     var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
59                     var messageCreator = service.GetMessageCreator();
60                     var message = messageCreator.CreateMessage(messageText);
61                     message.IsPersistent = true;//設置持久化
62                    message.TimeToLive = TimeSpan.FromSeconds(30);//設置過期時間
63                     var producer = session.CreateProducer();
64                     var topic = session.DeclareTopic(topicName, true);
65                     producer.Send(message, topic);
66                 }
67             }
68 4.從隊列接收消息
69 Parallel.ForEach(queueList, (item) =>
70             {
71                 while (true)
72                 {
73                     //接收消息
74                     using (var connection1 = service.CreateConnection())
75                     {
76                         var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
77 
78                         session1.DeclareQueue(item, true);
79                         var consumer = session1.CreateTopicConsumer(topicName, item, true);
80                         var topic = session1.DeclareTopic(topicName, true);
81                         var receivedmessage = consumer.Receive(topic);
82                         var textMessage = receivedmessage as ITextMessage;
83 
84                         Assert.AreEqual(messageText, textMessage.Body);
85                         consumer.Acknowledge(receivedmessage);
86                     }
87                 }
88 
89             });
View Code

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 四則運算題目自動生成——基於控制台(java) "個人作業——四則運算題目生成程式(基於控制台)" 項目已提交到碼雲: "UMLProject" 需求分析: 關於輸入、根據提示依次輸入: 數字範圍(樣例:10) 題目數量(樣例:10) 生成的題目: 如果存在形如e1 ÷ e2的子表達式,那麼其結果應 ...
  • 上面這種使用@Value註入每個配置在實際項目中會顯得格外麻煩,因為我們的配置通常會是許多個,就要使用@Value註入很多次。Spring Boot提供了基於類型安全的配置方式,通過@ConfigurationProperties將properties屬性和一個Bean關聯,從而實現類型安全的配置。 ...
  • mvc:V視圖 Controller: <?php header("Content-Type: text/html; charset=UTF-8"); require'productModel.php'; $porduct = new Porduct(); $a =isset($_GET['a']) ...
  • 迭代器 一、什麼是迭代器 二、為何要有迭代器,什麼是可迭代對象,什麼是迭代器對象 三、迭代器對象的使用 四、for迴圈 五、迭代器的優缺點 優點:1.提供一種統一的,不依賴於索引的迭代方式 2.懶性計算,每次只有一條數據,節省記憶體 缺點:1.無法獲取長度(只有在迭代完畢才能知道有多少值) 2.一次性 ...
  • 本文非原創~~ 指定一個點(源點)到其餘各個頂點的最短路徑,也叫做“單源最短路徑”。例如求下圖中的1號頂點到2、3、4、5、6號頂點的最短路徑。 與Floyd-Warshall演算法一樣這裡仍然使用二維數組e來存儲頂點之間邊的關係,初始值如下。 我們還需要用一個一維數組dis來存儲1號頂點到其餘各個頂 ...
  • import ide; ide.setConfig("editor_font_name","fixedsys"); ...
  • 《C#面向服務WebService從入門到精通》包含以下兩個部分: 一、《C#遠程調用技術WebService修煉手冊【基礎篇】》本次分享課您將學習到以下乾貨知識點:1)、WebService技術調用原理圖。2)、C# WebService常用的幾種調用方式。3)、C# WebService調試小技 ...
  • 記錄日誌時, 經常需要描述對象的狀態發生了怎樣的變化, 以前處理的非常簡單粗暴: a. 重寫class的ToString()方法, 將重要的屬性都輸出來 b. 記錄日誌時: 誰誰誰 由 變更前實例.ToString() 變成 變更後實例.ToString() 但輸出的日誌總是太長了, 翻看日誌時想找 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...