限流常規設計和實例

来源:https://www.cnblogs.com/killbug/archive/2019/08/17/11370694.html
-Advertisement-
Play Games

限流演算法 計數器限流 固定視窗 滑動視窗 桶限流 令牌桶 漏桶 計數器 計數器限流可以分為: 固定視窗 滑動視窗 固定視窗 固定視窗計數器限流簡單明瞭,就是限制單位之間內的請求數,比如設置QPS為10,那麼從一開始的請求進入就計數,每次計數前判斷是否到10,到達就拒絕請求,並保證這個計數周期是1秒, ...


限流演算法

  • 計數器限流
    • 固定視窗
    • 滑動視窗
  • 桶限流
    • 令牌桶
    • 漏桶

計數器

計數器限流可以分為:

  • 固定視窗
  • 滑動視窗

固定視窗

固定視窗計數器限流簡單明瞭,就是限制單位之間內的請求數,比如設置QPS為10,那麼從一開始的請求進入就計數,每次計數前判斷是否到10,到達就拒絕請求,並保證這個計數周期是1秒,1秒後計數器清零。
以下是利用redis實現計數器分散式限流的實現,曾經線上上實踐過的lua腳本:

local key = KEYS[1] 
local limit = tonumber(ARGV[1]) 
local refreshInterval = tonumber(ARGV[2]) 
local currentLimit = tonumber(redis.call('get', key) or '0') 
if currentLimit + 1 > limit then 
    return -1; 
else 
    redis.call('INCRBY', key, 1) 
    redis.call('EXPIRE', key, refreshInterval) 
    return limit - currentLimit - 1 
end 

一個明顯的弊端就是固定視窗計數器演算法無法處理突刺流量,比如10QPS,1ms中來了10個請求,後續的999ms的所有請求都會被拒絕。

滑動視窗

為瞭解決固定視窗的問題,滑動視窗將視窗細化,用更小的視窗來限制流量。比如 1 分鐘的固定視窗切分為 60 個 1 秒的滑動視窗。然後統計的時間範圍隨著時間的推移同步後移。
即便滑動時間視窗限流演算法可以保證任意時間視窗內介面請求次數都不會超過最大限流值,但是仍然不能防止在細時間粒度上面訪問過於集中的問題。
為了應對上面的問題,對於時間視窗限流演算法,還有很多改進版本,比如:
多層次限流,我們可以對同一個介面設置多條限流規則,除了 1 秒不超過 100 次之外,我們還可以設置 100ms 不超過 20 次 (代碼中實現寫了平均的兩倍),兩條規則同時限制,流量會更加平滑。
簡單實現的代碼如下:

public class SlidingWindowRateLimiter {

    // 小視窗鏈表
    LinkedList<Room> linkedList = null;
    long stepInterval = 0;
    long subWindowCount = 10;
    long stepLimitCount = 0;
    int countLimit = 0;
    int count = 0;


    public SlidingWindowRateLimiter(int countLimit, int interval){
        // 每個小視窗的時間間距
        this.stepInterval = interval * 1000/ subWindowCount;
        // 請求總數限制
        this.countLimit = countLimit;
        // 每個小視窗的請求量限制數 設置為平均的2倍
        this.stepLimitCount = countLimit / subWindowCount * 2;
        // 時間視窗開始時間
        long start = System.currentTimeMillis();
        // 初始化連續的小視窗鏈表
        initWindow(start);
    }

    Room getAndRefreshWindows(long requestTime){
        Room firstRoom = linkedList.getFirst();
        Room lastRoom = linkedList.getLast();
        // 發起請求時間在主視窗內
        if(firstRoom.getStartTime() < requestTime && requestTime < lastRoom.getEndTime()){
            long distanceFromFirst = requestTime - firstRoom.getStartTime();
            int num = (int) (distanceFromFirst/stepInterval);
            return linkedList.get(num);
        }else{
            long distanceFromLast = requestTime - lastRoom.getEndTime();
            int num = (int)(distanceFromLast/stepInterval);
            // 請求時間超出主視窗一個視窗以上的身位
            if(num >= subWindowCount){
                initWindow(requestTime);
                return linkedList.getFirst();
            }else{
                moveWindow(num+1);
                return linkedList.getLast();
            }
        }
    }

    public boolean acquire(){
        synchronized (mutex()) {
            Room room = getAndRefreshWindows(System.currentTimeMillis());
            int subCount = room.getCount();
            if(subCount + 1 <= stepLimitCount && count + 1 <= countLimit){
                room.increase();
                count ++;
                return true;
            }
            return false;
        }
    }

    /**
     * 初始化視窗
     * @param start
     */
    private void initWindow(long start){
        linkedList = new LinkedList<Room>();
        for (int i = 0; i < subWindowCount; i++) {
            linkedList.add(new Room(start, start += stepInterval));
        }
        // 總記數清零
        count = 0;
    }

    /**
     * 移動視窗
     * @param stepNum
     */
    private void moveWindow(int stepNum){
        for (int i = 0; i < stepNum; i++) {
            Room removeRoom = linkedList.removeFirst();
            count = count - removeRoom.count;
        }
        Room lastRoom = linkedList.getLast();
        long start = lastRoom.endTime;
        for (int i = 0; i < stepNum; i++) {
            linkedList.add(new Room(start, start += stepInterval));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(20, 5);
        for (int i = 0; i < 26; i++) {
            System.out.println(slidingWindowRateLimiter.acquire());
            Thread.sleep(300);
        }
    }

    class Room{
         Room(long startTime, long endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
            this.count = 0;
        }
        private long startTime;
        private long endTime;
        private int count;
        long getStartTime() {
            return startTime;
        }
        long getEndTime() {
            return endTime;
        }

        int getCount() {
            return count;
        }

        int increase(){
            this.count++;
            return this.count;
        }
    }

    private volatile Object mutexDoNotUseDirectly;

    private Object mutex() {
        Object mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
            synchronized (this) {
                mutex = mutexDoNotUseDirectly;
                if (mutex == null) {
                    mutexDoNotUseDirectly = mutex = new Object();
                }
            }
        }
        return mutex;
    }

}

以上實現使用了鏈表的特性,在一定視窗時是將前段刪除後段加上的方式移動的。移動的操作是請求進入時操作的,沒有使用單獨線程移動視窗。並且按照前面講的,兩個維度控制流量一個時視窗的總請求數,一個是每個單獨視窗的請求數。

令牌桶

原理如圖:

目前使用令牌桶實現的限流有以下幾個:

  • Spring Cloud Gateway RateLimiter
  • Guava RateLimiter
  • Bucket4j

Spring Cloud Gateway RateLimiter

zuul2跳票後Spring Cloud 自己開發的網關,內部也實現了限流器

Spring Cloud Gateway RedisRateLimiter實現原理

因為是微服務架構,多服務部署是必然場景,所以預設提供了redis為存儲載體的實現,所以直接看rua腳本是怎麼樣的就可以知道它的演算法是怎麼實現的了:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
// 速率
local rate = tonumber(ARGV[1])
// 桶容量
local capacity = tonumber(ARGV[2])
// 發起請求時間
local now = tonumber(ARGV[3])
// 請求令牌數量 現在固定是1
local requested = tonumber(ARGV[4])
// 存滿桶耗時
local fill_time = capacity/rate
// 過期時間
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
// 上次請求的信息 存在redis
local last_tokens = tonumber(redis.call("get", tokens_key))
// 可任務初始化桶是滿的
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
// 上次請求的時間
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
// 現在距離上次請求時間的差值 也就是距離上次請求過去了多久
local delta = math.max(0, now-last_refreshed)
// 這個桶此時能提供的令牌是上次剩餘的令牌數加上這次時間間隔按速率能產生的令牌數 最多不能超過片桶大小
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 能提供的令牌和要求的令牌比較
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  // 消耗調令牌
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
// 存儲剩餘的令牌
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }
Spring Cloud Gateway RedisRateLimiter總結

1,在請求令牌時計算上次請求和此刻的時間間隔能產生的令牌,來刷新桶中的令牌數,然後將令牌提供出去。
2,如果令牌不足沒有等待,直接返回。
3,實現的方式是將每個請求的時間間隔為最小單位,只要小於這個單位的請求就會拒絕,比如100qps的配置,1允許10ms一個,如果兩個請求比較靠近小於10ms,後一個會被拒絕。

Guava RateLimiter

一個Guava實現的令牌桶限流器。

Guava RateLimiter類關係

Guava RateLimiter使用

RateLimiter就是一個根據配置的rate分發permits的工具。每一次調用acquire()都會阻塞到獲得permit為止,一個permit只能用一次。RateLimiter是併發安全的,它將限制全部線程的全部請求速率,但是RateLimiter並不保證是公平的。
限速器經常被使用在限制訪問物理或邏輯資源的速率,和java.util.concurrent.Semaphore對比,Semaphore用於限制同時訪問的數量,而它限制的是訪問的速率(兩者有一定關聯:就是little's law(律特法則))。
RateLimiter主要由發放許可證的速度來定義。如果沒有其他配置,許可證將以固定的速率分發,以每秒許可證的形式定義。許可證將平穩分發,各許可證之間的延遲將被調整到配置的速率。(SmoothBursty)
也可以將RateLimiter配置為有一個預熱期,在此期間,每秒鐘發出的許可穩步增加,直到達到穩定的速率。(SmoothWarmingUp)
一個例子:我們一個任務列表需要執行,但是每秒提交的任務個數不能超過2個。

final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
 void submitTasks(List<Runnable> tasks, Executor executor) {
   for (Runnable task : tasks) {
     rateLimiter.acquire(); // may wait
     executor.execute(task);
   }
 }

另一個例子:我們生產一個數據流想以5kb每秒的速度傳輸,這個可以通過將一個permit對應一個byte,並定義5000/s的速率。

final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second
void submitPacket(byte[] packet) {
  rateLimiter.acquire(packet.length);
  networkService.send(packet);
}

有一點需要註意,請求令牌的數量不會影響請求自身的節流,但是會影響下一次請求,如果一個需要大量令牌的任務到達時,將馬上授予,但是下一次請求將會為上次昂貴任務付出代價。

Guava RateLimiter是如何設計的,為什麼?

對於RateLimiter來說最關鍵的特性是:在一般情況下允許的最大速率-“恆定速率”。這需要對傳入的請求強制執行即計算“節流”,也需要在適當的節流時間,讓請求線程等待。
維護QPS的速率最簡單的方式就是保留最後准許請求的時間戳,確保能計算從那是到現在流逝的時間和QPS。舉個例子,一個QPS為5(5個token每秒)的速率,如果我們確保請求被准許時比最後的請求早200ms,我們就達到了想要的速率。如果最後准許的請求只過去100ms,那麼我們需要等待100ms。在這個速率下,提供新的15個許可證(即調用acquire(15))自然需要3s。
RateLimiter只記住最後一次請求的時間戳,只是對過去非常淺的記憶,意識到這點很重要。假如很長一段時間沒有使用RateLimiter,然後來了一個請求,是馬上准許嗎?這樣的RateLimiter將馬上忘記剛剛過去的利用不足的時間。最終,因為現實請求速率的不規則導致要麼利用不足或溢出的結果。過去利用不足意味著存在過剩可用資源,所以RateLimiter想要利用這些資源,就需要提速一會兒。在電腦網路中應用的速率(帶寬限制),那些過去利用不足的一般會轉換成“幾乎為空的緩衝區”,可以馬上用於後續流量。
另一方面,過去利用不足也意味著“伺服器對未來請求的處理的準備變得越來越少”,也就是說,緩存變過期,請求將更有可能觸發昂貴的操作(一個更極端的例子,當服務啟動後,它通常忙於提高自己的速度)。
為了處理這類場景,我們增加了一個額外的維度,將過去利用不足建模到storedPermits欄位。沒有利用不足情況時這個欄位的值是0,隨著逐漸到達充分利用不足,這個欄位的值會增大到一個最大值maxStoredPermits。

所以,當執行acquire(permits)方法來請求許可證從兩方面獲取:

  • 1,存儲的許可證(如果有可用的),
  • 2,新的許可證(對於任何剩餘的許可)。

下麵用一個例子來解釋工作原理:假設RateLimiter每秒生成一個token,每一秒過去而RateLimiter沒有被使用的話,就會把storedPermits加1。我們不使用RateLimiter10秒(即,我們期望在一個X時間有一個請求,可是實際在X+10秒後請求才到達;這個和最後一個段落的結論有關),storedPermits會變成10(假定maxStoredPermits>=10)。在那時一個調用acquire(3)的請求到達。我們使用storedPermits將它降到7來響應這個請求(how this is translated to throttling time is discussed later),之後acquire(10)的請求馬上到達。我們使用storedPermits中全部剩餘的7個permits,剩下的3個使用RateLimiter生產的新permits。我們已經知道,獲得三個fresh permits需要多少時間:如果速率是“1秒一個token”,那麼需要3秒時間。但是提供7個存儲的permits意味著什麼?前面的解釋中沒有唯一的答案。如果主要關註處理利用不足的情況,我們存儲permits為了給出比fresh permits快,因為利用不足=空閑資源。如果主要關註overflow的場景,存儲permits可以比fresh permits慢給出。所以,我們需要一個方法將storedPermits轉換成節流時間(throttling time)。
而storedPermitsToWaitTime(double storedPermits, double permitsToTake)這個方法就扮演了這個轉換的角色。它的基礎模型是一個連續函數映射存儲令牌(從0到maxStoredPermits)在1/Rate的積分。我們利用空閑時間來存儲令牌,所以storedPermits本質上是度量了空閑時間(unused time)。Rate=permits/time,1/Rate=time/permits,所以,1/Rate和permits相乘就可以算出時間。即處理一定量的令牌請求時,對這個函數進行積分就是對應於後續請求的最小間隔。
關於storedPermitsToWaitTime的一個例子:如果storedPermits==10,我們先需要3個從storedPermits中拿,storedPermits降到7,使用storedPermitsToWaitTime(storedPermits=10, permitsToToken=3)計算節流時間,它將求出函數從7到10的積分值。
使用積分保證了一個單獨的acquire(3)和拆分成{acquire(1),acquire(1),acquire(1)}或{acquire(2),acquire(1)}都是一樣的,另外,不管函數是怎麼樣的,對於函數[7,10]的求積分是等於[7,8],[8,9],[9,10]的總和的。這就保證了我們可以正確處理不同權重(permits不同)的請求,不管函數是什麼,因此我們可以自由調整函數的演算法(很顯然,只有一個要求:可以被求積分)。註意,如果這個函數畫的是個數值剛好是1/QPS的水平線,那麼這個函數就失去作用了,因為它表示存儲令牌的速率和生產新令牌的速率是一致的,後續中會用到這個技巧。如果這個函數的值在水平線的下麵,也就是f(x)<1/Rate,表示我們減少了積分的面積,也就是相同存儲的令牌數映射的節流時間相對於正常速率產生的時間變少了,也代表RateLimiter在一段時間空閑後變快了。相反的,如果函數的值在水平線的上面,表示增加了積分的面積,獲得存儲令牌的消耗要大於新生產令牌,那就意味著RateLimiter在一段時間空閑後變慢了。
最後但也重要,如果RateLimiter採用QPS=1的速度,那麼開銷較大的acquire(100)到達時,那是沒有必要等到100s才開始實際任務,為什麼不在等待的時候做點什麼呢?更好的辦法是我們可以馬上先開始執行任務(就像它是acquire(1)一個個請求的樣子),需要把未來的請求推後。
在這個版本,我們允許馬上執行任務,並把未來的請求推後100s,所以,我們允許在這段時間里完成工作,而不是空等。
這就有了一個很重要的結論:RateLimiter並需要不記住最後請求的時間,而只需要記住期望下一個請求到來的時間。因為我們一直堅持這一點,如此我們就可以馬上識別出一個請求(tryAcquire(timeout))超時時間是否滿足下一個預期請求到達的時間點。通過這個概念我們可以重新定義“一個空閑的RateLimiter”:當我們觀察到“期望下一個請求達到時間”實際已經過去,並且差值(now-past)也就是大量時間被轉換成storedPermits。(我們把在空閑時間生產的令牌增加storedPermits)。所以,如果Rate=1 permit/s,且到達時間恰好比前一次請求晚一秒,那麼storedPermits將永遠不會增加—我們只會在到達時間晚於一秒時增加它。

SmoothWarmingUp實現原理:顧名思義,這裡想要實現的是一個有預熱能力的RateLimiter,作者在註釋中還畫了這幅圖:

在進入詳細實現前,讓我們先記住幾個基本原則:

  • Y軸表示RateLimiter(storedPermits數量)的狀態,不同的storedPermits量對應不同的節流時間。
  • 隨著RateLimiter空閑時間的推移,storedPermits不斷向右移動,直到maxPermits。
  • 如果在有storedPermits的情況下,我們優先使用它,所以隨著RateLimiter被使用,就會向左移動,直到0。
  • 當始空閑時,我們以恆定的速度前進!我們向右移動的速率被選擇為maxPermits/預熱周期。這確保從0到maxpermit所花費的時間等於預熱時間。(由coolDownIntervalMicros方法控制)
  • 當使用時,正如在前面類註釋中解釋的那樣,假設我們想使用K個storedPermits,它花費的時間等於函數在X許可證和X-K許可證之間的積分。

總之,向左移動K個permits所需要花費的節流時間等於長度為K的函數的面積。假設RateLimiter的storedPermits飽和狀態,從maxPermits到thresholdPermits就等於warmupPeriod。從thresholdPermits到0的時間是warmupPeriod/2(原因是為了維護以前的實現,其中coldFactor硬編碼為3)
計算thresholdPermits和maxPermits的公式:

  • 從thresholdPermits到0花費的節流時間等於函數從0到thresholdPermits的積分,也就是thresholdPermits*stableIntervals。也等於warmupPeriod/2。
    thresholdPermits=0.5*warmupPeriod/stableInterval

  • 從maxPermits到thresholdPermits就是函數從thresholdPermits到maxPermits的積分,也就是梯型部分的面積,它等於0.5(stableInterval+coldInterval)(maxPermits - thresholdPermits),也就是warmupPeriod
    maxPermits =&nbsp;thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)

Guava RateLimiter源碼

這裡源碼先從SmoothBursty入手,首先是RateLimiter類里的創建:

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    // maxBurstSeconds代表這個桶能存儲的令牌數換算的秒數,通過這個秒數就可以知道能存儲的令牌數,也就表示這個桶的大小。
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

我們看到外界無法自定義SmoothBursty的桶大小,所以我們無論是創建什麼速率的RateLimiter,桶的大小就必然是rate*1的大小,那麼就有人通過反射的方式在滿足自己想要修改桶大小的需求:https://github.com/vipshop/vjtools/commit/9eacb861960df0c41b2323ce14da037a9fdc0629

setRate方法:

  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    // 需要全局同步
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }
  private volatile Object mutexDoNotUseDirectly;
  // 產生一個同步使用的對象,適應雙重檢查鎖保證這個對象是個單例
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

互斥鎖來保證線程的安全,具體實現代碼中使用volatile修飾的mutexDoNotUseDirectly欄位和雙重校驗同步鎖來保證生成單例,從而保證每次調用mutex()都是鎖同一個對象。這在後續的獲取令牌中也需要用到。

doSetRate方法是子類SmoothRateLimiter實現:

  // 當前存儲的令牌
  double storedPermits;
  // 最大能夠存儲的令牌
  double maxPermits;
  // 兩次獲取令牌的固定間隔時間
  double stableIntervalMicros;
  // 下一個請求能被授予的時間,一次請求後這個值就會推後,一個大請求推後的時間比一個小請求推後的時間要多
  // 這和預消費有關係,上一次消費的令牌
  private long nextFreeTicketMicros = 0L; 

 final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    // 計算出請求授予的間隔時間
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }
  // 更新storedPermits和nextFreeTicketMicros
  void resync(long nowMicros) {
    // 當前時間大於下一個請求時間 說明這次請求不用等待 如果不是就不會出現增加令牌的操作
    if (nowMicros > nextFreeTicketMicros) {
      // 根據上次請求和這次請求間隔計算能夠增加的令牌
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      // 存儲的令牌不能超過maxPermits
      storedPermits = min(maxPermits, storedPermits + newPermits);
      // 修改nextFreeTicketMicros為當前時間
      nextFreeTicketMicros = nowMicros;
    }
  }
 // 子類實現 
 abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
 // 返回冷卻期間需要等待獲得新許可證的微秒數。子類實現
 abstract double coolDownIntervalMicros();

SmoothBursty的實現

 static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      // 計算出桶的最大值
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        // 初始化為0 後續重新設置時按新maxPermits和老maxPermits的比例計算storedPermits
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    // 對於SmoothBursty 沒有什麼冷卻時期,所以始終返回的是stableIntervalMicros
    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }

然後看下獲取令牌:

  public double acquire() {
    return acquire(1);
  }
  
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    // 等待microsToWait時間 控制速率
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }
  
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }
  // 獲得需要等待的時間
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }
  
  // 子類SmoothRateLimiter實現
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

reserveEarliestAvailable方法,刷新下一個請求能被授予的時間

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 每次acquire都會觸發resync
    resync(nowMicros);
    // 返回值就是下一個請求能被授予的時間
    long returnValue = nextFreeTicketMicros;
    // 選出請求令牌數和存儲令牌數較小的一個 也就是從存儲的令牌中需要消耗的數量
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 計算本次請求中需要行創建的令牌
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 計算需要等待的時間 就是存儲令牌消耗的時間+新令牌產生需要的時間
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
    // 刷新下一個請求能被授予的時間 是將這次等待的時間加上原先的值 就是把這次請求需要產生的等待時間延遲給下一次請求 這就是一個大請求會馬上授予 但後續的請求會被等待長時間 所以這裡的思路核心就是再每次請求時都是在預測下一次請求到來的時間 
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 刷新存儲令牌
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }
  
  // 這個是用來計算存儲令牌在消耗時的節流時間 也就是通過這個方法子類可以控存儲令牌的速率 我們看到的SmoothBursty的實現是始終返回0 表示消耗存儲的令牌不需要額外的等待時間 我們在預熱的實現中可以看到不一樣的實現
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

再來看一下請求令牌方法帶超時時間的方法:

  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }
  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      // 判定設置的超時時間是否足夠等待下一個令牌的給予,等不了,就直接失敗
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        // 獲得需要等待的時間
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    // 等待
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }
  
  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

以上可以基本理解一個普通的限流器的實現方式,可以看到實現中可以通過doSetRatestoredPermitsToWaitTimecoolDownIntervalMicros方法進行定製自己的限流策略。
那麼這裡的SmoothBursty的策略是:

  • 桶大小通過固定的maxBurstSeconds控制 maxPermits = maxBurstSeconds * permitsPerSecond;
  • 消耗累積令牌不計入時間到等待時間中
  • 累積令牌時的速率和令牌消耗速率保持一致

我們繼續看稍微複雜點的SmoothWarmingUp,畢竟為了說明它人家作者都用註釋畫了示意圖。

static final class SmoothWarmingUp extends SmoothRateLimiter {
    // 預熱累計消耗時間
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    // 冷卻因數 固定是3 意思是通過這個因數可以計算在令牌桶滿的時候,消耗令牌需要的最大時間
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      // 通過coldFactor就可以算出coldInterval的最高點 即stableIntervalMicros的3倍 也就是說圖中的梯形最高點是固定的了 
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      // 根據warmupPeriodMicros*2=thresholdPermits*stableIntervalMicros換算thresholdPermits 因為我們看到梯形最高點是固定的 那麼通過設置warmupPeriod是可以控制thresholdPermits,從而控制maxPermits的值
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      // 也是根據上面提到的公式計算maxPermits
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      // 傾斜角度
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // 這裡的初始值是maxPermits
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      // 超過thresholdPermits的存儲令牌
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // 這裡就開始算這個梯形的面積了 梯形面積=(上底+下底)*高/2
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    // 為了確保從0到maxpermit所花費的時間等於預熱時間 可以看下resync方法中對coolDownIntervalMicros方法的使用
    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }
Guava RateLimiter總結

1,Guava在對RateLimiter設計中採用的是令牌桶的演算法,提供了普通和預熱兩種模型,在存儲令牌增加和消耗的設計思路是計算函數積分的方式。
2,對於第一個新來的請求,無論請求多少令牌,不阻塞。
3,內部使用線程sleep方式達到線程等待,沒超時時間會一直等到有令牌
4,令牌存儲發生在請求令牌時,不需要單獨線程實現不斷產生令牌的演算法
5,內部設計的類一些參數並不開放外部配置

漏桶

原理如圖:

我們有一個固定的桶,這個桶的出水速率是固定的,流量不斷往桶中放水,進水速度比出水速度快的時候,可以在桶中有一個緩衝,可是到達一定量超出桶的容量,就會溢出桶,無法接受新的請求。

這個思路不就是阻塞隊列嘛,只要在消費的放保持固定速率即可。
實現類似如下代碼(示意使用):

public class LeakyBucketLimiter {

    BlockingQueue<String> leakyBucket;
    long rate;

    public LeakyBucketLimiter(int capacity, long rate) {
        this.rate = rate;
        this.leakyBucket = new ArrayBlockingQueue<String>(capacity);
    }

    public boolean offer(){
        return leakyBucket.offer("");
    }

    class Consumer implements Runnable{
        public void run() {
            try {
                while (true){
                    Thread.sleep(1000/rate);
                    leakyBucket.take();
                }
            } catch (InterruptedException e) {
                
            }
        }
    }
}

和令牌桶允許一定突發請求時的高速率,以及空閑後降低速率不同的是,漏桶演算法是必然保證速率不變的。

最後

  • 限流必然帶來性能損失,如何避免?

一個思路是監控系統的狀態,比如cpu,記憶體,io等,依據這些信息來開關限流器。

  • 實際場景中是單機限流還是分散式限流?

在分散式系統中,如果想用分散式限流,就需要公用存儲的載體,比如redis,zk等。還需要考量分散式存儲載體失效,不能影響正常業務功能。

  • 拓展
    本文並不是限流的全部,關於限流這裡只聊到了相關的一些常規的演算法,可以說是冰山一角,還有很多知識等待我們去探索,前路漫漫。
    另外,後續會參考開源限流方案 Sentinel 和 Bucket4j 進一步研究實踐限流的落地方案。

參考

http://www.cs.ucc.ie/~gprovan/CS6323/2014/L11-Congesion-Control.pdf(網上很多桶演算法限流的圖都來自這裡)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 單例模式 定義 保證一個類僅有一個實例,並提供一個訪問它的全局訪問點。 通常我們可以讓一個全局變數使得一個對象被訪問,但它不能防止你實例化多個對象。一個最好的辦法就是,讓類自身負責保存它的唯一實例。這個類可以保證沒有其他實例可以創建,並且它可以提供一個訪問該實例的方法。 UML圖 方式一:單線程下的 ...
  • 使用ZooKeeper實現一個生產-消費者隊列,可用於多節點的分散式數據結構。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象併進行處理。 ...
  • 迭代器模式 定義 提供一種方法順序訪問一個聚合對象中各個元素,而又不暴露該對象的內部表示。 什麼時候用? 當你需要訪問一個聚集對象,而且不管這些對象是什麼都需要遍歷的時候,你就應該考慮用迭代器模式 。 你需要對聚集有多種方式遍歷時,可以考慮用迭代器模式。 UML圖 模板代碼 Aggregate It ...
  • 前言 我們目前已經學習了設計模式的7種設計原則。下麵本該是直接進入具體的設計模式系列文章。 但是呢在我們學習設計模式之前我們還是有必要瞭解一下uml圖。因為後續的設計模式文章不出意外應該會很多地方使用到uml圖。如果你連uml圖都看不懂的話,那麼學習起來肯定會有一定的難度。 所以說,這一節就作為承上 ...
  • 1. new一個對象在Java內部做了哪些工作? 從靜態角度來看,new一個對象表示創建一個類的對象實例。 從JVM運行角度來看,當JVM執行到new位元組碼時,首先會去查看類有沒有被載入到記憶體以及初始化,如果是第一次使用該類,則首先載入該類。載入完成後便會在堆記憶體分配該對象實例的記憶體空間,虛擬機棧分 ...
  • 1.目前PyOpenGL是用python2寫的,如果你使用的是python3需要自己修改PyOpenGL,我這裡使用的是python2.7 2.下載PyOpenGLhttps://pypi.org/project/PyOpenGL/3.1.0/#modal-close,下載對應系統版本的就可以 3. ...
  • 一、 例子:我們對傳參是有要求的必須傳入一個元組,否則報錯 二、 二、threading的使用 直接利用threading.Thread生成Thread的實例 格式: t= threading.Thread(target=函數體,args=(,))#參數args要傳遞元組 ​t.start()#啟動 ...
  • 1 字元串、數組、集合的轉換 定義字元串數組 String[] strArr 1.1 字元串數組轉集合 1.2 字元串集合轉數組 1.3 字元串集合轉逗號分隔的字元串 1.4 逗號分隔的字元串轉字元串集合 1.5 對象集合轉 Map 定義對象 Person 定義對象 Person 的集合 List ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...