消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題 實現高性能,高可用,可伸縮和最終一致性架構。 RabbitMQ 是採用 Erlang 語言實現 AMQP (Advanced Message Queuing Protocol,高級消息 隊列協議)的消息中間件,它最初 ...
消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題
實現高性能,高可用,可伸縮和最終一致性架構。
RabbitMQ 是採用 Erlang 語言實現 AMQP (Advanced Message Queuing Protocol,高級消息 隊列協議)的消息中間件,它最初起源於金融系統,用於在分散式系統中存儲轉發消息。
RabbitMQ 整體上是一個生產者與消費者模型,主要負責接收、存儲和轉發消息。可以把消息傳遞的過程想象成:當你將一個包裹送到郵局,郵局會暫存並最終將郵件通過郵遞員送到收件人的手上, RabbitMQ 就好比由郵局、郵箱和郵遞員組成的一個系統。從電腦術語層面來說,RabbitMQ 模型更像是一種交換機模型
Producer:生產者,就是投遞消息的一方。
生產者創建消息,然後發佈到 RabbitMQ 中。消息一般可以包含 2 個部分:消息體和標簽 CLabel)。消息體也可以稱之為 payload,在實際應用中,消息體一般是一個帶有業務邏輯結構的數據,比如一個 JSON 字元串。當然可以進一步對這個消息體進行序列化操作。消息的標簽用來表述這條消息 , 比如一個交換器的名稱和一個路由鍵。 生產者把消息交由 RabbitMQ, RabbitMQ 之後會根據標簽把消息發送給感興趣的消費者 CConsumer)。
Consumer: 消費者, 就是接收消息的一方。
消費者連接到 RabbitMQ 伺服器,並訂閱到隊列上。 當消費者消費一條消息時, 只是消費 消息的消息體 Cpayload)。 在消息路由的過程中 , 消息的標簽會丟棄, 存入到隊列中的消息只 有消息體,消費者也只會消費到消息體, 也就不知道消息的生產者是誰,當然消費者也不需要 知道。
Broker: 消息中間件的服務節點。
對於 RabbitMQ 來說, 一個 RabbitMQ Broker 可 以簡單地看作一個 RabbitMQ 服務節點 , 或者 RabbitMQ 服務實例 。 大多數情況下也可以將一個 RabbitMQ Broker 看作一臺 RabbitMQ 伺服器。
交換器(Exchange)
Rabbitmq中,生產者會將消息先發送到交換器,然後由交換器根據路由規則將消息轉發到隊列中,如果路由不到,或許會返回給生產者,或許直接丟棄。
交換器有四種類型:fanout,direct,topic,header
Binding: 綁定。 RabbitMQ 中通過綁定將交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵 (BindingKey),這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了。
routingkey:路由鍵。生產者將消息發送給交換器時,一般會指定一個routingkey,用來指定消息的路由規則, routingkey需要與交換器類型和綁定鍵 (BindingKey) 聯合使用才能最終生效。
隊列(queue)
隊列是Rabbitmq的內部對象,用於存儲消息。Rabbitmq的消息只能存儲在隊列中。消費者可以從隊列中獲取消息並消費。如多個消費者訂閱同一個隊列,這時隊列內的消息會被平均分攤(輪詢)給多個消費者處理。
交換器類型:
fanout :將所有消息轉發至交換器綁定的所有隊列中。
direct :它會把消息路由到那些 BindingKey 和 RoutingKey完全匹配的隊列中。
topic :它與 direct 類型的交換器相似,也是將消息路由到 BindingKey 和 RoutingKey 相匹配的隊 列中,但這裡的匹配規則有些不同,它約定:
- RoutingKey 為一個點號". "分隔的字元串(被點號" "分隔開的每一段獨立的字元 串稱為一個單詞 ),如“com.rabbitmq.client”;
- BindingKey 和 RoutingKey 一樣也是點號". "分隔的字元串;
- BindingKey 中可以存在兩種特殊字元串"*"和"#",用於做模糊匹配,其中"*"用於匹配一個單詞,"#"用於匹配多規格單詞(可以是零個)。
header :該類型的交換器性能很差,而且也不實用,基本上不會看到它的存在。
生產者代碼:
nuget:添加RabbitMQ.Client;
1 IConnectionFactory conFactory = new ConnectionFactory//創建連接工廠對象 2 { 3 HostName = "*.*.*.*",//IP地址 4 Port = 5672,//埠號 5 UserName = "yan",//用戶賬號 6 Password = "yan"//用戶密碼 7 };
1 using (IConnection con = conFactory.CreateConnection()) 2 { 3 using (IModel channel = con.CreateModel()) 4 { 5 channel.ExchangeDeclare("ExchangeName", "direct", true, false, null); //聲明交換器,dureable:是否持久化,autoDelete:是否自動刪除 6 channel.QueueDeclare("QueueName", true, false, false, null); //聲明隊列 dureable:是否持久化,exclusive:是否排他 autoDelete:是否自動刪除 7 channel.QueueBind("QueueName", "ExchangeName", "RoutingKey", null); 8 var properties = channel.CreateBasicProperties(); 9 properties.DeliveryMode = 2; //消息持久化 10 channel.BasicPublish("ExchangeName", "RoutingKey", properties, Encoding.UTF8.GetBytes("HelloWord")); //發佈消息 11 } 12 }
聲明交換器參數:
durable: 設置是否持久化。 durable 設置為 true 表示持久化, 反之是非持久化。持 久化可以將交換器存檔,在伺服器重啟 的時候不會丟失相關信息。
autoDelete: 設置是否自動刪除。 autoDelete 設置為 true 則表示自動刪除。自動 刪除的前提是至少有一個隊列或者交換器與這個交換器綁定, 之後所有與這個交換器綁
定的隊列或者交換器都與此解綁。註意不能錯誤地把這個參數理解為: "當與此交換器 連接的客戶端都斷開時, RabbitMQ 會自動刪除本交換器"。
聲明隊列參數:
durable: 設置是否持久化。為 true 則設置隊列為持久化。持久化的隊列會存檔,在 伺服器重啟的時候可以保證不丟失相關信息。
exclusive: 設置是否排他。為 true 則設置隊列為排他的。如果一個隊列被聲明為排 他隊列,該隊列僅對首次聲明它的連接可見,併在連接斷開時自動刪除。這裡需要註意 三點:排他隊列是基於連接(Connection) 可見的,同一個連接的不同通道 (Channel) 是可以同時訪問同一連接創建的排他隊列; "首次"是指如果一個連接己經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列 適用於一個客戶端同時發送和讀取消息的應用場景。
autoDelete: 設置是否自動刪除。為 true 則設置隊列為自動刪除。自動刪除的前提是: 至少有一個消費者連接到這個隊列,之後所有與這個隊列連接的消費者都斷開時,才會自動刪除。不能把這個參數錯誤地理解為: "當連接到此隊列的所有客戶端斷開時,這 個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。
消費者代碼:
1 IConnectionFactory conFactory = new ConnectionFactory//創建連接工廠對象 2 { 3 HostName = "*.*.*.*",//IP地址 4 Port = 5672,//埠號 5 UserName = "yan",//用戶賬號 6 Password = "yan"//用戶密碼 7 }; 8 using (IConnection con = conFactory.CreateConnection()) 9 { 10 using (IModel channel = con.CreateModel()) 11 { 12 channel.ExchangeDeclare("ExchangeName", "direct", true, false, null); //聲明交換器,dureable:是否持久化,autoDelete:是否自動刪除 13 channel.QueueDeclare("QueueName", true, false, false, null); //聲明隊列 dureable:是否持久化,exclusive:是否排他 autoDelete:是否自動刪除 14 channel.QueueBind("QueueName", "ExchangeName", "RoutingKey", null); 15 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //聲明事件基本消費者 16 consumer.Received += (ch,ea) => { 17 var message = Encoding.UTF8.GetString(ea.Body); 18 Console.WriteLine($"收到消息: {message}"); //消費消息 19 channel.BasicAck(ea.DeliveryTag, false); //確認該消息已被消費 20 }; 21 channel.BasicConsume("QueueName", false, consumer); //啟動消費者,並設置為手動應答消息 22 } 23 }
channel.BasicReject(ea.DeliveryTag,false); //拒絕消息 requeue true:消息重新存入隊列。false:立即會把消息從隊列中移除。
為了保證消息從隊列可靠地達到消費者, RabbitMQ 提供了消息確認機制( message acknowledgement)。 消費者在訂閱隊列時,可以指定 autoAck 參數,當 autoAck 等於 false 時, RabbitMQ 會等待消費者顯式地回覆確認信號後才從記憶體(或者磁碟)中移去消息(實質上是先打上刪除標記,之後再刪除)。當 autoAck 等於 true 時, RabbitMQ 會自動把發送出去的 消息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些消息。
採用消息確認機制後,只要設置 autoAck 參數為 false,消費者就有足夠的時間處理消息 (任務),不用擔心處理消息過程中消費者進程掛掉後消息丟失的問題, 因為 RabbitMQ 會一直 等待持有消息直到消費者顯式調用 Basic.Ack 命令為止。
當 autoAck 參數置為 false,對於 RabbitMQ 服務端而言,隊列中的消息分成了兩個部分: 一部分是等待投遞給消費者的消息:一部分是己經投遞給消費者,但是還沒有收到消費者確認信號的消息。如果 RabbitMQ 一直沒有收到消費者的確認信號,並且消費此消息的消費者己經 斷開連接,則 RabbitMQ 會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
RabbitMQ 不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否己經斷開,這麼設計的原因是 RabbitMQ 允許消費者 消費一條消息的時間可以很久很久。