流量管制-令牌桶與漏桶

来源:https://www.cnblogs.com/Cylon/archive/2022/06/15/16379709.html
-Advertisement-
Play Games

Principle of token bucket 隨著互聯網的發展,在處理流量的方法也不僅僅為 first-come,first-served,而在共用網路中實現流量管理的基本機制就是排隊。而公平演算法則是實現在優先順序隊列中基於哪些策略來排隊的”公平隊列“。Token Bucket 則是為公平排隊提 ...


Principle of token bucket

隨著互聯網的發展,在處理流量的方法也不僅僅為 first-come,first-served,而在共用網路中實現流量管理的基本機制就是排隊。而公平演算法則是實現在優先順序隊列中基於哪些策略來排隊的”公平隊列“Token Bucket 則是為公平排隊提供了替代方案。Fair Queue 與 Token Bucket的區別主要在,對於Fair Queue來講,如果請求者目前空閑,Queue會將該請求者的帶寬分配給其他請求者;而 Token Bucket 則是分配給請求者的帶寬是帶寬的上限。

通過例子瞭解演算法原理

假設出站帶寬是 4個數據包/ms,此時有一個需求為,為一個特定的發送端 A 來分配 1個數據包/ms的帶寬。此時可以使用公平排隊的方法分給發送 A 25%的帶寬。

此時存在的問題是我們希望可以靈活地允許 A 的數據包以無規則的時間間隔發送。例如假設 A 在每個數據包發送後等待1毫秒後再開始下一個數據包的發送。

  • sence1:此時假設 A 以 1ms 的間隔去發送數據包,而由於某種原因導致應該在 t=6 到達的數據包卻在 t=6.5 到達。隨後的數據包在 t=7 準時到達,在這種情況下是否應該保留到t=7.5?
  • sence2:或者是否允許在 t=6.5 發送一個遲到的數據包,在 t=7 發送下一個數據包,此時理論上平均速率仍然還是 1 個數據包/ms?

顯然sence2是合理的,這個場景的解決方法就是令牌桶演算法,規定 A 的配額,允許指定平均速率和突發容量。當數據包不符合令牌桶規範,那麼就認為其不合理,此時會做出一下相應:

  • delay,直到桶準備好
  • drop
  • mark,標記為不合規的數據包

delay 被稱為 整形 shaping , shaping 是指在某個時間間隔內發送超過 Bc(Committed Burst)的大小,Bc 在這裡指桶的尺寸。由於數據流量是突發性的,當在一段時間內不活動後,再次激活後的在一個間隔內發送的數量大於 Bc ,那麼額外的流量被稱為Be (burst excess)。

將流量丟棄或標記超額流量,保持在一個流量速率限制稱為 管制 policing

Definition

令牌桶的定義是指,有一個桶,以穩定的速度填充令牌;桶中的任何一個溢出都會被丟棄。當要發送一個數據包,需要能夠從桶中取出一個令牌;如果桶是空的那麼此時數據包是不合規的數據包,必須進行 delay , drop , mark 操作。如果桶是滿的,則會發送與桶容量相對應的突發(短時間內的高帶寬傳輸),這是桶是空的。

令牌桶的規範:\(TB(r,B_{max})\)

  • \(r\) :r個token每秒的令牌填充率,表示桶填充令牌的速率
  • \(B\) :桶容量,\(B_{mac} > 0\)

那麼公式則表示,桶以指定的速率填充令牌,最大為 \(B_{max}\) 。這就說明瞭為了使大小為 S 的數據包合規,桶內必須至少有 S 個令牌,即 \(B \ge S\),否則數據包不合規,在發送時,桶為 \(B=B-S\)

image

Examples

場景1:假設令牌桶規範為 \(TB(\frac{1}{3}\ packet/ms, 4\ packet)\),桶最初是滿的,數據包在以下時間到達 [0, 0, 0, 2, 3, 6, 9, 12]

在處理完所有 T=0 的數據包後,桶中還剩 1 個令牌。到第四個數據包 T=2 到達時,桶內已經有1個令牌 + \(\frac{2}{3}\) 個令牌;當發送完第四個數據包時,桶內令牌數為 \(\frac{2}{3}\) 。到 T=3 數據包時,桶內令牌為1,滿足發送第 5 個數據包。萬松完成後桶是空的,在後面 6 9 12時,都滿足3/ms 一個數據包,都可以發送成功

場景2:另外一個實例,在同樣的令牌桶規範下 \(TB(\frac{1}{3}, 4)\),數據包到達時間為 [0, 0, 0, 0, 12, 12, 12, 12, 24, 24, 24, 24] ,可以看到在這個場景下,數據到達為3個突發,每個突發4個數據包,此時每次發送完成後桶被清空,當再次填滿時需要12ms,此時另外一組突發達。故這組數據是合規的。、

場景3:在同樣的令牌桶規範下 \(TB(\frac{1}{3}, 4)\),數據包到達時間為 [0, 1, 2, 3, 4, 5] , 這組數據是不合規的

用表格形式表示如下:

數據包到達時間 0 1 2 3 4 5
發送前桶內令牌 4 3 \(\frac{1}{3}\) 2 \(\frac{2}{3}\) 2 1 \(\frac{1}{3}\) \(\frac{2}{3}\)
發送後桶內令牌 3 2 \(\frac{1}{3}\) 1 \(\frac{2}{3}\) 1 \(\frac{1}{3}\) \(\frac{2}{3}\)

如果一個數據包在桶中沒有足夠的令牌來發送它時到達,可以進行整形或管制,整形使數據包等到足夠的令牌積累。管制會丟棄數據包。或者發送方可以立即發送數據包,但將其標記為不合規。

Principle of leaky bucket

漏桶 (leaky bucket)是一種臨時存儲可變數量的請求並將它們組織成設定速率輸出的數據包的方法。漏桶的概念與令牌桶比起是相反的,漏桶可以理解為是一個具有恆定服務時間的隊列。

由下圖可以看出,漏桶的概念是一個底部有孔的桶。無論水進入桶的速度是多少,它都會以恆定的速度通過孔從桶中泄漏出來。如果桶中沒有水,則流速為零,如果桶已滿,則多餘的水溢出並丟失。

image

和令牌桶一樣,漏桶用於流量整形和流量管制

Difference between Token and Leaky

Leaky Token
桶中存放的是所有到達的數據包,必須入桶 桶中存放的是定期生成的令牌
桶以恆定速率泄漏 桶有最大容量 \(B_{max}\)
突發流量入桶轉換為恆定流量發送 發送數據包需要小號對應的token

token較leaky的優勢

  • 在令牌桶中,如果桶已滿,處理的方式有 shaping和policing兩種模型三種方式(延遲、丟棄、標記),而漏桶中的流量僅為shaping。
    • 通俗來說,就是令牌桶已滿,丟棄的是令牌,漏桶中丟棄的則是數據包
  • 令牌桶可以更快的速率發送大突發流量,而漏桶僅是恆定速率

Implementation with go

Token

在golang中,內置的 rate 包實現了一個令牌桶演算法,通過 rate.NewLimiter(r,B) 進行構造。與公式\(TB(r,B_{max})\) 意思相同。

type Limiter struct {
	limit Limit // 向桶中放置令牌的速率
	burst int // 桶的容量
	mu     sync.Mutex
	tokens float64 // 可用令牌容量
	last time.Time // 上次放入token的時間
	lastEvent time.Time
}

Limiter中帶有三種方法, AllowReserveWait 分別表示Token Bucket中的 shapingpolicing

  • Allow:丟棄超過速率的事件,類似 drop
  • Wait:等待,直到獲取到令牌或者取消或deadline/timeout
  • Reserve:等待或減速,不丟棄事件,類似於 delay

Reserve/ReserveN

  • Reserve() 返回了 ReserveN(time.Now(), 1)
  • ReserveN() 無論如何都會返回一個 Reservation,指定了調用者在 n 個事件發生之前必須等待多長時間。
  • Reservation 是一個令牌桶事件信息
  • Reservation 中的 Delay() 方法返回了需要等待的時間,如果時間為0則不需要等待
  • Reservation 中的 Cancel() 將取消等待

wait/waitN

Allow/AllowN

  • 在獲取不到令牌是丟棄對應的事件
  • 返回的是一個 reserveN() 拿到token是合規的,並消耗掉token

AllowN 為截止到某一時刻,當前桶內桶中數目是否至少為 n 個,滿足則返回 true,同時從桶中消費 n 個 token。反之不消費 Token,false。

func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok // 由於僅需要一個合規否,顧合規的通過,不合規的丟棄
}

reserveN() 是三個行為的核心,AllowN中指定的為 0 ,因為 maxFutureReserve 是最大的等待時間,AllowN給定的是0,即如果突發大的情況下丟棄額外的 Bc

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// 這裡拿到的是now,上次更新token時間和桶內token數量
	now, last, tokens := lim.advance(now)
	// 計算剩餘的token
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// 確定是否合規,n是token
    // token 的數量要小於桶的容量,並且 等待時間小於最大等待時間
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

在reserveN中調用了一個 advance() 函數,

func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
   last := lim.last
   if now.Before(last) { // 計算上次放入token是否在傳入now之前
      last = now
   }

   // 當 last 很舊時,避免在下麵進行 delta 溢出。
   // maxElapsed 計算裝滿需要多少時間
   maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
   elapsed := now.Sub(last) // 上次裝入到現在的時差
   if elapsed > maxElapsed { // 上次如果放入token時間超長,就讓他與裝滿時間相等
      elapsed = maxElapsed // 即,讓桶為滿的
   }

   // 裝桶的動作,下麵函數表示,elapsed時間內可以生成多少個token
   delta := lim.limit.tokensFromDuration(elapsed)
   tokens := lim.tokens + delta // 當前的token
   if burst := float64(lim.burst); tokens > burst {
      tokens = burst // 這裡表示token溢出,讓他裝滿就好
   }

   return now, last, tokens
}

wait/waitN

  • 桶內令牌可以>N時,返回,在獲取不到令牌是阻塞,等待context取消或者超時
  • 返回的是一個 reserveN() 拿到token是合規的,並消耗掉token
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	if n > lim.burst && lim.limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
	}
	// 外部已取消
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// Determine wait limit
	now := time.Now()
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(now)
	}
	// 三個方法的核心,這裡給定了deatline
	r := lim.reserveN(now, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
	// Wait if necessary
	delay := r.DelayFrom(now)
	if delay == 0 {
		return nil
	}
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}
}

Dynamic Adjustment

rate.limiter 中,支持調整速率和桶大小,這樣就可以根據現有環境和條件,來動態的改變 Token生成速率和桶容量

  • SetLimit(Limit) 更改生成 Token 的速率
  • SetBurst(int) 改變桶容量

Example

一個流量整形的場景

package main

import (
	"log"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := rate.NewLimiter(1, 5) // BT(1,5)
	log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
	length := 20 // 一共請求20次
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			err := limiter.Allow()
			if !err {
				ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
			}

			time.Sleep(time.Duration(5) * time.Millisecond)
			ch <- "Task-" + taskId + " run success  " + time.Now().Format(timeLayout)
			return

		}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
	}
	for _, ch := range chs {
		log.Println("task start at " + <-ch)
	}
}

image

通過執行結果可以看出,在突發為20的情況下,allow僅允許了獲得token的事件執行,,這種場景下實現了流量整形的特性。

一個流量管制的場景

package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := rate.NewLimiter(1, 5) // BT(1,5)
	log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
	length := 20 // 一共請求20次
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			err := limiter.Wait(context.TODO())
			if err != nil {
				ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
			}
			ch <- "Task-" + taskId + " run success  " + time.Now().Format(timeLayout)
			return

		}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
	}
	for _, ch := range chs {
		log.Println("task start at " + <-ch)
	}
}

image

結果可以看出,在大突發的情況下,在拿到token的任務會立即執行,沒有拿到token的會等待拿到token後繼續執行,這種場景下實現了流量管制的特性

Reference

tokenbucket
QoS Policing

作者:鋼閘門
出處:http://lc161616.cnblogs.com/ 本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。 阿裡雲優惠:點擊力享低價 墨墨學英語:幫忙點一下
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這裡給大家分享我在網上總結出來的一些JavaScript 知識,希望對大家有所幫助 一、日期處理 1. 檢查日期是否有效 該方法用於檢測給出的日期是否有效: const isDateValid = (...val) => !Number.isNaN(new Date(...val).valueOf( ...
  • 來源 tree-shaking 最早由 Rich Harris 在 rollup 中提出。 為了減少最終構建體積而誕生。 以下是 MDN 中的說明: tree-shaking 是一個通常用於描述移除 JavaScript 上下文中的未引用代碼(dead-code) 行為的術語。 它依賴於 ES201 ...
  • 簡介 在 JavaScript 中,迭代器是一個對象,它定義一個序列,併在終止時可能返回一個返回值。 更具體地說,迭代器是通過使用 next() 方法實現迭代器協議的任何一個對象,該方法返回具有兩個屬性的對象: value,這是序列中的 next 值;和 done ,如果已經迭代到序列中的最後一個值 ...
  • 1. Express框架是什麼 1.1 Express是一個基於Node平臺的web應用開發框架,它提供了一系列的強大特性,幫助你創建各種Web應用。我們可以使用 npm install express 命令進行下載。 1.2 Express初體驗 // 引入express框架 const expr ...
  • 遞歸查找文件 引言 或許是文件太多,想找某個文件又忘記放哪了;又或者是項目改造,需要將外部調用介面進行改造,項目太多,又無法排查。那麼怎麼快速找到自己想要的內容就是一件值得思考的事情了。 根據特定內容尋找文件位置 package com.lizi.globalexception.Utils; imp ...
  • 大家好,本文我將繼續來剖析SpringCloud中負載均衡組件Ribbon的源碼。本來我是打算接著OpenFeign動態代理生成文章直接講Feign是如何整合Ribbon的,但是文章寫了一半發現,如果不把Ribbon好好講清楚,那麼有些Ribbon的細節理解起來就很困難,所以我還是打算單獨寫一篇文章 ...
  • phpcurl函數類模擬Curl get post header refer攜帶Cookie模擬訪問來源Refer模擬UseaAgent ...
  • 一個工作5年的粉絲找到我,他說參加美團面試,遇到一個基礎題沒回答上來。 這個問題是:“資料庫連接池有什麼用?以及它有哪些關鍵參數”? 我說,這個問題都不知道,那你項目裡面的連接池配置怎麼設置的? 你們猜他怎麼回答。懂得懂得啊。 好的,關於這個問題,我們來看看普通人和高手的回答。 普通人: 資料庫連接 ...
一周排行
    -Advertisement-
    Play Games
  • C#TMS系統代碼-基礎頁面BaseCity學習 本人純新手,剛進公司跟領導報道,我說我是java全棧,他問我會不會C#,我說大學學過,他說這個TMS系統就給你來管了。外包已經把代碼給我了,這幾天先把增刪改查的代碼背一下,說不定後面就要趕鴨子上架了 Service頁面 //using => impo ...
  • 委托與事件 委托 委托的定義 委托是C#中的一種類型,用於存儲對方法的引用。它允許將方法作為參數傳遞給其他方法,實現回調、事件處理和動態調用等功能。通俗來講,就是委托包含方法的記憶體地址,方法匹配與委托相同的簽名,因此通過使用正確的參數類型來調用方法。 委托的特性 引用方法:委托允許存儲對方法的引用, ...
  • 前言 這幾天閑來沒事看看ABP vNext的文檔和源碼,關於關於依賴註入(屬性註入)這塊兒產生了興趣。 我們都知道。Volo.ABP 依賴註入容器使用了第三方組件Autofac實現的。有三種註入方式,構造函數註入和方法註入和屬性註入。 ABP的屬性註入原則參考如下: 這時候我就開始疑惑了,因為我知道 ...
  • C#TMS系統代碼-業務頁面ShippingNotice學習 學一個業務頁面,ok,領導開完會就被裁掉了,很突然啊,他收拾東西的時候我還以為他要旅游提前請假了,還在尋思為什麼回家連自己買的幾箱飲料都要叫跑腿帶走,怕被偷嗎?還好我在他開會之前拿了兩瓶芬達 感覺感覺前面的BaseCity差不太多,這邊的 ...
  • 概述:在C#中,通過`Expression`類、`AndAlso`和`OrElse`方法可組合兩個`Expression<Func<T, bool>>`,實現多條件動態查詢。通過創建表達式樹,可輕鬆構建複雜的查詢條件。 在C#中,可以使用AndAlso和OrElse方法組合兩個Expression< ...
  • 閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
  • 1. 案例 成某三甲醫預約系統, 該項目在2024年初進行上線測試,在正常運行了兩天後,業務系統報錯:The connection pool has been exhausted, either raise MaxPoolSize (currently 800) or Timeout (curren ...
  • 背景 我們有些工具在 Web 版中已經有了很好的實踐,而在 WPF 中重新開發也是一種費時費力的操作,那麼直接集成則是最省事省力的方法了。 思路解釋 為什麼要使用 WPF?莫問為什麼,老 C# 開發的堅持,另外因為 Windows 上已經裝了 Webview2/edge 整體打包比 electron ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...