需求背景 用戶下訂單成功之後隔20分鐘給用戶發送上門服務通知簡訊 訂單完成一個小時之後通知用戶對上門服務進行評價 業務執行失敗之後隔10分鐘重試一次 類似的場景比較多 簡單的處理方式就是使用定時任務 假如數據比較多的時候 有的數據可能延遲比較嚴重,而且越來越多的定時業務導致任務調度很繁瑣不好管理。 ...
需求背景
- 用戶下訂單成功之後隔20分鐘給用戶發送上門服務通知簡訊
- 訂單完成一個小時之後通知用戶對上門服務進行評價
-
業務執行失敗之後隔10分鐘重試一次
類似的場景比較多 簡單的處理方式就是使用定時任務 假如數據比較多的時候 有的數據可能延遲比較嚴重,而且越來越多的定時業務導致任務調度很繁瑣不好管理。
隊列設計
目前可以考慮使用rabbitmq
來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的MQ中間件。
開發前需要考慮的問題?
- 及時性 消費端能按時收到
- 同一時間消息的消費權重
- 可靠性 消息不能出現沒有被消費掉的情況
- 可恢復 假如有其他情況 導致消息系統不可用了 至少能保證數據可以恢復
- 可撤回 因為是延遲消息 沒有到執行時間的消息支持可以取消消費
- 高可用 多實例 這裡指HA/主備模式並不是多實例同時一起工作
-
消費端如何消費
當然初步選用
redis
作為數據緩存的主要原因是因為redis
自身支持zset
的數據結構(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間維度去判定執行的順序 同時也支持map
list
數據結構。
簡單定義一個消息數據結構
private String topic;/***topic**/ private String id;/***自動生成 全局惟一 snowflake**/ private String bizKey; private long delay;/***延時毫秒數**/ private int priority;//優先順序 private long ttl;/**消費端消費的ttl**/ private String body;/***消息體**/ private long createTime=System.currentTimeMillis(); private int status= Status.WaitPut.ordinal(); |
運行原理:
- 用
Map
來存儲元數據。id作為key,整個消息結構序列化(json/…)之後作為value,放入元消息池中。 - 將
id
放入其中(有N個)一個zset
有序列表中,以createTime+delay+priority作為score。修改狀態為正在延遲中 - 使用timer實時監控
zset
有序列表中top 10的數據 。 如果數據score<
=當前時間毫秒就取出來,根據topic
重新放入一個新的可消費列表(list
)中,在zset中刪除已經取出來的數據,並修改狀態為待消費 - 客戶端獲取數據只需要從可消費隊列中獲取就可以了。並且狀態必須為待消費 運行時間需要<=當前時間的 如果不滿足 重新放入
zset
列表中,修改狀態為正在延遲。如果滿足修改狀態為已消費。或者直接刪除元數據。
客戶端
因為涉及到不同程式語言的問題,所以當前預設支持http
訪問方式。
- 添加延時消息添加成功之後返回消費唯一ID POST /push {…..消息體}
- 刪除延時消息 需要傳遞消息ID GET /delete?id=
- 恢復延時消息 GET /reStore?expire=true|false expire是否恢復已過期未執行的消息。
- 恢復單個延時消息 需要傳遞消息ID GET /reStore/id
- 獲取消息 需要長連接 GET /get/topic
用nginx暴露服務,配置為輪詢 在添加延遲消息的時候就可以流量平均分配。
目前系統中客戶端並沒有採用HTTP長連接的方式來消費消息,而是採用MQ的方式來消費數據這樣客戶端就可以不用關心延遲消息隊列。只需要在發送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統處理。
消息可恢復
實現恢復的原理 正常情況下一般都是記錄日誌,比如mysql
的binlog
等。
這裡我們直接採用mysql
資料庫作為記錄日誌。
目前打算創建以下2張表:
- 消息表 欄位包括整個消息體
- 消息流轉表 欄位包括消息ID、變更狀態、變更時間、
zset
掃描線程Name、host/ip
定義zset
掃描線程Name是為了更清楚的看到消息被分發到具體哪個zset
中。前提是zset
的key和監控zset
的線程名稱要有點關係 這裡也可以是zset key。
舉個慄子
假如redis
伺服器宕機了,重啟之後發現數據也沒有了。所以這個恢復是很有必要的,只需要從表1也就是消息表
中把消息狀態不等於已消費的數據全部重新分發到延遲隊列中去,然後同步一下狀態就可以了。
當然恢復單個任務也可以這麼乾。
關於高可用
分散式協調還是選用zookeeper
吧。
如果有多個實例最多同時只能有1個實例工作 這樣就避免了分散式競爭鎖帶來的壞處,當然如果業務需要多個實例同時工作也是支持的,也就是一個消息最多只能有1個實例處理,可以選用zookeeper
或者redis
就能實現分散式鎖了。
最終做了一下測試多實例同時運行,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。所以比較推薦基於docker的主備部署模式。
擴展
支持zset
隊列個數可配置 避免大數據帶來高延遲的問題。
目前存在日誌和redis
元數據有可能不一致的問題 如mysql
掛了,寫日誌不會成功。
設計圖:
歡迎關註我的微信公眾號<笑笑笑技術圈> 我會不定期發佈一些不限於技術的文章