什麼是RabbitMQ? RabbitMQ是一款開源的,Erlang編寫的,基於AMQP協議的消息中間件 為什麼使用MQ?MQ的優點 非同步處理 - 相比於傳統的串列、並行方式,提高了系統的吞吐量。 應用解耦 - 系統間通過消息通信,不用關心其他系統的處理。 流量削鋒 - 可以通過消息隊列長度控制請求 ...
目錄
什麼是RabbitMQ?
RabbitMQ是一款開源的,Erlang編寫的,基於AMQP協議的消息中間件
為什麼使用MQ?MQ的優點
-
非同步處理 - 相比於傳統的串列、並行方式,提高了系統的吞吐量。
-
應用解耦 - 系統間通過消息通信,不用關心其他系統的處理。
-
流量削鋒 - 可以通過消息隊列長度控制請求量,可以緩解短時間內的高併發請求。
-
消息通訊 - 消息隊列一般都內置了高效的通信機制,因此也可以用在純消息通訊上。比如實現點對點消息隊列,或者聊天室等。
-
日誌處理 - 解決大量日誌傳輸。
消息中間件比對
ActiveMQ、RabbitMQ、RocketMQ、Kafka有什麼優缺點?
ActiveMQ | RabbitMQ | RocketMQ | Kafka | ZeroMQ | |
---|---|---|---|---|---|
單機吞吐量 | 比RabbitMQ低 | 2.6w/s(消息做持久化) | 11.6w/s | 17.3w/s | 29w/s |
開發語言 | Java | Erlang | Java | Scala/Java | C |
主要維護者 | Apache | Mozilla/Spring | Alibaba | Apache | iMatix,創始人已去世 |
成熟度 | 成熟 | 成熟 | 開源版本不夠成熟 | 比較成熟 | 只有C、PHP等版本成熟 |
訂閱形式 | 點對點(p2p)、廣播(發佈-訂閱) | 提供了4種:direct, topic ,Headers和fanout。fanout就是廣播模式 | 基於topic/messageTag以及按照消息類型、屬性進行正則匹配的發佈訂閱模式 | 基於topic以及按照topic進行正則匹配的發佈訂閱模式 | 點對點(p2p) |
持久化 | 支持少量堆積 | 支持少量堆積 | 支持大量堆積 | 支持大量堆積 | 不支持 |
順序消息 | 不支持 | 不支持 | 支持 | 支持 | 不支持 |
性能穩定性 | 好 | 好 | 一般 | 較差 | 很好 |
集群方式 | 支持簡單集群模式,比如'主-備',對高級集群模式支持不好。 | 支持簡單集群,'複製'模式,對高級集群模式支持不好。 | 常用 多對'Master-Slave' 模式,開源版本需手動切換Slave變成Master | 天然的‘Leader-Slave’無狀態集群,每台伺服器既是Master也是Slave | 不支持 |
管理界面 | 一般 | 較好 | 一般 | 無 | 無 |
RabbitMQ
-
可以支撐高併發、高吞吐量、性能很高,同時有非常完善便捷的後臺管理界面可以使用。
-
另外,他還支持集群化、高可用部署架構、消息高可靠支持,功能較為完善。
-
RabbitMQ的開源社區很活躍,較高頻率的版本迭代,來修複發現的bug以及進行各種優化,因此綜合考慮過後,公司採取了RabbitMQ。
-
RabbitMQ也有一點缺陷,就是他自身是基於erlang語言開發的,所以導致較為難以分析裡面的源碼,也較難進行深層次的源碼定製和改造,需要較為扎實的erlang語言功底。
RocketMQ
-
開源的,經過阿裡生產環境的超高併發、高吞吐的考驗,性能卓越,同時還支持分散式事務等特殊場景。
-
RocketMQ是基於Java語言開發的,適合深入閱讀源碼,有需要可以站在源碼層面解決線上問題,包括源碼的二次開發和改造。
Kafka
-
Kafka提供的消息中間件的功能明顯較少一些,相對上述幾款MQ中間件要少很多。
-
Kafka的優勢在於專為超高吞吐量的實時日誌採集、實時數據同步、實時數據計算等場景。
-
Kafka在大數據領域中配合實時計算技術(比如Spark Streaming、Storm、Flink)使用的較多。但是在傳統的MQ中間件使用場景中較少採用。
選型建議
RabbitMQ, erlang 語言阻止了大量的 Java 工程師去深入研究和掌控它,對公司而言,幾乎處於不可控的狀態,但是確實人家是開源的,比較穩定的支持,活躍度也高;
RocketMQ, 越來越多的公司會去用 RocketMQ,確實很不錯,畢竟是阿裡出品,但社區可能有突然黃掉的風險(目前 RocketMQ 已捐給 Apache,但 GitHub 上的活躍度其實不算高)對自己公司技術實力有絕對自信的,推薦用 RocketMQ
中小型公司,技術實力較為一般,技術挑戰不是特別高,用 RabbitMQ 是不錯的選擇;大型公司,基礎架構研發實力較強,用 RocketMQ 是很好的選擇
如果是大數據領域的實時計算、日誌採集等場景,用 Kafka 是業內標準的,絕對沒問題,社區活躍度很高,絕對不會黃,何況幾乎是全世界這個領域的事實性規範
MQ 有哪些常見問題?ranbbitMQ如何解決這些問題?
MQ 有哪些常見問題?
消息的順序問題
消息有序指的是可以按照消息的發送順序來消費。
假如生產者產生了 2 條消息:M1、M2,假定 M1 發送到 S1,M2 發送到 S2,要保證 M1 先於 M2 被消費順序。
消息的重覆問題
造成消息重覆的常見原因是:網路不可達,重試機製造成。
所以解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:如果消費端收到兩條一樣的消息,應該怎樣處理?
消息積壓
由於消費者速率遠低於生產者,或者是消費者宕機,消息中間件中有大量消息積壓到隊列中
rabbitMQ如何解決這些問題?
rabbitMQ解決消息的順序方案
RabbitMQ:拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 但是對應一個 consumer,然後這個 consumer 內部用記憶體隊列做排隊,然後分發給底層不同的 worker 來處理。
缺陷:
-
並行度就會成為消息系統的瓶頸(吞吐量不夠)
-
更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。通過合理的設計或者將問題分解來規避。
-
不關註順序的應用實際大量存在
-
隊列無序並不意味著消息無序,所以從業務層面來保證消息的順序而不僅僅是依賴於消息系統,是一種更合理的方式。
其他解決方案
-
方案一:消費端使用redis存儲消息記錄表,通過redis鎖,控制消費者按照順序消費。
-
方案三:採用RocketMQ順序消費機制;(不建議使用,會降低系統吞吐量)
rabbitMQ解決消息的重覆問題方案
消費端處理消息的業務邏輯需要保持冪等性。使用redis存儲消息id作為日誌表,只要保持冪等性,不管來多少條重覆消息,最後處理的結果都一樣。保證每條消息都有唯一編號和redis添加一張日誌表來記錄已經處理成功的消息的 ID,如果新到的消息 ID 已經在日誌表中,那麼就不再處理這條消息。
rabbitMQ解決消息積壓方案
出現消息積壓的問題,首先要排除掉消費者宕機的問題。其次,再根據監控面板,觀察消費者和生產者消費消息及生產消息的速率。
- 生產者速率增加:一般電商系統大促時,比較常見,往往的應對手段是擴容消費端的實例數或服務降級。
- 消費者速率減少:檢查一下日誌是否有大量的消費錯誤,或是消費線程卡死或是等待資源鎖死。
- 速率無變化:可能是消費失敗導致的一條消息反覆消費,從而拖慢整個系統的消費速度
RabbitMQ消息的可靠傳輸?消息丟失怎麼辦?
RabbitMQ提供了消息確認機制來確保消息的可靠傳輸
消息消息確認機制包括兩部分:生產者到消息中間件,即消息的發佈確認。消息中間件到消費者,即消息的消費確認。
生產者的消息確認機制
rabbit的生產者客戶端提供了消息發佈的回調介面,一旦生產者消息到交換機失敗,則會觸發回調。同理,交換機到消息隊列也有回調介面,一旦交換機向消息隊列投遞消息失敗,則會觸發回調。
/**
* 消息->交換機 回調函數
* ack:true 發送成功 false: 發送失敗
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
//可以增加補償機制
if (!ack) {
log.error("sendMsg:=======> Msg To Exchange Failed! Cause:{}", cause);
}
};
/**
* 交換機->消息隊列 回調函數
* 發送失敗:觸發returnCallback回調函數
*/
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey) ->
//可以增加補償機制
log.error("sendMsg:=======> Exchange To Queue Failed! Message:{} Exchange:{} RoutingKey:{} Replay:{}", message.toString(), exchange, routingKey, replyText);
消費者的消息確認機制
rabbitMQ開啟手動確認消息,消費端需要收到消息之後,手動ack才可表示消息消費成功。否則可以投遞到死信隊列或者消息重試。
消息隊列的持久化
還有一種常見情況是,我們經常會遇到需要重啟中間件,或者是中間件宕機的問題,那麼就需要開啟消息持久化,否則會發生消息還沒來得及消費會丟失的問題。
-
將queue的持久化標識durable設置為true,則代表是一個持久的隊列
-
發送消息的時候將deliveryMode=2
RabbitMQ高可用
rabbitMQ有三種模式:單機模式、普通集群模式、鏡像集群模式
- 單機模式:就是部署一個rabbitMQ實例
- 集群模式:意思就是在多台機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那麼那個實例會從 queue 所在實例上拉取數據過來。這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個 queue 的讀寫操作。
- 鏡像集群模式:這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 里的消息都會存在於多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。然後每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。RabbitMQ 有很好的管理控制台,就是在後臺新增一個策略,這個策略是鏡像集群模式的策略,指定的時候是可以要求數據同步到所有節點的,也可以要求同步到指定數量的節點,再次創建 queue 的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。這樣的話,好處在於,你任何一個機器宕機了,沒事兒,其它機器(節點)還包含了這個 queue 的完整數據,別的 consumer 都可以到其它節點上去消費數據。壞處在於,第一,這個性能開銷也太大了吧,消息需要同步到所有機器上,導致網路帶寬壓力和消耗很重!RabbitMQ 一個 queue 的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個 queue 的完整數據。
鏡像集群節點圖
slave 會準確地按照 master 執行命令順序進行動作,故 slave 和 master 上維護的狀態應該也是相同的。如果master 由於某種原因宕機了,那麼"資源最老"的slave會被提升為新的master。
根據slave 加入的時間排序,時間最長的 slave 即為"資歷最老"。發送到鏡像隊列的所有的消息會被同時發往 master 和所有的slave,如果此時 master 掛掉了,消息還會在 slave 上,這樣 slave 提升為 master 的時候消息也不會丟失
鏡像集群工作模式圖
除發送消息(Basic.Publish)外的所有動作都只會向 master 發送,然後再由 master 將命令執行的結果廣播給各個 slave。如果消費者與 slave 建立連接併進行訂閱消息,其實質上都是從 master 上獲取消息,只不過看似是從 slave 上消費而已。比如:消費者與 slave 建立了 TCP 連接之後執行一個 Basic.GET 的操作,那麼首先是由 slave 將 Basic.GET 請求發往 master,再由 master 準備好數據返回給 slave,最後由 slave投遞給消費者。