Redis5新特性Streams作消息隊列

来源:https://www.cnblogs.com/ytao-blog/archive/2020/03/19/12522070.html
-Advertisement-
Play Games

前言 Redis 5 新特性中,Streams 數據結構的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊列使用時,得到更完善,更強大的原生支持,其中尤為明顯的是持久化消息隊列。同時,stream 借鑒了 kafka 的消費組模型概念和設計,使消費消息處理上 ...


前言

Redis 5 新特性中,Streams 數據結構的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊列使用時,得到更完善,更強大的原生支持,其中尤為明顯的是持久化消息隊列。同時,stream 借鑒了 kafka 的消費組模型概念和設計,使消費消息處理上更加高效快速。本文就 Streams 數據結構中常用 API 進行分析。

準備

本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

添加消息

Streams 添加數據使用 XADD 指令進行添加,消息中的數據以 K-V 鍵值對的形式進行操作。一條消息可以存在多個鍵值對,添加命令格式:

XADD key ID field string [field string ...]

其中 key 為 Streams 的名稱,ID 為消息的唯一標誌,不可重覆,field string 就為鍵值對。下麵我們就添加以 person 為名稱的流,進行操作。

XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 號複製,這裡代表著服務端自動生成 Id,添加後返回數據 "1578238486193-0"

這裡自動生成的 Id 格式為 <millisecondsTime>-<sequenceNumber>
Id 是由兩部分組成:

  1. millisecondsTime 為當前伺服器時間毫秒時間戳。
  2. sequenceNumber 當前序列號,取值來源於當前毫秒內,生成消息的順序,預設從 0 開始加 1 遞增。

比如:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條消息。

除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:

  1. Id 中的前後部分必須為數字。
  2. 最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。
  3. 添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 後半部分不能比存在前半部分相同的最大後半部分小。

否則,當不滿足上述條件時,添加後會拋出異常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

實際上,當添加一條消息時,會進行兩部操作。第一步,先判斷如果不存在 Streams,則創建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時,由於 Id 異常,也可以在 Redis 中存在以當前 Streams 的名稱。
Streams 中 Id 也可作為指針使用,因為它是一個有序的標記。

生產中,如果這樣使用添加消息,會存在一個問題,那就是消息數量太大時,會使服務宕機。這裡 Streams 的設計初期也有考慮到這個問題,那就是可以指定 Streams 的容量。如果容量操作這個設定的值,就會對調舊的消息。在添加消息時,設置 MAXLEN 參數。

XADD person MAXLEN 5 * name ytao des https://ytao.top

這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多餘的消息:

XTRIM person MAXLEN 8

消息數量

查看消息數量使用 XLEN 指令進行操作。

XLEN key

例:查看 person 流中的消息數量:

> XLEN person
(integer) 5

查詢消息

查詢 Streams 中的消息使用 XRANGEXREVRANGE 指令。

XRANGE

查詢數據時,可以按照指定 Id 範圍進行查詢,XRANGE 查詢指令格式:

XRANGE key start end [COUNT count]

參數說明:

  • key 為 Streams 的名稱
  • start 為範圍查詢開始 Id,包含本 Id。
  • start 為範圍查詢結束 Id,包含本 Id。
  • Count 為查詢返回最大的消息數量,非必填。

這裡 start 和 end 有-+兩個非指定值,他們分別表示無窮小和無窮大,所以當使用這個兩個值時,會查詢出全部的消息。

> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

上面查詢的消息數據,可以看到是按照先進先出的順序查詢出來的。

使用 COUNT 指定查詢返回的數量:

# 查詢所有的消息,並且返回一條數據
> XRANGE person - + COUNT 1
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

在範圍查詢中,Id 的後半部分可省略,後半部分中的數據會全部查詢到。

XREVRANGE

XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數順序進行了調換:

XREVRANGE key end start [COUNT count]

使用案例:

> XREVRANGE person +  -
1) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

查詢後的結果與 XRANGE 的結果順序剛好相反,其他都一樣,這兩個指令可進行消息的升序和降序的返回。

刪除消息

刪除消息使用 XDEL 指令操作,只需指定將要刪除的 Streams 名稱和 Id 即可,支持一次刪除多個消息 。

XDEL key ID [ID ...]

刪除案例:

# 查詢所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
# 刪除消息      
> XDEL person 2-0
(integer) 1
# 再次查詢刪除後的所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
# 查詢刪除後的長度      
> XLEN person
(integer) 2            

從上面可以看到,刪除消息後,長度也會減少相應的數量。

消費消息

在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費消息,在 Streams 數據結構中,同樣也能實現同等功能,當沒有新的消息時,可進行阻塞等待。不僅支持單獨消費,而且還可以支持群組消費。

單獨消費

單獨消費使用 XREAD 指令。可以看到,下麵命令中,STREAMS,key, 以及 ID 為必填項。ID 表示將要讀取大於該 ID 的消息。當 ID 值使用 $ 賦予時,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 參數用來指定讀取的最大數量,與 XRANGE 的用法一樣。

> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"

> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"
      2) 1) "0-2"
         2) 1) "name"
            2) "luffy"
            3) "des"
            4) "valiant!"

XREAD 裡面還有個 BLOCK 參數,這個是用來阻塞訂閱消息的,BLOCK 攜帶的參數為阻塞時間,單位為毫秒,如果在這個時間內沒有新的消息消費,那麼就會釋放該阻塞。當這裡的時間指定為 0 時,會一直阻塞,直到有新的消息來消費到。

# 視窗 1 開啟阻塞,等待新消息的到來
> XREAD BLOCK 0 STREAMS person $

# 另開一個連接視窗 2,添加一條新的消息
> XADD person 2-2 name tao des coder
"2-2"

# 視窗 1,獲取到有新的消息來消費,並且帶有阻塞的時間
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
   2) 1) 1) "2-2"
         2) 1) "name"
            2) "tao"
            3) "des"
            4) "coder"
(60.81s)

當使用 XREAD 進行順序消費時,需要額外記錄下讀取到位置的 Id,方便下次繼續消費。

群組消費

群組消費的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達到這一肝功能需求,我們需要做三件事:創建群組群組讀取消息向服務端確認消息以處理

群組操作

操作群組使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  • CREATE 創建消費組。
  • SETID 修改下一個處理消息的 Id。
  • DESTROY 銷毀消費組。
  • DELCONSUMER 刪除消費組中指定的消費者。

我們當前需要使用的是創建消費組:

# 以當前存在的最大 Id 作為消費起始 
> XGROUP CREATE person group1 $
OK

群組讀取消息

群組讀取使用 XREADGROUP 指令,COUNTBLOCK的使用類似 XREAD 的操作,只是多了個群組和消費者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由於群組消費和單獨消費類似,這裡只進行個阻塞分析,這裡 Id 也有個特殊值>,表示還未進行消費的消息:

# 視窗 1,消費群組中,taotao 消費者建立阻塞監聽
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

# 視窗 2,消費群組中,yangyang 消費者建立阻塞監聽 
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

# 視窗 3,添加消費消息
> XADD person 3-1 name tony des 666
"3-1"

# 視窗 1,讀取到新消息,此時 視窗 2 沒有任何反應
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-1"
         2) 1) "name"
            2) "tony"
            3) "des"
            4) "666"
(77.54s)

# 視窗 3,再次添加消費消息
> XADD person 3-2 name james des abc!
"3-2"

# 視窗 2,讀取到新消息,此時 視窗 1 沒有任何反應
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"
(76.36s)

以上執行流程中,group1 群組中有兩個消費者,當添加兩條消息後,這兩個消費者輪流消費。

消息ACK

消息消費後,為避免再次重覆消費,這是需要向服務端發送 ACK,確保消息被消費後的標記。
例如下列情況,我們上面我們將最新兩條消息已進行了消費,但是當我們再次讀取消息時,還是被讀到:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"

這時,我們使用 XACK 指令告訴伺服器,我們已處理的消息:

XACK key group ID [ID ...]0

讓伺服器標記 3-2 已處理:

> XACK person group1 3-2
(integer) 1

再次獲取群組讀取消息:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) (empty list or set)

隊列中沒有了可讀消息。
除了上面以講解到的 API 外,查看消費群組信息可使用 XINFO 指令查看,本文不做分析。

總結

上面對 Streams 常用 API 進行了分析,我們可以感受到 Redis 在消息隊列支持的道路上,也越來越強大。如果使用過它的 PUB/SUB 功能的話,就會感受到 5.x 迭代正是將你的一些痛點進行了優化。

個人博客: https://ytao.top
關註公眾號 【ytao】,更多原創好文
我的公眾號


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一 Kubernetes集群安全 1.1 安全機制 Kubernetes通過一系列機制來實現集群的安全控制,其中包括API Server的認證授權、準入控制機制及保護敏感信息的Secret機制等。集群的安全性主要有如下目標: 保證容器與其所在宿主機的隔離。 限制容器給基礎設施或其他容器帶來的干擾。 ...
  • (1)Maven項目介紹 詳細介紹請移步官網: http://maven.apache.org/what-is-maven.htm 我們需要知道Maven的主要介紹信息: 1.Maven是屬於apache軟體基金會下一個開源免費的項目,是跨平臺的項目管理工具. 2.Maven採用了一種被稱之為Pro ...
  • 一、安裝、運行、配置docker 1、安裝docker yum install -y docker 2、查看docker是否安裝成功 yum list installed |grep docker 3、啟動docker服務(並設置開機自啟) systemctl start docker.servic ...
  • 首先附上要使用的 "scrcpy源地址" 接下來是如何使用(我用的是安卓手機+win10): 1. 下載好後,首先使用數據線連接手機到電腦,並且手機需要打開 開發人員選項 (不知道如何打開的自行百度); 2. 打開到安裝scrcpy的目錄,按住shift右鍵打開Powershell,輸入以下命令,設 ...
  • 結論: 當MySQL中欄位為int類型時,搜索條件where num='111' 與where num=111都可以使用該欄位的索引。當MySQL中欄位為varchar類型時,搜索條件where num='111' 可以使用索引,where num=111 不可以使用索引 驗證過程: 建表語句: 1 ...
  • 場景 Centos中Redis的下載編譯與安裝(超詳細): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/103967334 Redis的啟動和關閉(前臺啟動和後臺啟動): https://blog.csdn.net/BADAO_ ...
  • 場景 Centos中Redis的下載編譯與安裝(超詳細): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/103967334 Redis的啟動和關閉(前臺啟動和後臺啟動): https://blog.csdn.net/BADAO_ ...
  • 下載 鏈接:https://pan.baidu.com/s/1DpBiI3ZBXVDnFxRxPxnpKg 提取碼:9mcv 一、Oracle11gR2(64位)安裝配置 1、下載後,同時選擇兩個壓縮文件進行解壓,得到安裝程式文件夾; 2、進入安裝文件夾,指定stepup.exe: (1)配置安全更 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...