深入理解 Redis 新特性:Stream

来源:https://www.cnblogs.com/xiao2shiqi/archive/2023/04/17/17324547.html
-Advertisement-
Play Games

該數據結構需要 Redis 5.0.0 + 版本才可用使用 概述 Redis stream 是 Redis 5 引入的一種新的數據結構,它是一個高性能、高可靠性的消息隊列,主要用於非同步消息處理和流式數據處理。在此之前,想要使用 Redis 實現消息隊列,通常可以使用例如:列表,有序集合、發佈與訂閱 ...


該數據結構需要 Redis 5.0.0 + 版本才可用使用

概述

Redis stream 是 Redis 5 引入的一種新的數據結構,它是一個高性能、高可靠性的消息隊列,主要用於非同步消息處理和流式數據處理。在此之前,想要使用 Redis 實現消息隊列,通常可以使用例如:列表,有序集合、發佈與訂閱 3 種數據結構。但是 stream 相比它們具有以下的優勢:

  • 支持範圍查找:內置的索引功能,可以通過索引來對消息進行範圍查找
  • 支持阻塞操作:避免低效的反覆輪詢查找消息
  • 支持 ACK:可以通過確認機制來告知已經成功處理了消息,保證可靠性
  • 支持多個消費者:多個消費者可以同時消費同一個流,Redis 會確保每個消費者都可以獨立地消費流中的消息

話不多說,接下來具體看看如何使用它。(PS:萬字長文,行駛途中請系好安全帶)

XADD 添加元素

XADD 命令的語法格式如下:

XADD stream-name id field value [field value]
  • stream-name: 指定 redis stream 的名字
  • id: 是指 stream 中的消息 ID,通常使用 * 號表示自動生成
  • field value: 就是消息的內容,是 K-V 格式的鍵值對

關於使用 XADD 添加元素,還有以下特點:

  • 自動創建流:當 my-stream 流不存在時,redis 會自動創建,然後將元素追加在流的末尾處
  • 任意鍵值對:流中的每個元素可以包含一個或任意多個鍵值對

下麵是一個使用 XADD 命令添加新消息的示例:

XADD my-stream * name John age 30 email [email protected]

上述命令的說明:

  1. 向名為 my-stream 的 Redis stream 中添加了一條新消息。
  2. * 表示使用自動生成的消息 ID,
  3. nameageemail 是消息的欄位名
  4. John30[email protected] 是消息的欄位值。

流元素 ID

XADD 命令在成功執行後會返回元素 ID 作為結果:

"1681138020163-0"

每個元素的 ID 是一個遞增的唯一標識符,由兩部分組成:一個時間戳和一個序列號。

  • 時間戳部分是一個 64 位的有符號整數,以毫秒為單位表示自 Unix 時間起經過的毫秒數。
  • 序列號部分是一個遞增的整數,從 0 開始逐步增加。

為了證明,我們可以指定消息 ID 向指定流中發送一條消息:

XADD my-stream 1681138020163-1 name Mary age 25 email [email protected]

返回結果:

"1681138020163-1"

最後,可以提前使用 XRANGE 指令查看推入流中的數據

XRANGE my-stream - +

返回結果:

1) 1) "1681138020163-0"
   2) 1) "name"
      2) "John"
      3) "age"
      4) "30"
      5) "email"
      6) "[email protected]"
2) 1) "1681138020163-1"
   2) 1) "name"
      2) "Mary"
      3) "age"
      4) "25"
      5) "email"
      6) "[email protected]"

流元素 ID 的限制

元素 ID 在 Redis stream 中扮演著非常重要的角色,它不僅保證了元素的唯一性和順序性,還提供了高效的範圍查詢和分析功能。在使用 Redis stream 時,需要特別註意元素 ID 的限制,並保證 ID 的唯一性和遞增性。

限制如下::

  • ID 必須是唯一的
  • 新元素的 ID 必須比流中所有已有元素的 ID 都要大

還有一些長度和特殊字元的限制等等,不符合上述限制的添加元素操作,會被 redis 拒絕,並且返回一個錯誤等。

最大元素 ID 是如何更新的 ?

在成功執行XADD命令之後,流的最大元素ID也會隨之更新。

為什麼要限制 新元素的 ID 必須比流中所有已有元素的 ID 都要大 ?

限制新元素的 ID 必須比流中所有已有元素的 ID 都要大,是為了保證 stream 中每個元素的唯一性和順序性。這種特性對於使用流實現消息隊列和事件系統的用戶來說是非常重要的:用戶可以確信,新的消息和事件只會出現在已有消息和事件之後,就像現實世界里新事件總是發生在已有事件之後一樣,一切都是有序進行的。

自動生成 ID 的規則

示例開始就演示自動生成消息向流中推送數據,在日常使用非常方便,這裡說一下它的生成規則:

  1. 時間戳部分是當前時間的毫秒數。表示自 Unix 時間起經過的毫秒數
  2. 序列號從 0 開始遞增。序列號是一個 64 位的整數,從 0 開始遞增

限制流長度

流的數據大多只是臨時保存的,如果不對流的長度進行限制,會出現以下情況:

  1. 存儲耗盡:隨著流中消息的增加,占用的記憶體也會相應增加。長時間運行的應用程式可能會面臨記憶體耗盡的風險
  2. 影響性能:隨著數據越多,查詢和操作流的速度會更慢,維護也更困難

為了避免該問題,在使用 Redis stream 時,可以使用 MAXLEN 選項指定 stream 的最大長度,命令格式如下:

XADD stream [MAXLEN len] id field value [field value ...]

示例:

XADD mini-stream MAXLEN 3 * k1 v1
XADD mini-stream MAXLEN 3 * k2 v2
XADD mini-stream MAXLEN 3 * k3 v3
XADD mini-stream MAXLEN 3 * k4 v4

# 我們向一個限制長度為 3 的 `mini-stream` 流中添加 4 條數據,然後查看流內的消息:
XRANGE mini-stream - +
1) 1) "1681140898447-0"
   2) 1) "k2"
      2) "v2"
2) 1) "1681140901790-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1681140906703-0"
   2) 1) "k4"
      2) "v4"

最後會看到最早創建的 k1 消息已經被移除,redis 刪除在流中存在時間最長的元素,從而來保證流的整體長度。

XTRIM 限制流

除了在 XADD 命令時限制流,Redis 還提供單獨限制流長度的 MAXLEN 命令,基礎語法如下:

XTRIM stream MAXLEN len

示例:

XTRIM my-stream MAXLEN 2
(integer) 1

這條命令 XTRIM my-stream MAXLEN 2 的作用是將名為 my-stream 的流修剪為最多包含 2 條消息。換句話說,流中超出這個長度的較舊消息將被移除。

XDEL 移除元素

XDEL 用於從流中刪除特定的消息。這個命令需要提供流的鍵(key)和一個或多個消息 ID 作為參數。當消息被成功刪除時,XDEL 命令會返回被刪除消息的數量。

XDEL 的基本語法如下:

XDEL key ID [ID ...]

示例:

# 這個命令將從名為 `mystream` 的流中刪除消息 ID 為 `1681480521617-0` 的消息。
XDEL my-stream 1681480521617-0
(integer) 1

# 你也可以傳入多個 `id` 參數進行批量刪除
XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0
(integer) 3

註意:,XDEL 不會修改流的長度計數,這意味著刪除消息後,流的長度保持不變。

XLEN 獲取流長度

XLEN 用於獲取流中消息的數量。這個命令非常簡單且高效,因為它只要一個參數。

XLEN 的基本語法如下:

XLEN key

示例:

XLEN my-stream
(integer) 4

註意:XLEN 命令僅返迴流中消息的數量,並不提供消息的具體內容。獲取消息內容的命令,看下麵的 XRANGE

XRANGE 查詢消息

XRANG 主要用於獲取流中的一段連續消息,它還有一個非常相似的 XREVRANGE 命令,區別:

  • XRANGE 按照消息 ID 順序返回結果
  • XREVRANGE 按照消息 ID 逆序返回結果(用來查詢流中最新的消息,非常有用!)

XRANG 的的基本語法如下:

XRANGE key start end [COUNT count]

獲取指定消息

獲取指定消息,我們可以把 startend 設置同一條消息 ID,可以用來達到查詢指定消息 ID 的效果。使用示例:

# 獲取指定消息 ID
XRANGE my-stream 1681480968241-0 1681480968241-0

獲取多條消息

獲取多條消息,可以利用 COUNT 選項參數,使用示例:

# 獲取流中最早的 5 條消息
XRANGE my-stream - + COUNT 5

這條命令獲取流中最早的 5 條消息(按消息 ID 順序排序)。-+ 分別表示最小和最大的消息 ID,用於獲取流中的所有消息。

獲取全部消息

想要讀取流中全部消息內容,移除 COUNT 即可:

# 獲取全部消息
XRANGE my-stream - +

逆序獲取流

XREVRANGE 按照消息 ID 逆序返回結果,基本語法如下:

XREVRANGE key end start [COUNT count]

用法完全和 XRANGE 一樣,這裡就不過多介紹了,使用示例:

XREVRANGE my-stream + - COUNT 5

這個命令將返回名為 mystream 的流中的最新的 3 條消息(按消息 ID 逆序排序)。

XRANGE 的使用場景

在實際業務場景中,可以利用 XRANGEXREVRANGE 命令可以用於實現以下功能:

  • 分頁查詢:通過指定 startendCOUNT 參數,可以實現對流中消息的分頁查詢
  • 實時監控:可以使用這些命令來獲取流中的最新消息,以便在實時監控或分析系統中展示
  • 數據導出:如果需要將流中的數據導出到其他系統或文件中,可以使用這些命令來獲取指定範圍內的消息

XREAD 阻塞讀取流

相比 XRANGE,XREVRANGE 類似,XREAD 也是用於從流中讀取消息的命令,但它們之間有一些關鍵區別:

  • XREAD 支持同時讀取多個流的消息
  • XREAD 支持阻塞模式,可以在新消息到達時候,立即粗處理
  • XREAD 支持 BLOCK 阻塞等待時間參數,控制阻塞時間

XREAD 的阻塞模式,可以更好的構建實時數據處理應用程式,如事件驅動系統、實時分析系統等。

XREAD 命令的基本語法如下:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

查詢模式

查詢的話,除了同時讀取多個流的特點外,其他和 XRANGE,XREVRANGE 類似。

使用示例:

  1. 讀取單個流的消息:
XREAD STREAMS my-stream 0

這個命令將從名為 my-stream 的流中讀取消息,0 代表讀取所有消息,如果指定的消息 ID,表示從該消息 ID 之後開始讀取

  1. 讀取多個流的消息:
XREAD STREAMS my-stream mini-stream 0 0

這個命令將從名為 my-streammini-stream 的流中分別讀取所有消息,後面的 2 個參數 0 分別對應 2 個消息 ID 0 開始的位置

阻塞模式

當使用阻塞模式時,XREAD 命令會在以下幾種情況下表現出不同的行為:

  1. 不會阻塞的情況:找到符合條件的元素會立即返回
  2. 會解除阻塞的情況:超時,或者新消息到達
  3. 一直阻塞的情況:一直阻塞,等待新消息的到達

使用示例:

  1. 不會阻塞的情況

如果流中有滿足條件的消息(即從指定的消息 ID 之後的新消息),那麼 XREAD 命令會立即返回這些消息,不會發生阻塞。

XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
   2) 1) 1) "1681480968241-0"
         2) 1) "k5"
            2) "v5"
  1. 會解除阻塞的情況

XREAD 命令解除阻塞也分 2 情況:超時,新消息到達

示例代碼:

# 超時: 阻塞超時,沒有新消息到達,解除阻塞
XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0
(nil)
(5.09s)

# 新消息到達: 新消息到達,且滿足讀取條件 (新消息的 ID 大於指定的消息 ID) 解除阻塞
XREAD BLOCK 50000 STREAMS my-stream 1681482023346-0
1) 1) "my-stream"
   2) 1) 1) "1681485525804-0"
         2) 1) "newMessage"
            2) "v1"
(18.46s)
  1. 一直阻塞的情況:

如果設置的阻塞等待時間為 0,那麼 XREAD 命令會一直阻塞:

示例代碼:

XREAD BLOCK 0 STREAMS my-stream $

這個命令將一直阻塞等待,直到新消息到達。$ 符號表示只讀取新消息。

當然如果客戶端主動斷開連接,阻塞的 XREAD 命令也會被取消

在實際應用中,XREAD 使用阻塞模式,可以在新消息到達時立即處理,實現實時消息處理。

消費組

在 Redis 流的消息模型中,是通過消費者組(Consumer Group)來組織和管理多個消費者以協同處理來自同一個流的消息的機制。消費者組的主要目的是在多個消費者之間分發消息,實現負載均衡、高可用性和容錯能力。

工作原理:

  1. Stream 將消息分發,所有訂閱的消費者組 Consumer Group 都會收到消息(消費組組共用 stream 的消息)
  2. 消費者組本身不處理消息,而是再將消息分發給消費者,由消費者進行真正的消費(消費者獨占組內的消息)

如圖所示:

graph LR Stream((Stream)) -- messages --> ConsumerGroup1(Consumer Group 1) Stream((Stream)) -- messages --> ConsumerGroup2(Consumer Group 2) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1A(Consumer 1A) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1B(Consumer 1B) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2A(Consumer 2A) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2B(Consumer 2B)

使用消費者組這種模型的設計,以為在 Redis Stream 中實現以下功能:

  • 負載均衡:消費者組可以將消息分發給多個消費者,實現負載均衡
  • 高可用性:在某個消費者發生故障的情況下,仍然可以確保消息被處理
  • 容錯能力:消費者組支持重新處理失敗的消息,這有助於確保消息被可靠地處理

接下來我們再詳細說明消費組相關的命令使用

XGOUP 管理消費組

CREATE 創建消費組

通過 XGROUP 命令可以為你的 Redis Stream 創建和管理消費組。

命令格式如下:

XGROUP CREATE stream group id

參數說明:

  • <stream>:要關聯的流的鍵。
  • <group>:消費組的名稱。
  • <id>:開始讀取消息的起始 ID。通常使用 $ 表示僅消費新消息,或者使用 0 表示消費流中的所有消息。
  • [MKSTREAM](可選):如果流不存在,自動創建一個新的流。

使用示例:

# 創建消費組,如果流不存在則自動創建
XGROUP CREATE mystream mygroup $ MKSTREAM
OK

# 查看流中的消費組
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "0-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

以上命令是使用 XGROUP CREATE 命令創建一個名為 mygroup 的消費組,從最新的消息開始消費,使用 MKSTREAM 選項,如果流不存在則會自動創建流,返回 OK 既代表創建成功。最後使用 XINFO 查看結果。

SETID 修改組的最後消息 ID

在某些情況下,你可能想要消費組忽略某些消息,或者重新處理某些消息來重現 bug,那麼可以使用 XGROUP SETID 命令設置消費組的起始消息 ID。

命令格式非常簡單:

XGROUP SETID stream group id

使用示例:

# 設置 mygroup 組的最新消息為指定 ID
XGROUP SETID mystream mygroup 1681655893911-0
OK

# 查看消費組
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1681655893911-0"		# 已被改變
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 4
   
# 設置 mygroup 組的最新消息為流的最新消息 ID
XGROUP SETID mystream mygroup $

# 查看消費組
127.0.0.1:6379> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1681655916001-0"		# 已更新
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

以上命令將 mygroup 組的最新消息 ID 更新為指定 ID 和流的最新 ID 的使用示例。

XREADGROUP 讀取消息

使用 XREADGROUP 命令讀取消費組裡面的消息,基本語法:

XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...

參數說明

  • <group>:消費組的名稱。
  • <consumer>:消費者的名稱。
  • <n>(可選):要讀取的最大消息數。
  • <ms>(可選):阻塞等待新消息的時間(以毫秒為單位)。
  • <stream_key_1>, <stream_key_2>:要從中讀取消息的流的鍵。
  • <id_1>, <id_2>:從每個流中開始讀取的消息 ID,通常使用特殊字元 > 表示從上次讀取的位置開始讀取新的消息。

使用示例:

我們創建一個 myconsumer 的消費組讀取上面創建 mygroup 消費組的信息,以下是多種用法示例:

# 以 myconsumer 消費者身份從 mystream 中讀取分配給 mygroup 的消息
# 讀取所有最新的消息(常用)
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
(nil)

# 其他用法:
# 讀取最多 10 條消息
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >

# 進行阻塞讀取最新消息
XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >

這裡拿不到數據是因為我們上面把消費組 mygroup 的消息 ID 設置為最新,我們嘗試修改消息 ID 重新消費試試

# 設置消費組的消息 ID,進行重新消費
XGROUP SETID mystream mygroup 1681655893911-0

# 消費組 myconsumer 讀取消費組的消息
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1681655897993-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1681655899297-0"
         2) 1) "k1"
            2) "v1"
      3) 1) "1681655915496-0"
         2) 1) "k1"
            2) "v1"
      4) 1) "1681655916001-0"
         2) 1) "k1"
            2) "v1"
            
# 查看消費組的信息
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 1		# 消費組有一個消費者
    5) "pending"
    6) (integer) 4		# 有 4 條正在處理的消息
    7) "last-delivered-id"
    8) "1681655916001-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

通過以上命令可以確認,myconsumer 消費者拿到 mygroup 消費組的消息未確認處理,所以看到有 4 條消息正在等待處理中。

XPENDING 查看消息

通過 XPENDING 命令,可以獲取指定流的指定消費者組目前的待處理消息的相關信息。在很多場景下,你需要通過它來觀察和瞭解消費者的處理情況,從而做出處理,例如以下場景:

  • 您可以獲取消費組中掛起(未確認)的消息信息,從而瞭解消費者處理消息的速度和效率
  • 如果某個消費者的掛起消息數量不斷增加,或者某些消息長時間未被處理,可能表明該消費者存在問題
  • 在高併發或高負載的情況下,消費者可能無法及時處理所有消息。通過 XPENDING 命令檢測到積壓消息
  • 通過定期運行 XPENDING 命令,您可以在發現掛起消息數量超過預設閾值時觸發報警

基本語法:

XPENDING stream group [start stop count] [consumer]

參數說明

  • <stream>:流的鍵。
  • <group>:消費組的名稱。
  • <start>(可選):掛起消息範圍的起始 ID。
  • <stop>(可選):掛起消息範圍的結束 ID。
  • <count>(可選):返回的最大掛起消息數。
  • <consumer>(可選):篩選特定消費者的掛起消息。

使用示例:

使用 XPENDING 命令查看上面的 mygroup 組的消息去哪兒了:

XPENDING mystream mygroup
1) (integer) 4			# 待處理消息數量
2) "1681655897993-0"	# 首條消息 ID
3) "1681655916001-0"	# 最後一條消息的 ID
4) 1) 1) "myconsumer"	# 各消費者正在處理的消息數量
      2) "4"

以上展示的彙總信息,你還可以通過以下命令,查看待處理消息更詳細的信息:

# 查看指定待處理消息
XPENDING mystream mygroup 1681655897993-0 1681655897993-0 1
1) 1) "1681655897993-0"		# 消息 ID
   2) "myconsumer"			# 所屬消費者
   3) (integer) 2397387		# 最後一次投遞時間
   4) (integer) 1			# 投遞次數

從以上信息你可以看到消息正在被誰處理和處理的時間,你也可以指定消費者查看信息:

XPENDING mystream mygroup - + 10 myconsumer
1) 1) "1681655897993-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
2) 1) "1681655899297-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
3) 1) "1681655915496-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
4) 1) "1681655916001-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1

以上命令列出 myconsumer 消費者所有待處理的消息的詳細信息

XACK 處理消息

XACK 用於確認消費組中的特定消息已被處理。在消費者成功處理消息後,應使用 XACK 命令通知 Redis,以便從消費組的掛起消息列表中移除該消息。

命令格式:

XACK stream group id [id id ...]

使用示例:

通過 XACK 命令,我們將上面 myconsumer 消費者的消息進行確認處理:

# 確認消息
XACK mystream mygroup 1681655897993-0
(integer) 1
# .....

當消費者對所有消息進行處理後,再查看消費組內容進行驗證:

XPENDING mystream mygroup - + 10 myconsumer
(empty array)

XPENDING mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

使用 XACK 可以確保消息不會重覆處理防止其他消費者或相同消費者在故障恢復後重覆處理該消息等等好處。

XCLAIM 消息轉移

XCLAIM 消息轉移類似我們生活中的呼叫轉移,當一個消費者無法處理某個消息或出現故障時,XCLAIM 可以確保其他消費者接管並處理這些消息。命令格式非常簡單:

XCLAIM stream group new_consumer max_pending_time id [id id id]

使用示例:

# 使用 XPENDING 命令查詢消費組中掛起的消息
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
      2) "2"
      
# 使用 XCLAIM 命令將消息轉移
XCLAIM mystream mygroup myconsumer2 10000 1681660259887-0
1) 1) "1681660259887-0"			# 被轉移的消息 ID
   2) 1) "k1"					# 消息內容
      2) "v1"

上面的命令意思是:如果消息 ID 1681660259887-0 處理時間超過 10000ms,那麼消息轉移給 myconsumer2,我們使用 XPENDING 命令來驗證:

XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
      2) "1"
   2) 1) "myconsumer2"
      2) "1"

XINFO 查看流和組信息

XINFO 用於獲取流或消費組的詳細信息。XINFO 命令有多個子命令,可以提供不同類型的信息。

以下是一些常用的 XINFO 子命令及其介紹:

XINFO STREAM:此子命令用於獲取流的詳細信息,包括長度、消費組數量、第一個和最後一個條目等。例如:

XINFO STREAM mystream

XINFO GROUPS:此子命令用於獲取流中消費組的列表及其相關信息。例如:

XINFO GROUPS mystream

XINFO CONSUMERS:此子命令用於獲取消費組中消費者的列表及其相關信息。例如:

XINFO CONSUMERS mystream mygroup

通過使用這些子命令,您可以瞭解流、消費組和消費者的狀態,從而監控和優化 Redis Stream 應用程式的性能。在處理問題或分析系統性能時,這些信息可能特別有用。

刪除操作

刪除消費者

當用戶不再需要某個消費者的時候,可以通過執行以下命令將其刪除,命令格式:

XGROUP DELCONSUMER stream group consumer

使用示例:

# 刪除 myconsumer 消費者
XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
刪除消費組

當你不需要消費組時,可以通過以下命令刪除它,命令格式:

XGROUP DESTROY stream group

使用示例:

# 刪除 mygroup 消費組
XGROUP DESTROY mystream mygroup
(integer) 1

總結

以下是本篇文章涉及的 Redis Stream 命令命令和簡要總結:

  1. XADD:向流中添加新的消息。
  2. XREAD:從流中讀取消息。
  3. XREADGROUP:從消費組中讀取消息。
  4. XRANGE:根據消息 ID 範圍讀取流中的消息。
  5. XREVRANGE:與 XRANGE 類似,但以相反順序返回結果。
  6. XDEL:從流中刪除消息。
  7. XTRIM:根據 MAXLEN 參數修剪流的長度。
  8. XLEN:獲取流的長度。
  9. XGROUP:管理消費組,包括創建、刪除和修改。
  10. XACK:確認消費組中的消息已被處理。
  11. XPENDING:查詢消費組中掛起(未確認)的消息。
  12. XCLAIM:將掛起的消息從一個消費者轉移到另一個消費者。
  13. XINFO:獲取流、消費組或消費者的詳細信息。

這些命令提供了對 Redis Stream 的全面操作支持,包括添加、刪除、讀取、修剪消息以及管理消費組和消費者。通過熟練使用這些命令,您可以實現高效且可擴展的消息傳遞和日誌處理系統。edis Stream 是 Redis 提供的一種強大、持久且可擴展的數據結構,用於實現消息傳遞和日誌處理等場景。Stream 數據結構類似於日誌文件,消息以有序的方式存儲在流中,同時還支持消費組的概念,允許多個消費者並行處理消息。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近由於項目需要,需要給每個用戶分配一個充幣地址,考慮到錢包安全性和方便管理,於是自己動手寫了一個本地網頁版的錢包,附上源代碼跟大家交流下。 Github 源代碼地址 錢包和項目是分離的,項目通過鑒權訪問錢包的介面,主要實現了以下功能: 1、可以導入助記詞、私鑰,也可以隨機生成臨時私鑰; 2、一套助 ...
  • 包 CommunityToolkit.Mvvm (又名 MVVM 工具包,以前名為 Microsoft.Toolkit.Mvvm) 是一個現代、快速且模塊化的 MVVM 庫。 它是 .NET 社區工具包的一部分,圍繞以下原則構建: 平臺和運行時獨立 - .NET Standard 2.0、 .NET ...
  • 作者:盧文雙 資深資料庫內核研發 去年年底通過微信公眾號【資料庫內核】設定了一個目標——2023 年要寫一系列 特性介紹+內核解析 的文章(現階段還是以 MySQL 為主)。 雖然關註者很少,但本著“說到就要做到”的原則,從這篇就開始了。 序言: 以前對 MySQL 測試框架 MTR 的使用,主要集 ...
  • Redis命令 1.Redis數據結構介紹 Redis是一個key-value的資料庫,key一般是String類型,value的類型多種多樣,value常見的八種類型: Redis支持五種基本的數據類型:string(字元串),hash(哈希),list(列表),set(集合)及zset(sort ...
  • 影響MySQL查詢性能的因素有很多,我們經常會對查詢語句、索引欄位做一些優化,而其實在表設計的階段就可能產生一些問題。對於表設計,可以對錶結構進行優化,也可以對錶欄位進行優化。以下通過一個具體的案例演示一些常用的表設計優化的方法。 一、業務需求 這裡,就以學生-教師-課程業務作為示例。資料庫需要存放 ...
  • 通過ESLint 檢測 JS/TS 代碼、Prettier 格式化代碼、Stylelint 檢測 CSS/SCSS 代碼和配置 EditorConfig 來實現前端代碼規範約束和統一。 ...
  • “我苦心鍛煉了三年,我變禿了,也變強了。” —— 琦玉老師 0x00 大綱 0x01 前言 四個月前,我在《你是來找茬的吧?對自己的博客進行調優》一文中探討了以博客的使用者而不是開發者身份去進行優化,究竟能做到何種程度的問題。當時以 Edge 瀏覽器的開發者工具里的 lighthouse 評分和載入 ...
  • 並不是所有的場景都適合微服務,我理解技術開發者都有一顆追求新技術的心,但是更重要的是業務場景及團隊。 關於微服務 微服務架構,說白了就是一種上層體系的演變。從最早的單體架構,到前後分離,SOA,甚至微服務架構,其實它們都在做一件事,並且都朝著一個方向去發展:那就是分而治之!從簡! 分而治之有什麼好處 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...