**原文鏈接:** [](https://mp.weixin.qq.com/s/--AdUcwOQyP6r5W8ziVwUg) 上一篇文章介紹了 [如何實現計數器限流?](https://mp.weixin.qq.com/s/CTemkZ2aKPCPTuQiDJri0Q)主要有兩種實現方式,分別是固 ...
上一篇文章介紹了 如何實現計數器限流?主要有兩種實現方式,分別是固定視窗和滑動視窗,並且分析了 go-zero 採用固定視窗方式實現的源碼。
但是採用固定視窗實現的限流器會有兩個問題:
- 會出現請求量超出限制值兩倍的情況
- 無法很好處理流量突增問題
這篇文章來介紹一下令牌桶演算法,可以很好解決以上兩個問題。
工作原理
演算法概念如下:
- 令牌以固定速率生成;
- 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多餘的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執行;
- 如果桶空了,那麼嘗試取令牌的請求會被直接丟棄。
令牌桶演算法既能夠將所有的請求平均分佈到時間區間內,又能接受伺服器能夠承受範圍內的突發請求,因此是目前使用較為廣泛的一種限流演算法。
源碼實現
源碼分析我們還是以 go-zero 項目為例,首先來看生成令牌的部分,依然是使用 Redis 來實現。
// core/limit/tokenlimit.go
// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 當前時間戳
local now = tonumber(ARGV[3])
// 請求數量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填滿
local fill_time = capacity/rate
// 向下取整,ttl 為填滿時間 2 倍
local ttl = math.floor(fill_time*2)
// 當前桶剩餘容量,如果為 nil,說明第一次使用,賦值為桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
last_tokens = capacity
end
// 上次請求時間戳,如果為 nil 則賦值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
last_refreshed = 0
end
// 距離上一次請求的時間跨度
local delta = math.max(0, now-last_refreshed)
// 距離上一次請求的時間跨度能生成的 token 數量和桶內剩餘 token 數量的和
// 與桶容量比較,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判斷請求數量和桶內 token 數量的大小
local allowed = filled_tokens >= requested
// 被請求消耗掉之後,更新剩餘 token 數量
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新時間
redis.call("setex", KEYS[2], ttl, now)
return allowed`
Redis 中主要保存兩個 key,分別是 token 數量和刷新時間。
核心思想就是比較兩次請求時間間隔內生成的 token 數量 + 桶內剩餘 token 數量,和請求量之間的大小,如果滿足則允許,否則則不允許。
限流器初始化:
// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
// 生成 token 速率
rate int
// 桶容量
burst int
store *redis.Redis
// 桶 key
tokenKey string
// 桶刷新時間 key
timestampKey string
rescueLock sync.Mutex
// redis 健康標識
redisAlive uint32
// redis 健康監控啟動狀態
monitorStarted bool
// 內置單機限流器
rescueLimiter *xrate.Limiter
}
// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
tokenKey := fmt.Sprintf(tokenFormat, key)
timestampKey := fmt.Sprintf(timestampFormat, key)
return &TokenLimiter{
rate: rate,
burst: burst,
store: store,
tokenKey: tokenKey,
timestampKey: timestampKey,
redisAlive: 1,
rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
}
}
其中有一個變數 rescueLimiter
,這是一個進程內的限流器。如果 Redis 發生故障了,那麼就使用這個,算是一個保障,儘量避免系統被突發流量拖垮。
提供了四個可調用方法:
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
return lim.AllowNCtx(ctx, time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(context.Background(), now, n)
}
// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
return lim.reserveN(ctx, now, n)
}
最終調用的都是 reverveN
方法:
func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
// 判斷 Redis 健康狀態,如果 Redis 故障,則使用進程內限流器
if atomic.LoadUint32(&lim.redisAlive) == 0 {
return lim.rescueLimiter.AllowN(now, n)
}
// 執行限流腳本
resp, err := lim.store.EvalCtx(ctx,
script,
[]string{
lim.tokenKey,
lim.timestampKey,
},
[]string{
strconv.Itoa(lim.rate),
strconv.Itoa(lim.burst),
strconv.FormatInt(now.Unix(), 10),
strconv.Itoa(n),
})
// redis allowed == false
// Lua boolean false -> r Nil bulk reply
if err == redis.Nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logx.Errorf("fail to use rate limiter: %s", err)
return false
}
if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
// 如果有異常的話,會啟動進程內限流
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
code, ok := resp.(int64)
if !ok {
logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
// redis allowed == true
// Lua boolean true -> r integer reply with value of 1
return code == 1
}
最後看一下進程內限流的啟動與恢復:
func (lim *TokenLimiter) startMonitor() {
lim.rescueLock.Lock()
defer lim.rescueLock.Unlock()
// 需要加鎖保護,如果程式已經啟動了,直接返回,不要重覆啟動
if lim.monitorStarted {
return
}
lim.monitorStarted = true
atomic.StoreUint32(&lim.redisAlive, 0)
go lim.waitForRedis()
}
func (lim *TokenLimiter) waitForRedis() {
ticker := time.NewTicker(pingInterval)
// 更新監控進程的狀態
defer func() {
ticker.Stop()
lim.rescueLock.Lock()
lim.monitorStarted = false
lim.rescueLock.Unlock()
}()
for range ticker.C {
// 對 redis 進行健康監測,如果 redis 服務恢復了
// 則更新 redisAlive 標識,並退出 goroutine
if lim.store.Ping() {
atomic.StoreUint32(&lim.redisAlive, 1)
return
}
}
}
以上就是本文的全部內容,如果覺得還不錯的話歡迎點贊,轉發和關註,感謝支持。
參考文章:
推薦閱讀: