ActiveMQ、RabbitMQ、RocketMQ、Kafka四種消息中間件分析介紹 我們從四種消息中間件的介紹到基本使用,以及高可用,消息重覆性,消息丟失,消息順序性能方面進行分析介紹! 一、消息中間件的使用場景 消息中間件的使用場景總結就是六個字:解耦、非同步、削峰 1.解耦 如果我方系統A要與 ...
ActiveMQ、RabbitMQ、RocketMQ、Kafka四種消息中間件分析介紹
我們從四種消息中間件的介紹到基本使用,以及高可用,消息重覆性,消息丟失,消息順序性能方面進行分析介紹!
一、消息中間件的使用場景
消息中間件的使用場景總結就是六個字:解耦、非同步、削峰
1.解耦
如果我方系統A要與三方B系統進行數據對接,推送系統人員信息,通常我們會使用介面開發來進行。但是如果運維期間B系統進行了調整,或者推送過程中B系統網路進行了調整,又或者後續過程中我們需要推送信息到三方C系統中,這樣的話就需要我們進行頻繁的介面開發調整,還需要考慮介面推送消息失敗的場景。
如果我們使用消息中間件進行消息推送,我們只需要按照一種約定的數據結構進行數據推送,其他三方系統從消息中間件取值消費就可以,即便是三方系統出現宕機或者其他調整,我們都可以正常進行數據推送。
總結:通過一個 MQ,Pub/Sub 發佈訂閱消息這麼一個模型,A 系統就跟其它系統徹底解耦了。
2.非同步
繼續我們上述的消息推送業務,如果我們現在需要同時推送消息到BCD三個系統中,而BCD系統接收到消息後需要進行複雜的邏輯處理,以及讀庫寫庫處理。如果一個三方系統進行消息處理需要1s,那我們遍歷推送完一次消息,就需要三秒。
而如果我們使用消息中間件,我們只需要推送到消息中間件,然後進行介面返回,可能只需要20ms,大大提升了用戶體驗。消息推送後BCD系統各自進行消息消費即可,不需要我們等待。
3.削峰
還是上述我們的應用場景,假設某一時間段內,每秒都有一條消息推送,如果我們使用介面進行推送,BCD三個系統處理完就需要三秒。這樣會導致用戶前端體驗較差,而且系統後臺一直處於阻塞狀態,後續的消息推送介面一直在等待。
如果我們使用消息中間件,我們只需要將消息推送至消息中間件中,BCD系統對積壓的消息進行相應的處理。
在上述高頻發的消息時間段內,會在消息中間中產生消息積壓,BCD系統在上述時間段外對積壓消息進行相應的處理即可。
二、消息中間件的優缺點
消息中間件的優點其實就是他的使用場景。
消息中間件的缺點與優點也是相輔相成的,主要有以下三個
1.系統可用性降低
系統關聯的中間件越多,越容易引發宕機問題。
如上述案例中的問題,原本進行消息推送我們只需要開發介面進行推送即可,引入消息中間件後就需要考慮消息中間件的高可用問題,如果消息中間件出現宕機問題,我們所有的消息推送都會失敗。
2.系統複雜度提高
上述案例中,如果我們使用介面進行消息推送,我們只需要考慮介面超時以及介面推送消息失敗的問題。但我們引入消息中間件後,就需要考慮消息中間件的維護,以及消息重覆消費,消息丟失的問題。
3.一致性問題
上述案例中,如果我們使用介面進行消息推送,推送消息我們可以放在事務中處理,如果推送過程中出現異常,我們可以進行數據回滾,但我們引入消息中間件後,就需要考慮消息推送後,消費失敗的問題,以及如果我們同時推送消息到BCD系統中,如何保證他們的事務一致性。
三、四種消息中間件的基本介紹
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單機吞吐量 | 萬級,比 RocketMQ、Kafka 低一個數量級 | 同 ActiveMQ | 10 萬級,支撐高吞吐 | 10 萬級,高吞吐,一般配合大數據類的系統來進行實時數據計算、日誌採集等場景 |
topic 數量對吞吐量的影響 | topic 可以達到幾百/幾千的級別,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic | topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 儘量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源 | ||
時效性 | ms 級 | 微秒級,這是 RabbitMQ 的一大特點,延遲最低 | ms 級 | 延遲在 ms 級以內 |
可用性 | 高,基於主從架構實現高可用 | 同 ActiveMQ | 非常高,分散式架構 | 非常高,分散式,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用 |
消息可靠性 | 有較低的概率丟失數據 | 基本不丟 | 經過參數優化配置,可以做到 0 丟失 | 同 RocketMQ |
功能支持 | MQ 領域的功能極其完備 | 基於 erlang 開發,併發能力很強,性能極好,延時很低 | MQ 功能較為完善,還是分散式的,擴展性好 | 功能較為簡單,主要支持簡單的 MQ 功能,在大數據領域的實時計算以及日誌採集被大規模使用 |
其他 | Apache軟體基金會開發、起步較早,但沒有經過大量吞吐場景驗證,目前社區不是很活躍 | 開源,穩定,社區活躍度高 | 阿裡出品,目前已交給Apache,但社區活躍度較低 | Apache軟體基金會開發、開源、高通吐量,社區活躍度高 |
1.ActiveMQ
1.1:Activemq 是什麼
Activemq 是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級消息通信。
1.2:Activemq 的作用及原理
Activemq 的作用就是系統之間進行通信,原理就是生產者生產消息, 把消息發送給activemq, Activemq 接收到消息, 然後查看有多少個消費者,
然後把消息轉發給消費者, 此過程中生產者無需參與。 消費者接收到消息後做相應的處理和生產者沒有任何關係。
1.3:Activemq 的通信方式
publish(發佈)-subscribe(訂閱)(發佈-訂閱方式)
發佈/訂閱方式用於多接收客戶端的方式,作為發佈訂閱的方式,可能存在多個接收客戶端,並且接收端客戶端與發送客戶端存在時間上的依賴。一個接收端只能接收他創建以後發送客戶端發送的信息。
p2p(point-to-point)(點對點)
p2p的過程則理解起來比較簡單。它好比是兩個人打電話,這兩個人是獨享這一條通信鏈路的。一方發送消息,另外一方接收,就這麼簡單。在實際應用中因為有多個用戶對使用p2p的鏈路,相互通信的雙方是通過一個類似於隊列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個發送者和多個接收者,而在p2p里一個queue只有一個發送者和一個接收者。
1.4:Activemq 的消息持久化機制
JDBC: 持久化到資料庫
AMQ :日誌文件(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中查看預設的broker持久化機制。
1.5:Activemq 的消息確認機制
(1)AUTO_ACKNOWLEDGE = 1 自動確認
(2)CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
(3)DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
(4)SESSION_TRANSACTED = 0 事務提交並確認
(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認
前四種是JMS API中提供的客戶端ACK_MODE。第五種是InforSuiteMQ自定義補充的一種ACK_MODE。
2.RabbitMQ
2.1:RabbitMQ是什麼
RabbitMQ是一個由erlang語言編寫的、開源的、在AMQP基礎上完整的、可復用的企業消息系統。
2.2:RabbitMQ的作用及原理
基本概念
關鍵名稱 | 說明 |
---|---|
Channel(通道) | 消息推送使用的通道 |
Producer(消息的生產者) | 向消息隊列發佈消息的客戶端應用程式 |
Consumer(消息的消費者) | 從消息隊列取得消息的客戶端應用程式 |
Message(消息) | 消息由消息頭和消息體組成 |
Routing Key(路由鍵) | 消息頭的一個屬性,用於標記消息的路由規則,決定了交換機的轉發路徑 |
Queue(消息隊列) | 用於存儲生產者的消息 |
Exchange(交換器路由器) | 提供Producer到Queue之間的匹配 |
Binding(綁定) | 用於建立Exchange和Queue之間的關聯 |
Binding Key(綁定鍵) | Exchange與Queue的綁定關係,用於匹配Routing Key |
Broker(服務主體) | RabbitMQ Server,伺服器實體 |
2.3:RabbitMQ的通信方式
2.3.1:簡單隊列
最簡單的工作隊列,其中一個消息生產者,一個消息消費者,一個隊列。也稱為點對點模式
2.3.2:工作隊列模式
一個消息生產者,一個交換器,一個消息隊列,多個消費者。同樣也稱為點對點模式
2.3.3:發佈訂閱模式
Pulish/Subscribe,無選擇接收消息,一個消息生產者,一個交換機(交換機類型為fanout),多個消息隊列,多個消費者
生產者只需把消息發送到交換機,綁定這個交換機的隊列都會獲得一份一樣的數據。
2.3.4:路由模式
在發佈/訂閱模式的基礎上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。
2.3.5:主體模式
topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。
2.3.6:RPC模式
與上面其他5種所不同之處,該模式是擁有請求/回覆的。也就是有響應的,上面5種都沒有。
RPC是指遠程過程調用,也就是說兩台伺服器A,B,一個應用部署在A伺服器上,想要調用B伺服器上應用提供的處理業務,處理完後然後在A伺服器繼續執行下去,把非同步的消息以同步的方式執行。
2.4:RabbitMQ的消息持久化機制
Queue(消息隊列)的持久化是通過durable=true來實現的。
Message(消息)的持久化 ,通過設置消息是持久化的標識。
Exchange(交換機)的持久化 。
2.5:RabbitMQ的消息確認機制
confirm機制:確認消息是否成功發送到Exchange
ack機制:確認消息是否被消費者成功消費
- AcknowledgeMode.NONE:自動確認
- AcknowledgeMode.AUTO:根據情況確認
- AcknowledgeMode.MANUAL:手動確認
3.RocketMQ
3.1:RocketMQ是什麼
RocketMQ是阿裡開發的一款純java、分散式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。
3.2:RocketMQ的作用及原理
基本概念
關鍵名稱 | 說明 |
---|---|
Producer | 消息生產者 |
Producer Group | 生產者組 |
Consumer | 消息消費者 |
Consumer Group | 消費者組 |
Topic | Topic用於將消息按主題做劃分,Producer將消息發往指定的Topic,Consumer訂閱該Topic就可以收到這條消息 |
Message | 代表一條消息 |
Tag | 標簽可以被認為是對 Topic 進一步細化 |
Broker | 負責接收並存儲消息 |
Queue | Topic和Queue是1對多的關係,一個Topic下可以包含多個Queue,主要用於負載均衡 |
Offset | RocketMQ在存儲消息時會為每個Topic下的每個Queue生成一個消息的索引文件,每個Queue都對應一個Offset記錄當前Queue中消息條數。 |
NameServer | NameServer可以看作是RocketMQ的註冊中心 |
3.3:RocketMQ的通信方式
RocketMQ消息訂閱有兩種模式
一種是Push模式(MQPushConsumer),即MQServer主動向消費端推送
另外一種是Pull模式(MQPullConsumer),即消費端在需要時,主動到MQ Server拉取
但在具體實現時,Push和Pull模式本質都是採用消費端主動拉取的方式,即consumer輪詢從broker拉取消息
集群模式和廣播模式
集群模式:預設情況下我們都是使用的集群模式,也就是說消費者組收到消息後,只有其中的一臺機器會接收到消息。
廣播模式:消費者組內的每台機器都會收到這條消息。
3.4:RocketMQ的消息持久化機制
exchange持久化、queue持久化、message持久化
CommitLog:日誌數據文件,存儲消息內容,所有 queue 共用,不區分 topic ,順序讀寫 ,1G 一個文件
ConsumeQueue:邏輯 Queue,基於 topic 的 CommitLog 的索引文件,消息先到達 commitLog,然後非同步轉發到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,消息實體內容大小和 Message Tag 的 hash 值,大於 600W 個位元組,寫滿之後重新生成,順序寫
IndexFile:基於 Key 或 時間區間的 CommitLog 的索引文件,文件名以創建的時間戳命名,固定的單個 indexFile 大小為 400M,可以保存 2000W 個索引
3.5:RocketMQ的消息確認機制
confirm機制:確認消息是否成功發送到Exchange
ack機制:確認消息是否被消費者成功消費
4.Kafka
4.1:Kafka是什麼
Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一個分散式、分區的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統,可作為消息中間件
4.2:Kafka的作用及原理
基本概念
關鍵名稱 | 說明 |
---|---|
producer | 生產者 |
consumer | 消費者 |
consumer group | 消費者組 |
broker | 一臺kafka伺服器就是一個broker,一個集群由多個broker組成,一個broker可以容納多個topic |
topic | 一個消息隊列,生產者和消費者都是面對一個Topic |
partition | 每個partition時一個有序隊列,partition是topic中存儲數據和消費數據所使用的隊列所在 |
replica | 副本,為了保證當前某個節點發生故障時,當前節點上的數據不會發生丟失 |
leader | 每個分區多個副本的“主”,生產者生產數據的對象,以及消費組消費者消費的對象 |
follower | 每個分區多個副本的“從”,實時從leader數據的同步 |
4.3:Kafka的通信方式
生產者發送模式
1.發後即忘(fire-and-forget):只管往Kafka中發送消息而並不關心消息是否正確到達
2.同步(sync):一般是在send()方法里指定一個Callback的回調函數,Kafka在返迴響應時調用該函數來實現非同步的發送確認。
3.非同步(async):send()方法會返回Futrue對象,通過調用Futrue對象的get()方法,等待直到結果返回
消費者消費模式
1.At-most-once(最多一次):在每一條消息commit成功之後,再進行消費處理;設置自動提交為false,接收到消息之後,首先commit,然後再進行消費。
2.At-least-once(最少一次):在每一條消息處理成功之後,再進行commit;設置自動提交為false;消息處理成功之後,手動進行commit。
3.Exactly-once(正好一次):將offset作為唯一id與消息同時處理,並且保證處理的原子性;設置自動提交為false;消息處理成功之後再提交。
4.4:Kafka的消息持久化機制
Kafka直接將數據寫入到日誌文件中,以追加的形式寫入
4.5:Kafka的消息確認機制
confirm機制:確認消息是否成功發送
ack機制:確認消息是否被消費者成功消費
四、消息隊列高可用
引言:系統應用MQ作為消息中間件後,會導致系統可用性降低。所以只要你用了 MQ,高可用肯定是要考慮到的
1.ActiveMQ高可用
ActiveMQ的部署方式有三種,分別為:單節點部署(不支持高可用),Master-Slave部署方式(主從模式),Broker-Cluster部署方式(負載均衡)
1.1.單節點部署(不支持高可用)
單節點部署方式因為不支持高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
1.2.Master-Slave部署方式(支持高可用)
1.2.1.shared filesystem Master-Slave部署方式
主要是通過共用存儲目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共用目錄的控制權,哪個應用搶到了控制權,它就成為master。
多個共用存儲目錄的應用,誰先啟動,誰就可以最早取得共用目錄的控制權成為master,其他的應用就只能作為slave。
1.2.2.shared database Master-Slave方式
與shared filesystem方式類似,只是共用的存儲介質由文件系統改成了資料庫而已。
1.2.3.Replicated LevelDB Store方式
這種主備方式是ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟並接受客戶端連接。
其他node轉入slave模式,連接master並同步他們的存儲狀態。slave不接受客戶端連接。所有的存儲操作都將被覆制到連接至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網路中並連接master進入slave mode。所有需要同步的disk的消息操作都將等待存儲狀態被覆制到其他法定節點的操作完成才能完成。
當一個新的master被選中,你需要至少保障一個法定node線上以能夠找到擁有最新狀態的node。這個node將會成為新的master。因此,推薦運行至少3個replica nodes,以防止一個node失敗了,服務中斷。
1.3.Broker-Cluster部署方式(不支持高可用)
前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分散式的問題。Broker-Cluster的部署方式就可以解決負載均衡的問題。
Broker-Cluster部署方式中,各個broker通過網路互相連接,並共用queue。當broker-A上面指定的queue-A中接收到一個message處於pending狀態,而此時沒有consumer連接broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的消息,那麼broker-B會先通過內部網路獲取到broker-A上面的message,並通知自己的consumer來消費。
1.3.1.static Broker-Cluster部署
在activemq.xml文件中靜態指定Broker需要建立橋連接的其他Broker
1.3.2.Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動後動態查找
1.4.Master-Slave與Broker-Cluster相結合的部署方式
可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負載均衡,
Broker-Cluster解決了負載均衡,但當其中一個Broker突然宕掉的話,那麼存在於該Broker上處於Pending狀態的message將會丟失,無法達到高可用的目的。
Master-Slave與Broker-Cluster相結合的部署方式是目前ActiveMQ比較推薦的部署方案。
2.RabbitMQ高可用
RabbitMQ的部署方式有三種,分別為:單機模式(不支持高可用),普通集群模式(不支持高可用),鏡像集群模式(支持高可用)
2.1單機模式(不支持高可用)
單節點部署方式因為不支持高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
2.2普通集群模式(不支持高可用)
普通集群模式,意思就是在多台機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那麼那個實例會從 queue 所在實例上拉取數據過來。
這種方式確實很麻煩,也不怎麼好,沒做到所謂的分散式,就是個普通集群。因為這導致你要麼消費者每次隨機連接一個實例然後拉取數據,要麼固定連接那個 queue 所在實例消費數據,前者有數據拉取的開銷,後者導致單實例性能瓶頸。
而且如果那個放 queue 的實例宕機了,會導致接下來其他實例就無法從那個實例拉取,如果你開啟了消息持久化,讓 RabbitMQ 落地存儲消息的話,消息不一定會丟,得等這個實例恢復了,然後才可以繼續從這個 queue 拉取數據。
2.3鏡像集群模式(支持高可用)
這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 里的消息都會存在於多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。然後每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。
3.RocketMQ高可用
RocketMQ的部署方式有兩種,分別為:單節點模式(不支持高可用),多節點模式
3.1.單節點模式(不支持高可用)
單節點部署方式因為不支持高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
3.2.多節點模式
3.2.1.多Master模式(不支持高可用)
一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master
配置簡單,單個Master 宕機或重啟維護對應用無影響。
單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到受到影響。
3.2.2.多Master多Slave模式(非同步複製)(支持高可用)
每個 Master 配置一個 Slave,有多對Master-Slave, HA,採用非同步複製方式,主備有短暫消息延遲,毫秒級。
即使磁碟損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機後,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。
Master 宕機,磁碟損壞情況,會丟失少量消息。
3.2.3.多Master多Slave模式(同步雙寫)(支持高可用)
每個 Master 配置一個 Slave,有多對Master-Slave, HA採用同步雙寫方式,主備都寫成功,嚮應用返回成功。
數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高
性能比非同步複製模式略低,大約低 10%左右,發送單個消息的 RT會略高。
4.Kafka高可用
Kafka的部署方式有三種,分別為:單broke節點(不支持高可用),單機多broker模式(支持高可用),多機多broker模式(支持高可用)
4.1.單broke節點(不支持高可用)
單節點部署方式因為不支持高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
4.2.單機多broker模式(支持高可用)
這種部署方式其實是一種偽集群模式,單機部署多節點如果出現伺服器宕機,那麼所有節點都不能正常提供服務。
4.3.多機多broker模式(支持高可用)
Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
Kafka 0.8 以後,提供了 HA 機制,就是 replica(複製品) 副本機制。每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那麼生產和消費都跟這個 leader 打交道,然後其他 replica 就是 follower。寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那麼就要 care 數據一致性的問題,系統複雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分佈在不同的機器上,這樣才可以提高容錯性。
五、消息重覆消費問題
引言:為什麼要考慮重覆消費的問題?比如我們消費後通過消費中間件來調用,扣費10元,但是消費者消費消息後還沒來得及進行確認,消息中間件進行了重啟,那麼消息者就會進行再次扣費處理,這樣就會出問題!
ActiveMQ、RabbitMQ、RocketMQ、Kafka,都有可能會出現消息重覆消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。
我們以Kafka為例說明一下重覆消費的問題:
Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然後 consumer 消費了數據之後,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”。
但是,如果在這期間重啟系統或者直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset。重啟之後,少數消息會再次消費一次。
如果消費者乾的事兒是拿一條數據就往資料庫里寫一條,會導致說,你可能就把數據在資料庫里插入了 2 次,那麼數據就錯啦。
重覆消費問題引發後,我們就需要考慮怎麼保證冪等性。
冪等性,通俗點說,就一個數據,或者一個請求,給你重覆來多次,你得確保對應的數據是不會改變的,不能出錯。
保證冪等性的具體實現方式需要結合對應的業務去實現,這裡提供幾個思路:
- 如果是數據插入操作,插入前我們根據唯一鍵先進行查詢,如果已有數據那我們只進行更新就行。
- 如果是寫 Redis,則我們無需考慮冪等性,反正每次都是 set,天然冪等性。
- 如果是基於資料庫的唯一鍵來保證重覆數據不會重覆插入多條。因為有唯一鍵約束了,重覆數據插入只會報錯,不會導致資料庫中出現臟數據。
- 如果不是以上集中通用的場景,那需要我們發送消息的時候攜帶唯一ID,消費者在消費前進行相應的查重處理,處理後在進行相應的業務操作。
六、消息丟失問題
引言:MQ 有個基本原則,就是數據不能多一條,也不能少一條。
不能多,就是上面說的重覆消費和冪等性問題。
不能少,就是說這數據別搞丟了。那這個問題你必須得考慮一下。
消息丟失的問題需要從生產者、MQ、消費者三個方面來進行考慮,相應的解決方案也需要從這三方面出發(生產者確認機制,MQ消息持久化、消費者確認機制)。
1.ActiveMQ
1.生產者丟失消息
生產者丟失消息的問題可以通過消息重投、重試機制來解決
2.ActiveMQ丟失消息
ActiveMQ丟失消息的問題需要通過ActiveMQ消息持久化機制+高可用(見ActiveMQ章節)來解決,ActiveMQ的消息持久化機制有以下幾種
JDBC: 持久化到資料庫
AMQ :日誌文件(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中查看預設的broker持久化機制。
3.消息者丟失消息
消費者丟失消息通過ack機制來解決,消息者進行業務處理後,再進行ack確認,避免消息丟失。
2.RabbitMQ
1.生產者丟失消息
生產者消息丟失,通過confirm機制來確認消息發送,然後進行相應的消息重投、重試機制
2.RabbitMQ丟失消息
RabbitMQ丟失消息的問題需要通過RabbitMQ消息持久化機制+高可用(見RabbitMQ章節)來解決,
RabbitMQ持久化包含:
Queue(消息隊列)的持久化是通過durable=true來實現的。
Message(消息)的持久化 ,通過設置消息是持久化的標識。
Exchange(交換機)的持久化 。
3.消息者丟失消息
消費者丟失消息通過ack機制來解決,消息者進行業務處理後,再進行ack確認,避免消息丟失。
3.RocketMQ
1.生產者丟失消息
生產者消息丟失,通過confirm機制來確認消息發送,然後進行相應的消息重投、重試機制
2.RocketMQ丟失消息
RocketMQ丟失消息的問題需要通過RocketMQ消息持久化機制+高可用(見RocketMQ章節)來解決,
RocketMQ持久化包含:exchange持久化、queue持久化、message持久化
3.消息者丟失消息
消費者丟失消息通過ack機制來解決,消息者進行業務處理後,再進行ack確認,避免消息丟失。
4.Kafka
1.生產者丟失消息
生產者消息丟失,通過confirm機制來確認消息發送,然後進行相應的消息重投、重試機制
2.Kafka丟失消息
Kafka直接將數據寫入到日誌文件中,以追加的形式寫入
3.消息者丟失消息
消費者丟失消息通過ack機制來解決,消息者進行業務處理後,再進行ack確認,避免消息丟失。
總結:其實MQ消息丟失,無非就是生產者發送時丟失,MQ傳遞時丟失,消費者消費時丟失幾種問題,我們相應的從以上三方面解決就可以,但是上述三種方式使用後,其實也不能保證100%消息不丟失,所以往往在業務場景還會使用資料庫輔助記錄的方式,來保證消息不丟失。但資料庫輔助記錄方式對相關性能以及使用用較大的影響,所以一般數據只需要進行上面三種方式處理,就能保證消息基本不丟失。發生消息丟失時我們配合日誌進行相應的消息恢復就可以。
資料庫輔助記錄:生產者發送消息時同步發送一條消息到資料庫中,消費者拿到消息並完成業務處理後,從資料庫刪除對應的記錄。
七、消息順序性問題
引言:為什麼要保證消息的順序性?
比如現在我們有個賬號餘額為5,我們充值50元,購買一件20元的商品,但因消息不能保證順序,導致先進行扣費處理,這樣就會導致我們購買失敗。
消息順序性消費情況,尤其在高可用(集群方式)下一定要考慮。
1.ActiveMQ
ActiveMQ因為預設是單queue 隊列,所以它模式就是保證消息順序性消費的。
2.RabbitMQ
- 將RabbitMQ拆分多個 queue,每個 queue 一個 consumer,保證消息的順序性。
- 一個 queue 但是對應一個 consumer,然後這個 consumer 內部用記憶體隊列做排隊,然後分發給底層不同的 worker 來處理。
3.RocketMQ
RocketMQ保證消息順序性方法與Kafka大致相同。
- 一個 topic,一個 queue,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。
- 寫 N 個記憶體 queue,具有相同 key 的數據都到同一個記憶體 queue;然後對於 N 個線程,每個線程分別消費一個記憶體 queue 即可,這樣就能保證順序性。
4.Kafka
- 一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。
- 寫 N 個記憶體 queue,具有相同 key 的數據都到同一個記憶體 queue;然後對於 N 個線程,每個線程分別消費一個記憶體 queue 即可,這樣就能保證順序性。
八、消息積壓問題
引言:如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以後該怎麼處理?有幾百萬消息持續積壓幾小時,怎麼處理?
其實消息積壓的問題,一般都是由消費端出了問題導致的,在實際業務場景中一般不會出現,但是出現問題一般都是大問題。
模擬場景:
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。由於消費者宕機導致現在MQ中積壓幾百萬數據
解決思路:
- 先修複 consumer 的問題,確保其恢復消費速度,然後將現有 consumer 都停掉(避免重覆消費)。
- 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
- 然後寫一個臨時的分發數據的 consumer 程式,這個程式部署上去消費積壓的數據,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
- 接著臨時徵用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。
- 等快速消費完積壓數據之後,得恢複原先部署的架構,重新用原先的 consumer 機器來消費消息。
mq 中的消息過期失效了
假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。
假設 1 萬個訂單積壓在 mq 裡面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程式把那 1000 個訂單給查出來,手動發到 mq 里去再補一次。
mq 都快寫滿了
如果消息積壓在 mq 里,長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?
這種情況下只能是通過增加臨時Consumer將數據進行快速消費,等MQ恢復正常後再補充數據。
RocketMQ方案
對於 RocketMQ,官方針對消息積壓問題,提供瞭解決方案。
- 提高消費並行度
絕大部分消息消費行為都屬於 IO 密集型,即可能是操作資料庫,或者調用 RPC,這類消費行為的消費速度在於後端資料庫或者外系統的吞吐量,通過增加消費並行度,可以提高總的消費吞吐量,但是並行度增加到一定程度,反而會下降。所以,應用必須要設置合理的並行度。 如下有幾種修改消費並行度的方法:
同一個 ConsumerGroup 下,通過增加 Consumer 實例數量來提高並行度(需要註意的是超過訂閱隊列數的 Consumer 實例無效)。可以通過加機器,或者 在已有機器啟動多個進程的方式。 提高單個 Consumer 的消費並行線程,通過修改參數 consumeThreadMin、consumeThreadMax 實現。
-
批量方式消費
某些業務流程如果支持批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設置 consumer 的 consumeMessageBatchMaxSize 返個參數,預設是 1,即一次只消費一條消息,例如設置為 N,那麼每次消費的消息數小於等於 N。 -
跳過非重要消息
發生消息堆積時,如果消費速度一直追不上發送速度,如果業務對數據要求不高的話,可以選擇丟棄不重要的消息。例如,當某個隊列的消息數堆積到 100000 條以上,則嘗試丟棄部分或全部消息,這樣就可以快速追上發送消息的速度。示例代碼如下: -
優化每條消息消費過程
舉例如下,某條消息的消費過程如下:
九、自我實現消息隊列思路
引言:如果讓你寫一個消息隊列,該如何進行架構設計?
比如說消息隊列系統,我們從以下幾個角度來考慮一下:
- 可擴展性:就是需要的時候快速擴容,就可以增加吞吐量和容量。可以參考afka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分數據。
- 持久化:為了保證MQ的消息不丟失,設計時一定要考慮消息的持久化機制,且持久化要順序寫,這樣就沒有磁碟隨機讀寫的定址開銷,磁碟順序讀寫的性能是很高的,這就是 kafka 的思路。
- 高可用:保證MQ的可靠性,可以參考kafka 。多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務。
- 能不能支持數據 0 丟失啊?可以的,參考我們之前說的那個 kafka 數據零丟失方案。
十、MQ總結
其實MQ的使用,無非就是從原理,高可用,重覆消息,順序讀寫,數據丟失幾個方面開展。
上述的介紹是偏重思路方面來進行展開的,至於具體的MQ使用細節,我想你有了對應的思路去查會有一大堆。這也是我學習技術的一個思路,先掌握一個大的方向,然後沿著一個大的方向再進行相應的詳細學習。
最後,上述MQ介紹中,大部分都是有我平時開發積累所得,也有一部分是藉助網路現場學習。
如有不足或錯誤,歡迎大家指出,我們共同學習進步!