RabbitMQ是一種重要的消息隊列中間件,在生產環境中,穩定是第一考慮。RabbitMQ廠家也深知開發者的聲音,穩定、可靠是第一考慮,為了消息傳輸的可靠性傳輸,RabbitMQ提供了多種途徑的消息持久化保證:Exchange持久化、Queue持久化及Message的持久化等。以保證RabbitMQ ...
RabbitMQ是一種重要的消息隊列中間件,在生產環境中,穩定是第一考慮。RabbitMQ廠家也深知開發者的聲音,穩定、可靠是第一考慮,為了消息傳輸的可靠性傳輸,RabbitMQ提供了多種途徑的消息持久化保證:Exchange持久化、Queue持久化及Message的持久化等。以保證RabbitMQ在重啟或Crash等異常情況下,消息不會丟失。RabbitMQ提供了簡單的參數配置來實現持久化操作。
簡單說明一下各種持久化方式:(描述代碼採用的是RabbitMQ.Client SDK, C#代碼)
Queue持久化:隊列是我們使用RabbitMQ進行數據傳輸的最多使用的方式,是進行點對點消息傳遞使用最多的方式。隊列的持久化是通過durable=true 來實現。
var connFactory = new ConnectionFactory(); Conn = connFactory.CreateConnection(); Model = Conn.CreateModel(); Model.QueueDeclare(q, false, false, false, null);
其中,QueueDeclare的定義:
/// <summary>(Spec method) Declare a queue.</summary> [AmqpMethodDoNotImplement(null)] QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
參數說明:queue:隊列名稱。durable:設置是否執行持久化。如果設置為true,即durable=true,持久化實現的重要參數。
exclusive:指示隊列是否是排他性。如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,併在連接斷開時自動刪除。需要註意:1. 排他隊列是基於連接可見的,同一連接的不同通道Channel是可以同時訪問同一連接創建的排他隊列;2.“首次”,如果一個連接已經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
autoDelete:是否自動刪除。如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於發佈訂閱方式創建的臨時隊列。
消息的持久化:如果將一個隊列設置為持久化,那麼會創建一個持久化的隊列,但並不意味著隊列中的消息也會持久化存儲。因此如果要保證消息在RabbitMQ出現異常時不會丟失,需要設定消息的持久化。
簡要說明一下消息持久化和隊列持久化的聯繫:
隊列設置為持久化,那麼在RabbitMQ重啟之後,持久化的隊列也會存在,並會保持和重啟前一致的隊列參數。
消息設置為持久化,在RabbitMQ重啟之後,持久化的消息也會存在。
那麼就會出現一些矛盾的地方:
1、因為消息必須依附於隊列存在才有意義,那麼如果隊列設置為非持久化,而消息設置為持久化。在RabbitMQ重啟之後,持久化的消息是否還存在呢?因為非持久化的隊列可能並不存在。
2、如果設置消息持久化為true,但隊列設置成排他性隊列,那麼在RabbitMQ重啟之後,消息是否仍然存在。請自行查找分析,下次分析該問題。
1 var sf = new ConnectionFactory(); 2 using (IConnection conn = cf.CreateConnection()) 3 { 4 IModel ch = conn.CreateModel();
Model = Conn.CreateModel(); Model.QueueDeclare(queueName, true, false, false, null);
string message = "Hello C# SSL Client World"; 11 byte[] msgBytes = System.Text.Encoding.UTF8.GetBytes(message);
//發送消息 12 ch.BasicPublish("", queueName, null, msgBytes); 13 14 bool noAck = false; 15 BasicGetResult result = ch.BasicGet(qName, noAck); 16 byte[] body = result.Body; 17 string resultMessage = System.Text.Encoding.UTF8.GetString(body); 18 19 Assert.AreEqual(message, resultMessage); 20 }
通過RabbitMQ SDK發送消息至MQ非常簡單,通過BasicPublish即可。
BasicPublish 的定義:
1 /// <summary> 2 /// (Spec method) Convenience overload of BasicPublish. 3 /// </summary> 4 /// <remarks> 5 /// The publication occurs with mandatory=false 6 /// </remarks> 7 [AmqpMethodDoNotImplement(null)] 8 void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);
設置消息持久化,需要設置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).
設置了隊列和消息持久化後,當服務重啟之後,消息仍然存在。只設置隊列持久化,不設置消息持久化,重啟之後消息會丟失;只設置消息持久化,不設置隊列持久化,在服務重啟後,隊列會消失,從而依附於隊列的消息也會丟失。只設置消息持久化而不設置隊列的持久化,毫無意義。
Exchange持久化:
為了實現一對多的消息發送,我們一般會採用發佈訂閱模式,通過一個發送端、多個訂閱端來實現消息的分發。
發佈訂閱模式存在一些問題:
1、如果消費者由於網路或其他原因,與RabbitMQ的連接斷開,那麼RabbitMQ會自動將與其對應的隊列刪除,當消息程式重新連接以後,無法獲取斷開前未來得及消費的消息。
2、如果RabbitMQ出現故障或Crash,那麼在RabbitMQ 服務重啟之後,消費端未及時消費的消息也會丟失,並且如果Exchange 不設置成持久化,那麼在MQ服務重啟之後,Exchange也不會存在。
1 /// <summary>(Spec method) Declare an exchange.</summary> 2 /// <remarks> 3 /// The exchange is declared non-passive and non-internal. 4 /// The "nowait" option is not exercised. 5 /// </remarks> 6 [AmqpMethodDoNotImplement(null)] 7 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, 8 IDictionary<string, object> arguments);
參數說明:exchange:RabbitMQ中定義的Exchange名稱,type:類型,包含fanout、topic、direct、headers,durable:持久化設置。設置成true,就可以設定exchange持久化存儲,autodelete:是否自動刪除。
exchange是實現發佈訂閱的基礎,其類型包含fanout、headers、direct、、topic。我們本次僅討論類型為topic。
發佈訂閱模式執行消息發送的流程:
總結:
RabbitMQ要實現發佈訂閱持久化,按照消息的傳輸流程,可以分成三類:
Exchange 持久化:如果不設定Exchange持久化,那麼在RabbitMQ由於某些異常等原因重啟之後,Exchange會丟失。Exchange丟失, 會影響發送端發送消息到RabbitMQ。
Queue持久化:發送端將消息發送至Exchange,Exchange將消息轉發至關聯的Queue。如果Queue不設置持久化,那麼在RabbitMQ重啟之後,Queue信息會丟失。導致消息發送至Exchange,但Exchange不知道需要將該消息發送至哪些具體的隊列。
Message持久化:發送端將消息發送至Exchange,Exchange將消息轉發至關聯的Queue,消息存儲於具體的Queue中。如果RabbitMQ重啟之後,由於Message未設置持久化,那麼消息會在重啟之後丟失。
為了保證發佈訂閱的持久化,必須設置Exchange、Queue、Message的持久化,才可以保證消息最終不會丟失。
雖然持久化會造成性能損耗,但為了生產環境的數據一致性,這是我們必須做出的選擇。但我們可以通過設置消息過期時間、降低發送消息大小等其他方式來儘可能的降低MQ性能的降低。
擴展閱讀:
1、Exchange type:topic、fanout、direct、headers的不同。
2、消息的確認機制。
3、將Exchange、Queue、Message都設置持久化,能保證消息100%會被成功消費嗎?
答案肯定是否,天下沒有絕對的事情,尤其是複雜的MQ。
原因簡單介紹,一、如果消息的自動確認為true,那麼在消息被接收以後,RabbitMQ就會刪除該消息,假如消費端此時宕機,那麼消息就會丟失。因此需要將消息設置為手動確認。
二、設置手動確認會出現另一個問題,如果消息已被成功處理,但在消息確認過程中出現問題,那麼在消費端重啟後,消息會重新被消費。
三、發送端為了保證消息會成功投遞,一般會設定重試。如果消息發送至RabbitMQ之後,在RabbitMQ回覆已成功接收消息過程中出現異常,那麼發送端會重新發送該消息,從而造成消息重覆發送。
四、RabbitMQ的消息磁碟寫入,如果出現問題,也會造成消息丟失。
五、。。。。。
下期熱點問題:
1、Exchange type的不同
2、消息的確認與拒絕機制
3、優先順序機制
RabbitMQ發佈訂閱持久化方式:Exchange、Queue、Message持久化,隊列設定手動確認、AutoDelete=false。可以最大程度的保證消息不丟失。
附RabbitMQ發佈訂閱持久化具體實現方式,參考代碼:
1 MQ SDK新增介面: 2 IMQSession新增方法: 3 /// <summary> 4 /// 創建消息消費者 5 /// </summary> 6 /// <param name="topicName">主題名稱</param> 7 /// <param name="customTopicQueueName">自定義Topic關聯隊列名稱</param> 8 /// <param name="isPersistence">是否持久化</param> 9 /// <returns>消息消費者</returns> 10 IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false); 11 調用方式:消費端需要明確指定需要消費的發佈訂閱關聯隊列。例如配置中心熱部署,每個配置中心實例都需要指定唯一的關聯隊列名。 12 這樣就可以和正常的MAC隊列消費一樣,消費指定隊列消息。 13 14 實現方式,四個步驟: 15 1.創建持久化Topic(即持久化Exchange): 16 var service = MQServiceProvider.GetDefaultMQService(); 17 var messageText = "abc"; 18 ///創建Topic 19 using (var connection = service.CreateConnection()) 20 { 21 var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge); 22 var messageCreator = service.GetMessageCreator(); 23 var message = messageCreator.CreateMessage(messageText); 24 message.IsPersistent = true; 25 var producer = session.CreateProducer(); 26 var topic = session.DeclareTopic(topicName, true); 27 } 28 2.定義消費者Consumer: 29 List<string> queueList = new List<string>() { 30 "guozhiqi1", 31 "guozhiqi2", 32 "guozhiqi3", 33 "guozhiqi4", 34 "guozhiqi5", 35 "guozhiqi6", 36 "guozhiqi7", 37 "guozhiqi8", 38 "guozhiqi9", 39 }; 40 //var service = MQServiceProvider.GetDefaultMQService(); 41 //var messageText = "abc" + DateTime.Now.ToShortTimeString(); 42 //定義消費者 43 using (var connection1 = service.CreateConnection()) 44 { 45 var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge); 46 foreach (var item in queueList) 47 { 48 session1.DeclareQueue(item, true); 49 var consumer = session1.CreateTopicConsumer(topicName, item, true); 50 } 51 } 52 3.發送消息到Topic 53 //發送消息 54 for (int i = 0; i <= 100; i++) 55 { 56 using (var connection = service.CreateConnection()) 57 { 58 var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge); 59 var messageCreator = service.GetMessageCreator(); 60 var message = messageCreator.CreateMessage(messageText); 61 message.IsPersistent = true;//設置持久化 62 message.TimeToLive = TimeSpan.FromSeconds(30);//設置過期時間 63 var producer = session.CreateProducer(); 64 var topic = session.DeclareTopic(topicName, true); 65 producer.Send(message, topic); 66 } 67 } 68 4.從隊列接收消息 69 Parallel.ForEach(queueList, (item) => 70 { 71 while (true) 72 { 73 //接收消息 74 using (var connection1 = service.CreateConnection()) 75 { 76 var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge); 77 78 session1.DeclareQueue(item, true); 79 var consumer = session1.CreateTopicConsumer(topicName, item, true); 80 var topic = session1.DeclareTopic(topicName, true); 81 var receivedmessage = consumer.Receive(topic); 82 var textMessage = receivedmessage as ITextMessage; 83 84 Assert.AreEqual(messageText, textMessage.Body); 85 consumer.Acknowledge(receivedmessage); 86 } 87 } 88 89 });View Code