Redis的List數據類型作為消息隊列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱發佈又無法支持數據的持久化,相對前兩者,Redis Stream作為消息隊列的使用更為有優勢。 相信球迷小伙伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的? 具體說就是用什麼技術實現最為合 ...
Redis的List數據類型作為消息隊列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱發佈又無法支持數據的持久化,相對前兩者,Redis Stream作為消息隊列的使用更為有優勢。 相信球迷小伙伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的? 具體說就是用什麼技術實現最為合適?如何面對數以百萬計的讀壓力?廣告消息是如何插播進來的?最後的歷史消息如何歸檔,如何持久化存儲? 文字直播其實就是解說員作為生產者,生產消息(文字信息),各種客戶端作為消費者,消費信息(刷新文字內容)。 典型的消息隊列實現,可以用隊列或者類似隊列的功能實現,這裡只是簡單想象一下,結合redis中的stream數據類型,來學習stream作為消息隊列的功能實現。
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713084148560-1490132091.png)
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713085539703-629477595.png)
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713090958041-1811331556.png)
xlen "NBA_Match_001"
,也就是上面寫入的10條消息
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713090638106-564724145.png)
1.4 限制某一個stream的最大長度,maxlen
依據先進先出的原則,自動刪除超出最長長度的消息
xadd "NBA_Match_001" maxlen 50000 * "2019-07-13 08:26:39" "反擊哈騰,一條龍上籃得分"
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713091119668-1712700478.png)
正向查詢
xrange "NBA_Match_001" # 查詢所有消息
xrange "NBA_Match_001" - + # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 + # 指定最小消息ID的列表
xrange "NBA_Match_001"- 1562980142175-0 # 指定最大消息ID的列表
反向查詢
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -
xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -
1.6 刪除消息
xdel stream_name id,刪除消息並不是真正的物理刪除,隊列的長度不變,指示標記當前消息被刪除
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713093035083-294845154.png)
2 xread:獨立消費 類似於List,生產者往list中寫數據,消費者從list中讀數據,只能有一個消費者
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713093202675-2016189792.png)
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713093351803-629761693.png)
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713093724444-205592830.png)
3 多消費者xgroup
:消費組,每個組中的消費者獨立消費stream中的消息
典型的比如文字直播的安卓App客戶端,蘋果App客戶端,網頁客戶端等等。多個終端,都可以獨立地消費隊列裡面的
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713093917352-882059705.png)
3.1 創建消費組
對消息隊列"NBA_Match_001"創建了兩個消費組,一個是cg1,一個是cg2,比如網頁客戶端與App客戶端
1,xgroup create "NBA_Match_001" cg1 0-0 # 表示從頭開始消費 創建消費組cg1,消費組必須綁定一個steam(NBA_Match_001),從頭(0-0 )開始消費"NBA_Match_001"中的消息 2,xgroup create "NBA_Match_001" cg2 0-0 # 表示從頭開始消費 3,2 從消費組中創建消費者 xreadgroup指令可以進行消費組的組內消費xreadgroup GROUP cg1 c1 count 1 streams "NBA_Match_001" >
>號表示從當前消費組的last_delivered_id後面開始讀 , 每當消費者讀取一條消息,last_delivered_id變數就會前進
![](https://img2018.cnblogs.com/blog/380271/201907/380271-20190713094434043-2012743419.png)
每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。
每個消費者者有一個組內唯一名稱。
關於消費組,可能不太好理解,舉個例子就比較清楚
假設有2個消費組cg1,cg2,對於cg1,其組內共有3個消費者c1,、c2、c3。一個消息隊列中共有5條消息a,b,c,d,e,那麼一種可能的消費方式如下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是說3個消費者,對於消息的消費是互斥的,消費的消息是沒有交集的
而對於cg2,同樣可以消費a,b,c,d,e這5條消息,不依賴於cg1消費組以及消費情況,同理,具體怎麼消費,取決於其組內的消費者數量
就好比體育直播的客戶端,正常情況下,網頁客戶端可以收到所有的直播消息,手機App客戶端也可以收到所有的直播消息一樣,不同消費組間對消息的消費互不幹擾。
4 多個生產者和多個消費者
這種情況類似以上,不用的是增加了多個消費者,在上面的基礎上做了擴展。
其實不難想象,文字直播插播的廣告消息,可能是類似如下結構,是另外一個獨立的生產者,與文字直播員一樣生成寫入消息到隊列,然後客戶端看到的就是夾雜了廣告的直播。
目前就個人認識而言,stream數據類型實現消息隊列並不完美,最大的問題就是單點壓力問題:這裡是說單點壓力,而不是單點故障,stream類型數據,其實從邏輯上看,是一個key值(stream_name),跟著一系列value(消息),這些消息只能存儲在一個Redis實例中,如何緩解多個消費者對單個Key值中的消息消費壓力?說來說去,不就是想說kafka的partition麽……
參考:
http://database.51cto.com/art/201812/588189.htm
https://www.zhihu.com/question/279540635