Redis的List數據類型作為消息隊列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱發佈又無法支持數據的持久化,相對前兩者,Redis Stream作為消息隊列的使用更為有優勢。 相信球迷小伙伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的? 具體說就是用什麼技術實現最為合 ...
Redis的List數據類型作為消息隊列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱發佈又無法支持數據的持久化,相對前兩者,Redis Stream作為消息隊列的使用更為有優勢。 相信球迷小伙伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的? 具體說就是用什麼技術實現最為合適?如何面對數以百萬計的讀壓力?廣告消息是如何插播進來的?最後的歷史消息如何歸檔,如何持久化存儲? 文字直播其實就是解說員作為生產者,生產消息(文字信息),各種客戶端作為消費者,消費信息(刷新文字內容)。 典型的消息隊列實現,可以用隊列或者類似隊列的功能實現,這裡只是簡單想象一下,結合redis中的stream數據類型,來學習stream作為消息隊列的功能實現。 1,生成者:生產者隊列的創建,與消息的增刪改 1.1 創建並寫入消息 語法:xadd queue_name Id filed value(filed value) 1,每一組消息需要一個唯一的Id,*號表示伺服器自動生成ID,後面順序跟著一組或者多組消息(filed value) 2,消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,並且是該毫秒內產生的第5條消息。 消息ID可以由伺服器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是後面加入的消息的ID要大於前面的消息ID。 3,消息元素的的結構為key-value,必須成對出現,如果key或者value元素中有空格,必須用"abc def"或者'abc def'括起來 1.2 生產者寫入消息 語法:xadd queue_name *|Id filed value 1.3 xlen 當前stream的長度:xlen stream_name
xlen "NBA_Match_001"
,也就是上面寫入的10條消息
1.4 限制某一個stream的最大長度,maxlen
依據先進先出的原則,自動刪除超出最長長度的消息
xadd "NBA_Match_001" maxlen 50000 * "2019-07-13 08:26:39" "反擊哈騰,一條龍上籃得分"
1.5 查詢消息(查詢是生產者查詢自己生產的消息,跟消費者的消費是兩碼事)
正向查詢
xrange "NBA_Match_001" # 查詢所有消息
xrange "NBA_Match_001" - + # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 + # 指定最小消息ID的列表
xrange "NBA_Match_001"- 1562980142175-0 # 指定最大消息ID的列表
反向查詢
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -
xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -
1.6 刪除消息
xdel stream_name id,刪除消息並不是真正的物理刪除,隊列的長度不變,指示標記當前消息被刪除
2 xread:獨立消費 類似於List,生產者往list中寫數據,消費者從list中讀數據,只能有一個消費者 2. 1,從頭部讀取消息,從某個streams中讀取n條消息,0-0只從頭開始,或者指定從streams的Id開始 xread count 1 streams "NBA_Match_001" 0-0 xread count 1 streams "NBA_Match_001" 1562980142175-0 2.2,從尾部讀取最新的一條消息 xread count 1 streams "NBA_Match_001" $ 此時預設不返回任何消息 xread block 0 count 1 streams "NBA_Match_001" $ 以阻塞的方式讀取尾部最新的一條消息,直到新的消息的到來
3 多消費者xgroup
:消費組,每個組中的消費者獨立消費stream中的消息
典型的比如文字直播的安卓App客戶端,蘋果App客戶端,網頁客戶端等等。多個終端,都可以獨立地消費隊列裡面的
3.1 創建消費組
對消息隊列"NBA_Match_001"創建了兩個消費組,一個是cg1,一個是cg2,比如網頁客戶端與App客戶端
1,xgroup create "NBA_Match_001" cg1 0-0 # 表示從頭開始消費 創建消費組cg1,消費組必須綁定一個steam(NBA_Match_001),從頭(0-0 )開始消費"NBA_Match_001"中的消息 2,xgroup create "NBA_Match_001" cg2 0-0 # 表示從頭開始消費 3,2 從消費組中創建消費者 xreadgroup指令可以進行消費組的組內消費xreadgroup GROUP cg1 c1 count 1 streams "NBA_Match_001" >
>號表示從當前消費組的last_delivered_id後面開始讀 , 每當消費者讀取一條消息,last_delivered_id變數就會前進 當一個組的消費則消費完全部消息之後,就沒有新的消息了
每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。
每個消費者者有一個組內唯一名稱。
關於消費組,可能不太好理解,舉個例子就比較清楚
假設有2個消費組cg1,cg2,對於cg1,其組內共有3個消費者c1,、c2、c3。一個消息隊列中共有5條消息a,b,c,d,e,那麼一種可能的消費方式如下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是說3個消費者,對於消息的消費是互斥的,消費的消息是沒有交集的
而對於cg2,同樣可以消費a,b,c,d,e這5條消息,不依賴於cg1消費組以及消費情況,同理,具體怎麼消費,取決於其組內的消費者數量
就好比體育直播的客戶端,正常情況下,網頁客戶端可以收到所有的直播消息,手機App客戶端也可以收到所有的直播消息一樣,不同消費組間對消息的消費互不幹擾。
4 多個生產者和多個消費者
這種情況類似以上,不用的是增加了多個消費者,在上面的基礎上做了擴展。
其實不難想象,文字直播插播的廣告消息,可能是類似如下結構,是另外一個獨立的生產者,與文字直播員一樣生成寫入消息到隊列,然後客戶端看到的就是夾雜了廣告的直播。
目前就個人認識而言,stream數據類型實現消息隊列並不完美,最大的問題就是單點壓力問題:這裡是說單點壓力,而不是單點故障,stream類型數據,其實從邏輯上看,是一個key值(stream_name),跟著一系列value(消息),這些消息只能存儲在一個Redis實例中,如何緩解多個消費者對單個Key值中的消息消費壓力?說來說去,不就是想說kafka的partition麽……
參考:
http://database.51cto.com/art/201812/588189.htm
https://www.zhihu.com/question/279540635