利用Kubernetes中的leaderelection實現組件高可用

来源:https://www.cnblogs.com/silenceper/archive/2020/02/16/12318619.html
-Advertisement-
Play Games

在Kubernetes中,通常kube schduler和kube controller manager都是多副本進行部署的來保證高可用,而真正在工作的實例其實只有一個。這裡就利用到 的選主機制,保證leader是處於工作狀態,並且在leader掛掉之後,從其他節點選取新的leader保證組件正常工 ...


在Kubernetes中,通常kube-schduler和kube-controller-manager都是多副本進行部署的來保證高可用,而真正在工作的實例其實只有一個。這裡就利用到 leaderelection 的選主機制,保證leader是處於工作狀態,並且在leader掛掉之後,從其他節點選取新的leader保證組件正常工作。

不單單隻是k8s中的這兩個組件用到,在其他服務中也可以看到這個包的使用,比如cluster-autoscaler等都能看得到這個包的,今天就來看看這個包的使用以及它內部是如何實現的。

使用

以下是一個簡單使用的例子,編譯完成之後同時啟動多個進程,但是只有一個進程在工作,當把leader進程kill掉之後,會重新選舉出一個leader進行工作,即執行其中的 run 方法:

/*
例子來源於client-go中的example包中
*/

package main

import (
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        return cfg, nil
    }

    cfg, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    return cfg, nil
}

func main() {
    klog.InitFlags(nil)

    var kubeconfig string
    var leaseLockName string
    var leaseLockNamespace string
    var id string

    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
    flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
    flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
    flag.Parse()

    if leaseLockName == "" {
        klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
    }
    if leaseLockNamespace == "" {
        klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
    }

    // leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    client := clientset.NewForConfigOrDie(config)

    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // 指定鎖的資源對象,這裡使用了Lease資源,還支持configmap,endpoint,或者multilock(即多種配合使用)
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,//租約時間
        RenewDeadline:   15 * time.Second,//更新租約的
        RetryPeriod:     5 * time.Second,//非leader節點重試時間
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                //變為leader執行的業務代碼
                // we're notified when we start - this is where you would
                // usually put your code
                run(ctx)
            },
            OnStoppedLeading: func() {
                 // 進程退出
                // we can do cleanup here
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                //當產生新的leader後執行的方法
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })
}

關鍵啟動參數說明:

kubeconfig: 指定kubeconfig文件地址
lease-lock-name:指定lock的名稱
lease-lock-namespace:指定lock存儲的namespace
id: 例子中提供的區別參數,用於區分實例
logtostderr:klog提供的參數,指定log輸出到控制台
v: 指定日誌輸出級別

同時啟動兩個進程:
啟動進程1

go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0215 19:56:37.049658   48045 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:56:37.080368   48045 leaderelection.go:252] successfully acquired lease default/example
I0215 19:56:37.080437   48045 main.go:87] Controller loop...

啟動進程2:

➜  leaderelection git:(master) ✗ go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0215 19:57:35.870051   48791 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:57:35.894735   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:35.894769   48791 leaderelection.go:247] failed to acquire lease default/example
I0215 19:57:35.894790   48791 main.go:151] new leader elected: 1
I0215 19:57:44.532991   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:44.533028   48791 leaderelection.go:247] failed to acquire lease default/example

這裡可以看出來id=1的進程持有鎖,並且運行的程式,而id=2的進程表示無法獲取到鎖,在不斷的進程嘗試。

現在kill掉id=1進程,在等待lock釋放之後(有個LeaseDuration時間),leader變為id=2的進程執行工作

I0215 20:01:41.489300   48791 leaderelection.go:252] successfully acquired lease default/example
I0215 20:01:41.489577   48791 main.go:87] Controller loop...

深入理解

基本原理其實就是利用通過Kubernetes中 configmap , endpoints 或者 lease 資源實現一個分散式鎖,搶(acqure)到鎖的節點成為leader,並且定期更新(renew)。其他進程也在不斷的嘗試進行搶占,搶占不到則繼續等待下次迴圈。當leader節點掛掉之後,租約到期,其他節點就成為新的leader。

入口

通過 leaderelection.RunOrDie 啟動,

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

傳入參數 LeaderElectionConfig :

type LeaderElectionConfig struct {
    // Lock 的類型
    Lock rl.Interface
    //持有鎖的時間
    LeaseDuration time.Duration
    //在更新租約的超時時間
    RenewDeadline time.Duration
    //競爭獲取鎖的時間
    RetryPeriod time.Duration
    //狀態變化時執行的函數,支持三種:
    //1、OnStartedLeading 啟動是執行的業務代碼
    //2、OnStoppedLeading leader停止執行的方法
    //3、OnNewLeader 當產生新的leader後執行的方法
    Callbacks LeaderCallbacks

    //進行監控檢查
    // WatchDog is the associated health checker
    // WatchDog may be null if its not needed/configured.
    WatchDog *HealthzAdaptor
    //leader退出時,是否執行release方法
    ReleaseOnCancel bool
    
    // Name is the name of the resource lock for debugging
    Name string
}

LeaderElectionConfig.lock 支持保存在以下三種資源中:
configmap 
endpoint 
lease 
包中還提供了一個 multilock ,即可以進行選擇兩種,當其中一種保存失敗時,選擇第二張
可以在interface.go中看到:

    switch lockType {
    case EndpointsResourceLock://保存在endpoints
        return endpointsLock, nil
    case ConfigMapsResourceLock://保存在configmaps
        return configmapLock, nil
    case LeasesResourceLock://保存在leases
        return leaseLock, nil
    case EndpointsLeasesResourceLock://優先嘗試保存在endpoint失敗時保存在lease
        return &MultiLock{
            Primary:   endpointsLock,
            Secondary: leaseLock,
        }, nil
    case ConfigMapsLeasesResourceLock://優先嘗試保存在configmap,失敗時保存在lease
        return &MultiLock{
            Primary:   configmapLock,
            Secondary: leaseLock,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }

以lease資源對象為例,可以在查看到保存的內容:

➜  ~ kubectl get lease example -n default -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-02-15T11:56:37Z"
  name: example
  namespace: default
  resourceVersion: "210675"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: a3470a06-6fc3-42dc-8242-9d6cebdf5315
spec:
  acquireTime: "2020-02-15T12:01:41.476971Z"//獲得鎖時間
  holderIdentity: "2"//持有鎖進程的標識
  leaseDurationSeconds: 60//lease租約
  leaseTransitions: 1//leader更換次數
  renewTime: "2020-02-15T12:05:37.134655Z"//更新租約的時間

關註其spec中的欄位,分別進行標註,對應結構體如下:

type LeaderElectionRecord struct {
    HolderIdentity       string      `json:"holderIdentity"`//持有鎖進程的標識,一般可以利用主機名
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"`//  lock的租約
    AcquireTime          metav1.Time `json:"acquireTime"`//持有鎖的時間
    RenewTime            metav1.Time `json:"renewTime"`//更新時間
    LeaderTransitions    int         `json:"leaderTransitions"`//leader更換的次數
}

獲取的鎖以及更新鎖

Run方法中包含了獲取鎖以及更新鎖的入口

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        //進行退出執行
        runtime.HandleCrash()
        //停止時執行回調方法
        le.config.Callbacks.OnStoppedLeading()
    }()
    //不斷的進行獲得鎖,如果獲得鎖成功則執行後面的方法,否則不斷的進行重試
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    //獲取鎖成功,當前進程變為leader,執行回調函數中的業務代碼
    go le.config.Callbacks.OnStartedLeading(ctx)
    //不斷的迴圈進行進行租約的更新,保證鎖一直被當前進行持有
    le.renew(ctx)
}

le.acquirele.renew 內部都是調用了 le.tryAcquireOrRenew 函數,只是對於返回結果的處理不一樣。

le.acquire 對於 le.tryAcquireOrRenew 返回成功則退出,失敗則繼續。

le.renew 則相反,成功則繼續,失敗則退出。

我們來看看 tryAcquireOrRenew 方法:

func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    //鎖資源對象內容
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),//唯一標識
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    // 第一步:從k8s資源中獲取原有的鎖
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        //資源對象不存在,進行鎖資源創建
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }

    // 2. Record obtained, check the Identity & Time
    // 第二步,對比存儲在k8s中的鎖資源與上一次獲取的鎖資源是否一致
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedRawRecord = oldLeaderElectionRawRecord
        le.observedTime = le.clock.Now()
    }
    //判斷持有的鎖是否到期以及是否被自己持有
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    //第三步:自己現在是leader,但是分兩組情況,上一次也是leader和首次變為leader
    if le.IsLeader() {
        //自己本身就是leader則不需要更新AcquireTime和LeaderTransitions
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        //首次自己變為leader則更新leader的更換次數
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    //更新鎖資源,這裡如果在 Get 和 Update 之間有變化,將會更新失敗
    // update the lock itself
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}

在這一步如果發生併發操作怎麼樣?

這裡很重要一點就是利用到了k8s api操作的原子性:

le.config.Lock.Get() 中會獲取到鎖的對象,其中有一個 resourceVersion 欄位用於標識一個資源對象的內部版本,每次更新操作都會更新其值。如果一個更新操作附加上了 resourceVersion 欄位,那麼 apiserver 就會通過驗證當前 resourceVersion 的值與指定的值是否相匹配來確保在此次更新操作周期內沒有其他的更新操作,從而保證了更新操作的原子性。

總結

leaderelection 主要是利用了k8s API操作的原子性實現了一個分散式鎖,在不斷的競爭中進行選舉。選中為leader的進行才會執行具體的業務代碼,這在k8s中非常的常見,而且我們很方便的利用這個包完成組件的編寫,從而實現組件的高可用,比如部署為一個多副本的Deployment,當leader的pod退出後會重新啟動,可能鎖就被其他pod獲取繼續執行。

完整代碼:https://github.com/go-demo/leaderelection

關註"學點程式"公眾號,瞭解更多乾貨內容 學點程式


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、問題剖析 看到這個問題,我想吹水兩句再做推薦。一般發出這個疑問都處在初學編程階段,編程語言都是相通的,只要你領悟了一門語言的“任督二脈”,以後你學哪一門語言都會輕易上手。學語言嘛,當你工作一兩年了,你還真會覺得像當初老師說的那樣,語言只是工具罷了。工作期間,可能要你接觸到其它語言,而且要你能快速 ...
  • 生產者是如何生產消息 如何創建生產者 發送消息到Kafka 生產者配置 分區 ...
  • GUI編程 組建 視窗 彈窗 面板 文本框 列表框 按鈕 圖片 監聽事件 滑鼠 鍵盤事件 破解工具 1、簡介 GUI的核心技術:Swing AWT 為什麼不流行? 界面不美觀。 需要jre環境。(沒必要為一個5M的小游戲下載幾百M的jre) 但是學了java的GUI編程,有助於瞭解MVC架構,瞭解監 ...
  • 字元和字元串是最常用的信息 1:char表示字元 字元常量-兩個單引號中間的字元表示字元常量'A' 2:字元串和String 字元串常量-雙引號中間的字元序列"Java" 字元串常量是String型的實例的引用。String s="ABC"與String s=new String("ABC"); 3 ...
  • javaSE學習筆記(16) 網路編程 基本概念 如今,電腦已經成為人們學習、工作、生活必不可少的工具。我們利用電腦可以和親朋好友網上聊天,也可以玩網游、發郵件等等,這些功能實現都離不開電腦網路。電腦網路實現了不同電腦之間的通信,這必須依靠編寫網路程式來實現。下麵,我們將教大家如何編寫網路 ...
  • 通常,我們在寫java程式的時候,似乎很少關註記憶體分配和垃圾回收的問題。因為,這部分工作,JVM已經幫我們自動實現了。 這樣看起來,好像很美好,但是任何事情都有兩面性。雖然JVM會自動的進行垃圾回收,但是,如果遇到有些問題,JVM自己也處理不了呢? 因此,我們需要瞭解一下JVM垃圾回收是怎樣運作的, ...
  • 併發編程之無鎖 6.2 CAS與volatile 源碼之LongAdder 6.8 Unsafe 6.2 CAS與volatile 其中的關鍵是compareAndSet,它的簡稱就是CAS(也有Compare And Swap的說法),它必須是原子操作。註意其實CAS的底層是lock cmpxch ...
  • 這一篇討論了使用 const 、volatile 和 typedef 使變數更容易地使用 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...