微服務架構|go-zero 的自適應熔斷器

来源:https://www.cnblogs.com/alwaysbeta/archive/2023/09/02/17673549.html
-Advertisement-
Play Games

**原文鏈接:** [go-zero 的自適應熔斷器](https://mp.weixin.qq.com/s/r1kTYUK_r-JalvhzAKKQwg) 上篇文章我們介紹了微服務的限流,詳細分析了計數器限流和令牌桶限流演算法,這篇文章來說說熔斷。 熔斷和限流還不太一樣,限流是控制請求速率,只要還能 ...


原文鏈接: go-zero 的自適應熔斷器

上篇文章我們介紹了微服務的限流,詳細分析了計數器限流和令牌桶限流演算法,這篇文章來說說熔斷。

熔斷和限流還不太一樣,限流是控制請求速率,只要還能承受,那麼都會處理,但熔斷不是。

在一條調用鏈上,如果發現某個服務異常,比如響應超時。那麼調用者為了避免過多請求導致資源消耗過大,最終引發系統雪崩,會直接返回錯誤,而不是瘋狂調用這個服務。

本篇文章會介紹主流熔斷器的工作原理,並且會藉助 go-zero 源碼,分析 googleBreaker 是如何通過滑動視窗來統計流量,並且最終執行熔斷的。

工作原理

這部分主要介紹兩種熔斷器的工作原理,分別是 Netflix 開源的 Hystrix,其也是 Spring Cloud 預設的熔斷組件,和 Google 的自適應的熔斷器。

Hystrix is no longer in active development, and is currently in maintenance mode.

註意,Hystrix 官方已經宣佈不再積極開發了,目前處在維護模式。

Hystrix 官方推薦替代的開源組件:Resilience4j,還有阿裡開源的 Sentinel 也是不錯的替代品。

hystrixBreaker

Hystrix 採用了熔斷器模式,相當於電路中的保險絲,系統出現緊急問題,立刻禁止所有請求,已達到保護系統的作用。

系統需要維護三種狀態,分別是:

  • 關閉: 預設狀態,所有請求全部能夠通過。當請求失敗數量增加,失敗率超過閾值時,會進入到斷開狀態。
  • 斷開: 此狀態下,所有請求都會被攔截。當經過一段超時時間後,會進入到半斷開狀態。
  • 半斷開: 此狀態下會允許一部分請求通過,並統計成功數量,當請求成功時,恢復到關閉狀態,否則繼續斷開。

通過狀態的變更,可以有效防止系統雪崩的問題。同時,在半斷開狀態下,又可以讓系統進行自我修複。

googleBreaker

googleBreaker 實現了一種自適應的熔斷模式,來看一下演算法的計算公式,客戶端請求被拒絕的概率

參數很少,也比較好理解:

  1. requests:請求數量
  2. accepts:後端接收的請求數量
  3. K:敏感度,一般推薦 1.5-2 之間

通過分析公式,我們可以得到下麵幾個結論,也就是產生熔斷的實際原理:

  1. 正常情況下,requests 和 accepts 是相等的,拒絕的概率就是 0,沒有產生熔斷
  2. 當正常請求量,也就是 accepts 減少時,概率會逐漸增加,當概率大於 0 時,就會產生熔斷。如果 accepts 等於 0 了,則完全熔斷。
  3. 當服務恢復後,requests 和 accepts 的數量會同時增加,但由於 K * accepts 增長的更快,所以概率又會很快變回到 0,相當於關閉了熔斷。

總的來說,googleBreaker 的實現方案更加優雅,而且參數也少,不用維護那麼多的狀態。

go-zero 就是採用了 googleBreaker 的方案,下麵就來分析代碼,看看到底是怎麼實現的。

介面設計

介面定義這部分我個人感覺還是挺不好理解的,看了好多遍才理清了它們之間的關係。

其實看代碼和看書是一樣的,書越看越薄,代碼會越看越短。剛開始看感覺代碼很長,隨著看懂的地方越來越多,明顯感覺代碼變短了。所以遇到不懂的代碼不要怕,反覆看,總會看懂的。

首先來看一下 breaker 部分的 UML 圖,有了這張圖,很多地方看起來還是相對清晰的,下麵來詳細分析。

這裡用到了靜態代理模式,也可以說是介面裝飾器,接下來就看看到底是怎麼定義的:

// core/breaker/breaker.go
internalThrottle interface {
    allow() (internalPromise, error)
    doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

// core/breaker/googlebreaker.go
type googleBreaker struct {
    k     float64
    stat  *collection.RollingWindow
    proba *mathx.Proba
}

這個介面是最終實現熔斷方法的介面,由 googleBreaker 結構體實現。

// core/breaker/breaker.go
throttle interface {
    allow() (Promise, error)
    doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

type loggedThrottle struct {
    name string
    internalThrottle
    errWin *errorWindow
}

func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
    return loggedThrottle{
        name:             name,
        internalThrottle: t,
        errWin:           new(errorWindow),
    }
}

這個是實現了日誌收集的結構體,首先它實現了 throttle 介面,然後它包含了一個欄位 internalThrottle,相當於具體的熔斷方法是代理給 internalThrottle 來做的。

// core/breaker/breaker.go
func (lt loggedThrottle) allow() (Promise, error) {
    promise, err := lt.internalThrottle.allow()
    return promiseWithReason{
        promise: promise,
        errWin:  lt.errWin,
    }, lt.logError(err)
}

func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
        accept := acceptable(err)
        if !accept && err != nil {
            lt.errWin.add(err.Error())
        }
        return accept
    }))
}

所以當它執行相應方法時,都是直接調用 internalThrottle 介面的方法,然後再加上自己的邏輯。

這也就是代理所起到的作用,在不改變原方法的基礎上,擴展原方法的功能。

// core/breaker/breaker.go
circuitBreaker struct {
    name string
    throttle
}

// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
    var b circuitBreaker
    for _, opt := range opts {
        opt(&b)
    }
    if len(b.name) == 0 {
        b.name = stringx.Rand()
    }
    b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())

    return &b
}

最終的熔斷器又將功能代理給了 throttle

這就是它們之間的關係,如果感覺有點亂的話,就反覆看,看的次數多了,就清晰了。

日誌收集

上文介紹過了,loggedThrottle 是為了記錄日誌而設計的代理層,這部分內容來分析一下是如何記錄日誌的。

// core/breaker/breaker.go
type errorWindow struct {
    // 記錄日誌的數組
    reasons [numHistoryReasons]string
    // 索引
    index   int
    // 數組元素數量,小於等於 numHistoryReasons
    count   int
    lock    sync.Mutex
}

func (ew *errorWindow) add(reason string) {
    ew.lock.Lock()
    // 記錄錯誤日誌內容
    ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
    // 對 numHistoryReasons 進行取餘來得到數組索引
    ew.index = (ew.index + 1) % numHistoryReasons
    ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
    ew.lock.Unlock()
}

func (ew *errorWindow) String() string {
    var reasons []string

    ew.lock.Lock()
    // reverse order
    for i := ew.index - 1; i >= ew.index-ew.count; i-- {
        reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
    }
    ew.lock.Unlock()

    return strings.Join(reasons, "\n")
}

核心就是這裡採用了一個環形數組,通過維護兩個欄位來實現,分別是 indexcount

count 表示數組中元素的個數,最大值是數組的長度;index 是索引,每次 +1,然後對數組長度取餘得到新索引。

我之前有一次面試就讓我設計一個環形數組,當時答的還不是很好,這次算是學會了。

滑動視窗

一般來說,想要判斷是否需要觸發熔斷,那麼首先要知道一段時間的請求數量,一段時間內的數量統計可以使用滑動視窗來實現。

首先看一下滑動視窗的定義:

// core/collection/rollingwindow.go

type RollingWindow struct {
    lock          sync.RWMutex
    // 視窗大小
    size          int
    // 視窗數據容器
    win           *window
    // 時間間隔
    interval      time.Duration
    // 游標,用於定位當前應該寫入哪個 bucket
    offset        int
    // 彙總數據時,是否忽略當前正在寫入桶的數據
    // 某些場景下因為當前正在寫入的桶數據並沒有經過完整的視窗時間間隔
    // 可能導致當前桶的統計並不准確
    ignoreCurrent bool
    // 最後寫入桶的時間
    // 用於計算下一次寫入數據間隔最後一次寫入數據的之間
    // 經過了多少個時間間隔
    lastTime      time.Duration // start time of the last bucket
}

再來看一下 window 的結構:

type Bucket struct {
    // 桶內值的和
    Sum   float64
    // 桶內 add 次數
    Count int64
}

func (b *Bucket) add(v float64) {
    b.Sum += v
    b.Count++
}

func (b *Bucket) reset() {
    b.Sum = 0
    b.Count = 0
}

type window struct {
    // 桶,一個桶就是一個時間間隔
    buckets []*Bucket
    // 視窗大小,也就是桶的數量
    size    int
}

有了這兩個結構之後,我們就可以畫出這個滑動視窗了,如圖所示。

現在來看一下向視窗中添加數據,是怎樣一個過程。

func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    // 獲取當前寫入下標
    rw.updateOffset()
    // 向 bucket 中寫入數據
    rw.win.add(rw.offset, v)
}

func (rw *RollingWindow) span() int {
    // 計算距離 lastTime 經過了多少個時間間隔,也就是多少個桶
    offset := int(timex.Since(rw.lastTime) / rw.interval)
    // 如果在視窗範圍內,返回實際值,否則返回視窗大小
    if 0 <= offset && offset < rw.size {
        return offset
    }

    return rw.size
}

func (rw *RollingWindow) updateOffset() {
    // 經過了多少個時間間隔,也就是多少個桶
    span := rw.span()
    // 還在同一單元時間內不需要更新
    if span <= 0 {
        return
    }

    offset := rw.offset
    // reset expired buckets
    // 這裡是清除過期桶的數據
    // 也是對數組大小進行取餘的方式,類似上文介紹的環形數組
    for i := 0; i < span; i++ {
        rw.win.resetBucket((offset + i + 1) % rw.size)
    }

    // 更新游標
    rw.offset = (offset + span) % rw.size
    now := timex.Now()
    // align to interval time boundary
    // 這裡應該是一個時間的對齊,保持在桶內指向位置是一致的
    rw.lastTime = now - (now-rw.lastTime)%rw.interval
}

// 向桶內添加數據
func (w *window) add(offset int, v float64) {
    // 根據 offset 對數組大小取餘得到索引,然後添加數據
    w.buckets[offset%w.size].add(v)
}

// 重置桶數據
func (w *window) resetBucket(offset int) {
    w.buckets[offset%w.size].reset()
}

我畫了一張圖,來模擬整個滑動過程:

主要經歷 4 個步驟:

  1. 計算當前時間距離上次添加時間經過了多少個時間間隔,也就是多少個 bucket
  2. 清理過期桶數據
  3. 更新 offset,更新 offset 的過程實際就是模擬視窗滑動的過程
  4. 添加數據

比如上圖,剛開始 offset 指向了 bucket[1],經過了兩個 span 之後,bucket[2]bucket[3] 會被清空,同時,新的 offset 會指向 bucket[3],新添加的數據會寫入到 bucket[3]

再來看看數據統計,也就是視窗內的有效數據量是多少。

// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
    rw.lock.RLock()
    defer rw.lock.RUnlock()

    var diff int
    span := rw.span()
    // ignore current bucket, because of partial data
    if span == 0 && rw.ignoreCurrent {
        diff = rw.size - 1
    } else {
        diff = rw.size - span
    }
    // 需要統計的 bucket 數量,視窗大小減去 span 數量
    if diff > 0 {
        // 獲取統計的起始位置,span 是已經被重置的 bucket
        offset := (rw.offset + span + 1) % rw.size
        rw.win.reduce(offset, diff, fn)
    }
}

func (w *window) reduce(start, count int, fn func(b *Bucket)) {
    for i := 0; i < count; i++ {
        // 自定義統計函數
        fn(w.buckets[(start+i)%w.size])
    }
}

統計出視窗數據之後,就可以判斷是否需要熔斷了。

執行熔斷

接下來就是執行熔斷了,主要就是看看自適應熔斷是如何實現的。

// core/breaker/googlebreaker.go

const (
    // 250ms for bucket duration
    window     = time.Second * 10
    buckets    = 40
    k          = 1.5
    protection = 5
)

視窗的定義部分,整個視窗是 10s,然後分成 40 個 bucket,每個 bucket 就是 250ms。

// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
    k     float64
    stat  *collection.RollingWindow
    proba *mathx.Proba
}

func (b *googleBreaker) accept() error {
    // 獲取最近一段時間的統計數據
    accepts, total := b.history()
    // 根據上文提到的演算法來計算一個概率
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    // 如果小於等於 0 直接通過,不熔斷
    if dropRatio <= 0 {
        return nil
    }

    // 隨機產生 0.0-1.0 之間的隨機數與上面計算出來的熔斷概率相比較
    // 如果隨機數比熔斷概率小則進行熔斷
    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}

func (b *googleBreaker) history() (accepts, total int64) {
    b.stat.Reduce(func(b *collection.Bucket) {
        accepts += int64(b.Sum)
        total += b.Count
    })

    return
}

以上就是自適應熔斷的邏輯,通過概率的比較來隨機淘汰掉部分請求,然後隨著服務恢復,淘汰的請求會逐漸變少,直至不淘汰。

func (b *googleBreaker) allow() (internalPromise, error) {
    if err := b.accept(); err != nil {
        return nil, err
    }

    // 返回一個 promise 非同步回調對象,可由開發者自行決定是否上報結果到熔斷器
    return googlePromise{
        b: b,
    }, nil
}

// req - 熔斷對象方法
// fallback - 自定義快速失敗函數,可對熔斷產生的err進行包裝後返回
// acceptable - 對本次未熔斷時執行請求的結果進行自定義的判定,比如可以針對http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    if err := b.accept(); err != nil {
        // 熔斷中,如果有自定義的fallback則執行
        if fallback != nil {
            return fallback(err)
        }

        return err
    }

    defer func() {
        // 如果執行req()過程發生了panic,依然判定本次執行失敗上報至熔斷器
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    // 上報結果
    if acceptable(err) {
        b.markSuccess()
    } else {
        b.markFailure()
    }

    return err
}

熔斷器對外暴露兩種類型的方法:

1、簡單場景直接判斷對象是否被熔斷,執行請求後必須需手動上報執行結果至熔斷器。

func (b *googleBreaker) allow() (internalPromise, error)

2、複雜場景下支持自定義快速失敗,自定義判定請求是否成功的熔斷方法,自動上報執行結果至熔斷器。

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

個人感覺,熔斷這部分代碼,相較於前幾篇文章,理解起來是更困難的。但其中的一些設計思想,和底層的實現原理也是非常值得學習的,希望這篇文章能夠對大家有幫助。

以上就是本文的全部內容,如果覺得還不錯的話歡迎點贊轉發關註,感謝支持。


參考文章:

推薦閱讀:


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • >5月份時曾部署上線了C++的Web伺服器,溫故而知新,本篇文章梳理總結一下部署流程知識; >- 最初的解決方案:https://blog.csdn.net/BinBinCome/article/details/129750951?spm=1001.2014.3001.5501 >- 後來的解決方案 ...
  • 運算符用於對變數和值執行操作。 加號運算符(+)將兩個值相加,如下麵的示例所示: **示例代碼:** ```Go package main import ( "fmt" ) func main() { var a = 15 + 25 fmt.Println(a) } ``` 儘管加號運算符通常用於將 ...
  • 寫代碼的時候大腦想的總是數據結構和演算法。大學學習 C 語言的時候, 書上看到的,有位編程大師說的就是, 編程就等於數據結構加演算法。C 語言 有數組這個數據結構。有人說不是啊不是還有鏈表,不是還有棧,不是還 有隊列 其實這 是表象,底層都是以數組的 形式組織設計的。C 語言 編程的時候 會使用到數組, ...
  • ## Gateway 簡介 Spring Cloud Gateway 基於 Spring 5、Spring Boot 2 和 Project Reactor 等技術,是在 Spring 生態系統之上構建的 API 網關服務,Gateway 旨在提供一種簡單而有效的方式來對 API 進行路由以及提供一 ...
  • 如何在Python中進行素因式分解。 ### 質因數分解的概述 在數學中,一個數的因數是指那些可以除以給定數並留下零餘數的數字。 質數是只有兩個因數的獨特數字,一個和數字本身。這類數字的一些例子是3,7,11,13,等等。 素數因數化是指找到所有乘以原數的素數。我們可以考慮一個簡單的例子:數字6。 ...
  • 在測試並行開發(TPD)中,代碼開發是第一位的。儘管如此,我們還是要寫出開發的測試,並執行它們來驗證代碼的準確性(而不是直接運行代碼或使用控制台)。 在Python中,我們有一個叫做單元測試的過程,裡面有mock 和patch 函數。這篇文章將討論這兩個角色的用途和區別。 ### Mock 和Pat ...
  • # 字元串擴展 ## 1、字元串的三種定義方式 單引號,雙引號,三引號 ``` a='abc'; b="sdf"; c='''ewrc'''; print(a,b,c); ``` ## 2、字元串的拼接 ``` #字元串字面量之間的拼接 print("我是一名"+"大學生"+","+"學習智能醫學工 ...
  • 今天學習了spring cloud 中的註冊中心——eureka,作為微服務的註冊中心,eureka需要對服務的可用狀態進行一個體現。直觀的體現方式就是在eureka啟動後的ui界面上可以看到服務的是否可用。 在某一個時刻下,如果後端某一個服務不可用了,eureka不會立即將其從ui界面上刪除。而是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...