# Go Redis 管道和事務之 go-redis ## [Go Redis 管道和事務官方文檔介紹](https://redis.uptrace.dev/zh/guide/go-redis-pipelines.html) Redis pipelines(管道) 允許一次性發送多個命令來提高性能, ...
Go Redis 管道和事務之 go-redis
Go Redis 管道和事務官方文檔介紹
Redis pipelines(管道) 允許一次性發送多個命令來提高性能,go-redis支持同樣的操作, 你可以使用go-redis一次性發送多個命令到伺服器,並一次讀取返回結果,而不是一個個命令的操作。
Go Redis 管道和事務: https://redis.uptrace.dev/zh/guide/go-redis-pipelines.html
#管道
通過 go-redis Pipeline
一次執行多個命令並讀取返回值:
pipe := rdb.Pipeline()
incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)
cmds, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
// 結果你需要再調用 Exec 後才可以使用
fmt.Println(incr.Val())
或者你也可以使用 Pipelined
方法,它將自動調用 Exec:
var incr *redis.IntCmd
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
incr = pipe.Incr(ctx, "pipelined_counter")
pipe.Expire(ctx, "pipelined_counter", time.Hour)
return nil
})
if err != nil {
panic(err)
}
fmt.Println(incr.Val())
同時會返回每個命令的結果,你可以遍歷結果集:
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Get(ctx, fmt.Sprintf("key%d", i))
}
return nil
})
if err != nil {
panic(err)
}
for _, cmd := range cmds {
fmt.Println(cmd.(*redis.StringCmd).Val())
}
#Watch 監聽
使用 Redis 事務, 監聽key的狀態,僅當key未被其他客戶端修改才會執行命令, 這種方式也被成為 樂觀鎖。
Redis 事務:https://redis.io/docs/manual/transactions/
樂觀鎖:
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC
#事務
你可以使用 TxPipelined
和 TxPipeline
方法,把命令包裝在 MULTI
、 EXEC
中, 但這種做法沒什麼意義:
cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Get(ctx, fmt.Sprintf("key%d", i))
}
return nil
})
if err != nil {
panic(err)
}
// MULTI
// GET key0
// GET key1
// ...
// GET key99
// EXEC
你應該正確的使用 Watch + 事務管道, 比如以下示例,我們使用 GET
, SET
和 WATCH
命令,來實現 INCR
操作, 註意示例中使用 redis.TxFailedErr
來判斷失敗:
const maxRetries = 1000
// increment 方法,使用 GET + SET + WATCH 來實現Key遞增效果,類似命令 INCR
func increment(key string) error {
// 事務函數
txf := func(tx *redis.Tx) error {
// // 獲得當前值或零值
n, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
return err
}
n++ // 實際操作
// 僅在監視的Key保持不變的情況下運行
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, n, 0)
return nil
})
return err
}
for i := 0; i < maxRetries; i++ {
err := rdb.Watch(ctx, txf, key)
if err == nil {
// Success.
return nil
}
if err == redis.TxFailedErr {
// 樂觀鎖失敗
continue
}
return err
}
return errors.New("increment reached maximum number of retries")
}
Go Redis 管道和事務 實操
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
// 聲明一個全局的 rdb 變數
var rdb *redis.Client
// 初始化連接
func initRedisClient() (err error) {
// NewClient將客戶端返回給Options指定的Redis Server。
// Options保留設置以建立redis連接。
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 沒有密碼,預設值
DB: 0, // 預設DB 0 連接到伺服器後要選擇的資料庫。
PoolSize: 20, // 最大套接字連接數。 預設情況下,每個可用CPU有10個連接,由runtime.GOMAXPROCS報告。
})
// Background返回一個非空的Context。它永遠不會被取消,沒有值,也沒有截止日期。
// 它通常由main函數、初始化和測試使用,並作為傳入請求的頂級上下文
ctx := context.Background()
_, err = rdb.Ping(ctx).Result()
if err != nil {
return err
}
return nil
}
// watchDemo 在key值不變的情況下將其值+1
func watchKeyDemo(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// Watch準備一個事務,並標記要監視的密鑰,以便有條件執行(如果有密鑰的話)。
// 當fn退出時,事務將自動關閉。
// func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string)
return rdb.Watch(ctx, func(tx *redis.Tx) error {
// Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
// 獲取 Key 的值 n
n, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
fmt.Printf("redis get failed, err: %v\n", err)
return err
}
// 假設操作耗時5秒
// 5秒內我們通過其他的客戶端修改key,當前事務就會失敗
time.Sleep(5 * time.Second)
// txpipeline 執行事務中fn隊列中的命令。
// 當使用WATCH時,EXEC只會在被監視的鍵沒有被修改的情況下執行命令,從而允許檢查和設置機制。
// Exec總是返回命令列表。如果事務失敗,則返回TxFailedErr。否則Exec返回第一個失敗命令的錯誤或nil
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
// 業務邏輯 如果 Key 沒有變化,則在原來的基礎上加 1
pipe.Set(ctx, key, n+1, time.Hour)
return nil
})
return err
}, key)
}
func main() {
if err := initRedisClient(); err != nil {
fmt.Printf("initRedisClient failed: %v\n", err)
return
}
fmt.Println("initRedisClient started successfully")
defer rdb.Close() // Close 關閉客戶端,釋放所有打開的資源。關閉客戶端是很少見的,因為客戶端是長期存在的,併在許多常式之間共用。
err := watchKeyDemo("watch_key")
if err != nil {
fmt.Printf("watchKeyDemo failed: %v\n", err)
return
}
fmt.Printf("watchKeyDemo succeeded!\n")
}
運行
Code/go/redis_demo via