針對消息隊列的數據積壓問題,我們主要做了三個方面的優化處理,取消同步鎖、ActiveMQ參數優化、本地雙隊列優化,通過這三個方面的優化基本解決了隊列數據積壓的問題。 ...
1 概述
最近生產環境的消息通知隊列發生了大量的數據積壓問題,從而影響到整個平臺商戶的交易無法正常進行,最後只能通過臨時關閉交易量較大的商戶來緩解消息隊列積壓的問題,經線上數據分析,我們的消息隊列在面對交易突發洪峰的情況下無法快速的消費並處理隊列中的數據,考慮到後續還會出現各種交易量突髮狀況,以下為針對消息隊列(ActiveMQ)的優化過程。
2 消息隊列通信圖
3 問題定位與分析
3.1 消息通知數據為什麼會被積壓?
分析:平臺中每個交易的發生可能會產生一到多條的消息通知數據,這些通知數據會通過消息隊列(ActiveMQ)來中轉消費並處理,那麼在交易量突發洪峰的情況下會產生大量的消息通知數據,如果消息隊列(ActiveMQ)的消費能力被阻塞的話會嚴重影響到數據的吞吐量,從而積壓大量數據無法被快速處理!
3.2 配置了多個ActiveMQ的消費者為什麼數據積壓還是無法緩解?
分析:經過分析消息隊列的數據消費處理模塊的代碼,消息的消費處理是通過監聽器SessionAwareMessageListener非同步回調onMessage方法而接收消息的,但是在回調的方法onMessage上加了synchronized同步鎖,問題就在這裡,由於整個onMessage方法被鎖,導致程式只能通過串列(一次只能消費一條數據)處理數據,而無法通過多線程併發處理數據,從而影響了整個隊列的數據消費能力。
public synchronized void onMessage(Message message, Session session)
3.3 去掉synchronized同步鎖會產生多線程併發的安全性問題嗎?
分析:首先多個消費者併發處理的數據是不同的,而且多個消費者線程併發回調onMessage方法的時候並未使用到共用的變數,全部在各自線程的方法棧中,所以理論上不會出現多線程併發產生的安全性問題。
3.4 消息會被重覆多次消費嗎?
分析:
(1)通過分析ActiveMQ的消費者消息接收處理的源代碼發現,一條消息是否已經消費是通過ack確認機制來保證的,如果是通過非同步回調的方式接收消息的話,在onMessage回調函數返回之後會立即進行ack確認提交,那麼只要保證onMessage函數內部不拋出異常,及需要內部捕獲異常,那麼消息就不會被重覆消息。
(2)因為我們的系統在接收到消息後會首先存入db中進行持久化,而且每條消息在存入資料庫的時候都做了唯一性約束,那麼即使有重覆的消息也不會被正常處理。
4 階段一優化方案
4.1 準備測試數據
啟動多個線程分別往MQ消息隊列中發送數據,共發送15000個消息,然後啟動消費者模塊消費消息,設定每個消息處理耗時為10ms,配置ActiveMQ的消費者數量為concurrency = 5-100
4.2 優化前性能測試
測試次數 | 是否併發處理 | 消息數量 |
queuePrefetch | consumers | 耗時 |
---|---|---|---|---|---|
1 | 否 | 15000 | 1000 | 15 | 151s |
2 | 否 | 15000 | 1000 | 16 | 151s |
3 | 否 | 15000 | 1000 | 15 | 151s |
優化前通過測試數據發現,雖然配置了concurrency = 5-100 (消費者動態伸縮),但是只有15個消費者在忙碌,而且消息都是串列化執行的,15000條消息共需要151s的時間,效率非常差,ps:哈哈,不知道是哪位開發的大神加的同步鎖!
註:queuePrefetch 為MQ的消費者一次從Queue中拉取的數量,預設為1000,consumers為處理消息的消費者數量
4.3 優化後性能測試
4.3.1 取消同步鎖
取消在監聽器的回調方法onMessage上的synchronized同步鎖
4.3.2 取消同步鎖後的性能測試
測試次數 | 是否併發處理 | 消息數量 |
queuePrefetch | consumers | 耗時 |
---|---|---|---|---|---|
1 | 是 | 15000 | 1000 | 14 | 13s |
2 | 是 | 15000 | 1000 | 15 | 13s |
3 | 是 | 15000 | 1000 | 15 | 13s |
通過以上數據發現取消同步鎖,15000條消息只需要13s就可以處理完,相比之前快了近12倍,雖然速度提升了不少,但是發現配置了5-100的消費者,確只有15個消費者在忙碌,其他消費者都沒有消息可處理,及造成了數據傾斜,那麼接下來就要通過優化queuePrefetch 參數了。
4.3.3 優化ActiveMQ的queuePrefetch 參數
預獲取消息數量是MQ中重要的調優參數之一,為了提高網路的傳輸效率,ActiveMQ預設給Consumer批量push 1000條消息,可以從ActiveMQ源碼中的ActiveMQPrefetchPolicy類的DEFAULT_QUEUE_PREFETCH欄位得知,考慮到我們的通知消息的消費處理中涉及到資料庫的操作,以及綜合網路傳輸效率,這裡將queuePrefetch的值設置為100,具體需配置到ActiveMQ的連接地址後,如:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100
4.3.4 優化queuePrefetch參數後的性能測試
測試次數 | 是否併發處理 | 消息數量 |
queuePrefetch | consumers | 耗時 |
---|---|---|---|---|---|
1 | 是 | 15000 | 100 | 40 | 7s |
2 | 是 | 15000 | 100 | 47 | 5s |
3 | 是 | 15000 | 100 | 41 | 6s |
將ActiveMQ的queuePrefetch參數修改為100,那麼發現有近一半的消費者在處理數據,最後15000條消息需要6s中就可以處理完成。
4.3.5 結論
通過以上兩步的優化後的測試結果可以得出,取消同步鎖之後隊列的消費能力提升了近11倍,在取消同步鎖的基礎上再優化ActiveMQ批處理參數後性能又提升了近1倍,綜合以上兩步的優化處理,隊列整體的消費能力提高了30多倍。
5 階段二優化方案
階段二的優化方案是在階段一的基礎上進行的優化處理
5.1 單隊列處理
由於我們的消息通知業務屬於冪等性操作,會按照設定的通知次數來反覆通知處理,直到通知成功為止,我們系統現在的做法是將接收到MQ的消息暫存於延時隊列(DelayQueue)中,然後通過多線程輪訓取出,然後通過HTTP通知到其他模塊處理,如果通知失敗,則重新放入同一個延時隊列等待下次執行,如上圖:消息1通知失敗後會重新放入延時隊列。
註:單隊列處理的不足
由於使用了單隊列處理,使得可以一次通知成功的消息與通知多次失敗的消息混合在了一起,這樣在隊列中失敗通知的消息就會阻塞到後續可以正常通知的消息,最終導致消息整體的一個吞吐量下降
5.2 雙隊列處理
針對5.1單隊列的不足,我們可以重新設計,將單隊列設計為雙隊列處理,雙隊列的核心思想為如果隊列1中的消息通知失敗,則不再重新放入隊列1,而是放入隊列2去通知,這樣可以起到消息數據分離的作用,及失敗通知的數據不再會影響到後續可以成功通知的消息,從而提高隊列消息通知的整體性能!
6 階段三優化方案
6.1 MQ組件重選型
ActiveMQ是一個老牌的消息隊列組件,吞吐量方面表現不是很理想,適合在業務量不大的場景中使用,現在有非常多比較成熟及高性能高吞吐的消息隊列組件可供我們選擇,如:RabbitMQ、RocketMQ、Kafka,後續可根據實際情況考慮替換掉ActiveMQ組件。
7 總結
針對消息隊列的數據積壓問題,我們主要做了三個方面的優化處理,取消同步鎖、ActiveMQ參數優化、本地雙隊列優化,通過這三個方面的優化基本解決了隊列數據積壓的問題。
文章來源:https://my.oschina.net/feinik/blog/1674168
更多相關內容推薦:http://www.roncoo.com/article/index?title=mq