go-zero 是如何實現令牌桶限流的?

来源:https://www.cnblogs.com/alwaysbeta/archive/2023/08/11/17621475.html
-Advertisement-
Play Games

**原文鏈接:** [](https://mp.weixin.qq.com/s/--AdUcwOQyP6r5W8ziVwUg) 上一篇文章介紹了 [如何實現計數器限流?](https://mp.weixin.qq.com/s/CTemkZ2aKPCPTuQiDJri0Q)主要有兩種實現方式,分別是固 ...


原文鏈接:

上一篇文章介紹了 如何實現計數器限流?主要有兩種實現方式,分別是固定視窗和滑動視窗,並且分析了 go-zero 採用固定視窗方式實現的源碼。

但是採用固定視窗實現的限流器會有兩個問題:

  1. 會出現請求量超出限制值兩倍的情況
  2. 無法很好處理流量突增問題

這篇文章來介紹一下令牌桶演算法,可以很好解決以上兩個問題。

工作原理

演算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多餘的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執行;
  • 如果桶空了,那麼嘗試取令牌的請求會被直接丟棄。

令牌桶演算法既能夠將所有的請求平均分佈到時間區間內,又能接受伺服器能夠承受範圍內的突發請求,因此是目前使用較為廣泛的一種限流演算法。

源碼實現

源碼分析我們還是以 go-zero 項目為例,首先來看生成令牌的部分,依然是使用 Redis 來實現。

// core/limit/tokenlimit.go

// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 當前時間戳
local now = tonumber(ARGV[3])
// 請求數量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填滿
local fill_time = capacity/rate
// 向下取整,ttl 為填滿時間 2 倍
local ttl = math.floor(fill_time*2)
// 當前桶剩餘容量,如果為 nil,說明第一次使用,賦值為桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end

// 上次請求時間戳,如果為 nil 則賦值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end

// 距離上一次請求的時間跨度
local delta = math.max(0, now-last_refreshed)
// 距離上一次請求的時間跨度能生成的 token 數量和桶內剩餘 token 數量的和
// 與桶容量比較,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判斷請求數量和桶內 token 數量的大小
local allowed = filled_tokens >= requested
// 被請求消耗掉之後,更新剩餘 token 數量
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end

// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新時間
redis.call("setex", KEYS[2], ttl, now)

return allowed`

Redis 中主要保存兩個 key,分別是 token 數量和刷新時間。

核心思想就是比較兩次請求時間間隔內生成的 token 數量 + 桶內剩餘 token 數量,和請求量之間的大小,如果滿足則允許,否則則不允許。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
    // 生成 token 速率
    rate           int
    // 桶容量
    burst          int
    store          *redis.Redis
    // 桶 key
    tokenKey       string
    // 桶刷新時間 key
    timestampKey   string
    rescueLock     sync.Mutex
    // redis 健康標識
    redisAlive     uint32
    // redis 健康監控啟動狀態
    monitorStarted bool
    // 內置單機限流器
    rescueLimiter  *xrate.Limiter
}

// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

其中有一個變數 rescueLimiter,這是一個進程內的限流器。如果 Redis 發生故障了,那麼就使用這個,算是一個保障,儘量避免系統被突發流量拖垮。

提供了四個可調用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
    return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
    return lim.reserveN(ctx, now, n)
}

最終調用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
    // 判斷 Redis 健康狀態,如果 Redis 故障,則使用進程內限流器
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }

    // 執行限流腳本
    resp, err := lim.store.EvalCtx(ctx,
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    if err == redis.Nil {
        return false
    }
    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
        logx.Errorf("fail to use rate limiter: %s", err)
        return false
    }
    if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 如果有異常的話,會啟動進程內限流
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

最後看一下進程內限流的啟動與恢復:

func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()

    // 需要加鎖保護,如果程式已經啟動了,直接返回,不要重覆啟動
    if lim.monitorStarted {
        return
    }

    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)

    go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 更新監控進程的狀態
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // 對 redis 進行健康監測,如果 redis 服務恢復了
        // 則更新 redisAlive 標識,並退出 goroutine
        if lim.store.Ping() {
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

以上就是本文的全部內容,如果覺得還不錯的話歡迎點贊轉發關註,感謝支持。


參考文章:

推薦閱讀:


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 來源:blog.csdn.net/zjhred/article/details/84976734 在文章的開頭,先說下NPE問題,NPE問題就是,我們在開發中經常碰到的NullPointerException。假設我們有兩個類,他們的UML類圖如下圖所示 ![](https://img2023.cn ...
  • HotSpot的演算法實現 HotSpot的演算法實現概要 1、枚舉根節點 由於目前的主流Java虛擬機使用的都是準確式GC(這個概念在第1章介紹Exact VM對Classic VM的改進時講過),所以當執行系統停頓下來後,並不需要一個不漏地檢查完所有執行上下文和全局的引用位置,虛擬機應當是有辦法直接 ...
  • 問題:在Windows環境下部署java的jar包,若有多個服務同時啟動,很難找到相應服務重啟。每次都重啟全部服務很麻煩。應用場景大多用於部署測試。 適用:jar部署,war部署不適用。 解決方案:找到相應jar服務關閉並重啟。 註意: 1、正確設置埠,jar服務運行的埠; 2、正確設置jar文 ...
  • 數據類型是編程語言中的一個重要概念,它定義了數據的類型和提供了特定的操作和方法。在 python 中,數據類型的作用是將不同類型的數據進行分類和定義,例如數字、字元串、列表、元組、集合、字典等。這些數據類型不僅定義了數據的類型,還為數據提供了一些特定的操作和方法,例如字元串支持連接和分割,列表支持排... ...
  • ## 依賴導入 ```xml org.hibernate.orm hibernate-core 6.2.7.Final com.mysql mysql-connector-j 8.0.33 ``` ## 配置文件 ```xml com.mysql.cj.jdbc.Driver jdbc:mysql: ...
  • 在實際開發過程中,我們可能會遇到併發寫文件的場景,如果處理不當很可能出現文件內容亂序問題。下麵我們通過一個示常式序描述這一過程並給出解決該問題的方法。 ...
  • # 前言 在面試這一篇我們介紹過[CountDownLatch和CyclicBarrier](https://github.com/jmilktea/jtea/blob/master/%E9%9D%A2%E8%AF%95/CountDownLatch%E5%92%8CCyclicBarrier.md ...
  • 按照業務拆分的方式稱為垂直分片,又稱為縱向拆分,它的核心理念是專庫專用;水平分片又稱為橫向拆分,是通過某些欄位根據某種規則將數據分散至多個庫或表中。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...