概要:在使用storm分散式計算框架進行數據處理時,如何保證進入storm的消息的一定會被處理,且不會被重覆處理。這個時候僅僅開啟storm的ack機制並不能解決上述問題。那麼該如何設計出一個好的方案來解決上述問題? 現有架構背景:本人所在項目組的實時系統負責為XXX的實時產生的交易記錄進行處理,根 ...
概要:在使用storm分散式計算框架進行數據處理時,如何保證進入storm的消息的一定會被處理,且不會被重覆處理。這個時候僅僅開啟storm的ack機制並不能解決上述問題。那麼該如何設計出一個好的方案來解決上述問題?
現有架構背景:本人所在項目組的實時系統負責為XXX的實時產生的交易記錄進行處理,根據處理的結果向用戶推送不同的信息。實時系統平時接入量每秒1000條,雙十一的時候,最大幾十萬條。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6219878.html 可接網站開發,java開發。 新浪微博:intsmaze劉洋洋哥 微信:intsmaze
|
架構設計:
storm設置的超時時間為3分鐘;kafkaspout的pending的長度為2000;storm開啟ack機制,拓撲程式中如果出現異常則調用ack方法,向spout發出ack消息;每一個交易數據會有一個全局唯一性di。
處理流程:
交易數據會發送到kafka,然後拓撲A去kafka取數據進行處理,拓撲A中的OnceBolt會先對從kafka取出的消息進行一個唯一性過濾(根據該消息的全局id判斷該消息是否存儲在redis中,如果有,則說明拓撲A已經對該消息處理過了,則不會把該消息發送該下游的calculateBolt,直接向spout發送ack響應;如果沒有,則把該消息發送該下游的calculateBolt。),calculateBolt對接收到來自上游的數據進行規則的匹配,根據該消息所符合的規則推送到不同的kafka通知主題中。
拓撲B則是不同的通知拓撲,去kafka讀取對應通知的主題,然後把該消息推送到不同的客戶端(微信客戶端,支付寶客戶端等)。
架構設計的意義:
通過借用redis,來保證消息不會被重覆處理,對異常的消息,我們不讓該消息重發。
因為系統只是對交易成功後的數據通過配置的規則進行區分來向用戶推送不同的活動信息,從業務上看,系統並不需要保證所有交易的用戶都一定要收到活動信息,只需要保證交易的用戶不會收到重覆的數據即可。
但是線上上運行半年後,還是發現了消息重覆處理的問題,某些用戶還是會收到兩條甚至多條重覆信息。
通過對現有架構的查看,我們發現問題出在拓撲B中(各個不同的通知拓撲),原因是拓撲B沒有添加唯一性過濾bolt,雖然上游的拓撲對消息進行唯一性過濾了(保證了外部系統向kafka生產消息出現重覆下,拓撲A不進行重覆處理),但是回看拓撲B,我們可以知道消息重發絕對不是kafka主題中存在重覆的兩條消息,且拓撲B消息重覆不是系統異常導致的(我們隊異常進行ack應答),那麼導致消息重覆處理的原因就一定是消息超時導致的。ps:消息在storm中被處理,沒有發生異常,而是由於集群硬體資源的爭搶或者下游介面瓶頸無法快速處理拓撲B推送出去的消息,導致一條消息在3分鐘內沒有處理完,spout就認為該消息fail,而重新發該消息,但是超時的那一條消息並不是說不會處理,當他獲得資源了,仍然會處理結束的。
解決方案:在拓撲B中添加唯一性過濾bolt即可解決。
個人推測:當時實時系統架構設計時,設計唯一性過濾bolt時,可能僅僅是考慮到外部系統向kafka推送數據可能會存在相同的消息,並沒有想到storm本身tuple超時導致的消息重覆處理。
該系統改進:雖然從業務的角度來說,並不需要保證每一個交易用戶都一定要收到活動信息,但是我們完全可以做到每一個用戶都收到活動信息,且收到的消息不重覆。
我們可以做到對程式的異常進行控制,但是超時導致的fail我們無法控制。
我們對消息處理異常控制,當發生異常信息,我們在發送fail應答前,把該異常的消息存儲到redis中,這樣唯一性過濾的bolt就會對收到的每一條消息進行判斷,如果在redis中,我們就知道該消息是異常導致的失敗,就讓該消息繼續處理,如果該消息不在redis中,我們就知道該消息是超時導致的fail,那麼我們就過濾掉該消息,不進行下一步處理。
這樣我們就做到了消息的可靠處理且不會重覆處理。