利用Kubernetes中的leaderelection實現組件高可用

来源:https://www.cnblogs.com/silenceper/archive/2020/02/16/12318619.html
-Advertisement-
Play Games

在Kubernetes中,通常kube schduler和kube controller manager都是多副本進行部署的來保證高可用,而真正在工作的實例其實只有一個。這裡就利用到 的選主機制,保證leader是處於工作狀態,並且在leader掛掉之後,從其他節點選取新的leader保證組件正常工 ...


在Kubernetes中,通常kube-schduler和kube-controller-manager都是多副本進行部署的來保證高可用,而真正在工作的實例其實只有一個。這裡就利用到 leaderelection 的選主機制,保證leader是處於工作狀態,並且在leader掛掉之後,從其他節點選取新的leader保證組件正常工作。

不單單隻是k8s中的這兩個組件用到,在其他服務中也可以看到這個包的使用,比如cluster-autoscaler等都能看得到這個包的,今天就來看看這個包的使用以及它內部是如何實現的。

使用

以下是一個簡單使用的例子,編譯完成之後同時啟動多個進程,但是只有一個進程在工作,當把leader進程kill掉之後,會重新選舉出一個leader進行工作,即執行其中的 run 方法:

/*
例子來源於client-go中的example包中
*/

package main

import (
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        return cfg, nil
    }

    cfg, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    return cfg, nil
}

func main() {
    klog.InitFlags(nil)

    var kubeconfig string
    var leaseLockName string
    var leaseLockNamespace string
    var id string

    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
    flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
    flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
    flag.Parse()

    if leaseLockName == "" {
        klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
    }
    if leaseLockNamespace == "" {
        klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
    }

    // leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    client := clientset.NewForConfigOrDie(config)

    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // 指定鎖的資源對象,這裡使用了Lease資源,還支持configmap,endpoint,或者multilock(即多種配合使用)
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,//租約時間
        RenewDeadline:   15 * time.Second,//更新租約的
        RetryPeriod:     5 * time.Second,//非leader節點重試時間
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                //變為leader執行的業務代碼
                // we're notified when we start - this is where you would
                // usually put your code
                run(ctx)
            },
            OnStoppedLeading: func() {
                 // 進程退出
                // we can do cleanup here
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                //當產生新的leader後執行的方法
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })
}

關鍵啟動參數說明:

kubeconfig: 指定kubeconfig文件地址
lease-lock-name:指定lock的名稱
lease-lock-namespace:指定lock存儲的namespace
id: 例子中提供的區別參數,用於區分實例
logtostderr:klog提供的參數,指定log輸出到控制台
v: 指定日誌輸出級別

同時啟動兩個進程:
啟動進程1

go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0215 19:56:37.049658   48045 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:56:37.080368   48045 leaderelection.go:252] successfully acquired lease default/example
I0215 19:56:37.080437   48045 main.go:87] Controller loop...

啟動進程2:

➜  leaderelection git:(master) ✗ go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0215 19:57:35.870051   48791 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:57:35.894735   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:35.894769   48791 leaderelection.go:247] failed to acquire lease default/example
I0215 19:57:35.894790   48791 main.go:151] new leader elected: 1
I0215 19:57:44.532991   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:44.533028   48791 leaderelection.go:247] failed to acquire lease default/example

這裡可以看出來id=1的進程持有鎖,並且運行的程式,而id=2的進程表示無法獲取到鎖,在不斷的進程嘗試。

現在kill掉id=1進程,在等待lock釋放之後(有個LeaseDuration時間),leader變為id=2的進程執行工作

I0215 20:01:41.489300   48791 leaderelection.go:252] successfully acquired lease default/example
I0215 20:01:41.489577   48791 main.go:87] Controller loop...

深入理解

基本原理其實就是利用通過Kubernetes中 configmap , endpoints 或者 lease 資源實現一個分散式鎖,搶(acqure)到鎖的節點成為leader,並且定期更新(renew)。其他進程也在不斷的嘗試進行搶占,搶占不到則繼續等待下次迴圈。當leader節點掛掉之後,租約到期,其他節點就成為新的leader。

入口

通過 leaderelection.RunOrDie 啟動,

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

傳入參數 LeaderElectionConfig :

type LeaderElectionConfig struct {
    // Lock 的類型
    Lock rl.Interface
    //持有鎖的時間
    LeaseDuration time.Duration
    //在更新租約的超時時間
    RenewDeadline time.Duration
    //競爭獲取鎖的時間
    RetryPeriod time.Duration
    //狀態變化時執行的函數,支持三種:
    //1、OnStartedLeading 啟動是執行的業務代碼
    //2、OnStoppedLeading leader停止執行的方法
    //3、OnNewLeader 當產生新的leader後執行的方法
    Callbacks LeaderCallbacks

    //進行監控檢查
    // WatchDog is the associated health checker
    // WatchDog may be null if its not needed/configured.
    WatchDog *HealthzAdaptor
    //leader退出時,是否執行release方法
    ReleaseOnCancel bool
    
    // Name is the name of the resource lock for debugging
    Name string
}

LeaderElectionConfig.lock 支持保存在以下三種資源中:
configmap 
endpoint 
lease 
包中還提供了一個 multilock ,即可以進行選擇兩種,當其中一種保存失敗時,選擇第二張
可以在interface.go中看到:

    switch lockType {
    case EndpointsResourceLock://保存在endpoints
        return endpointsLock, nil
    case ConfigMapsResourceLock://保存在configmaps
        return configmapLock, nil
    case LeasesResourceLock://保存在leases
        return leaseLock, nil
    case EndpointsLeasesResourceLock://優先嘗試保存在endpoint失敗時保存在lease
        return &MultiLock{
            Primary:   endpointsLock,
            Secondary: leaseLock,
        }, nil
    case ConfigMapsLeasesResourceLock://優先嘗試保存在configmap,失敗時保存在lease
        return &MultiLock{
            Primary:   configmapLock,
            Secondary: leaseLock,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }

以lease資源對象為例,可以在查看到保存的內容:

➜  ~ kubectl get lease example -n default -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-02-15T11:56:37Z"
  name: example
  namespace: default
  resourceVersion: "210675"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: a3470a06-6fc3-42dc-8242-9d6cebdf5315
spec:
  acquireTime: "2020-02-15T12:01:41.476971Z"//獲得鎖時間
  holderIdentity: "2"//持有鎖進程的標識
  leaseDurationSeconds: 60//lease租約
  leaseTransitions: 1//leader更換次數
  renewTime: "2020-02-15T12:05:37.134655Z"//更新租約的時間

關註其spec中的欄位,分別進行標註,對應結構體如下:

type LeaderElectionRecord struct {
    HolderIdentity       string      `json:"holderIdentity"`//持有鎖進程的標識,一般可以利用主機名
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"`//  lock的租約
    AcquireTime          metav1.Time `json:"acquireTime"`//持有鎖的時間
    RenewTime            metav1.Time `json:"renewTime"`//更新時間
    LeaderTransitions    int         `json:"leaderTransitions"`//leader更換的次數
}

獲取的鎖以及更新鎖

Run方法中包含了獲取鎖以及更新鎖的入口

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        //進行退出執行
        runtime.HandleCrash()
        //停止時執行回調方法
        le.config.Callbacks.OnStoppedLeading()
    }()
    //不斷的進行獲得鎖,如果獲得鎖成功則執行後面的方法,否則不斷的進行重試
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    //獲取鎖成功,當前進程變為leader,執行回調函數中的業務代碼
    go le.config.Callbacks.OnStartedLeading(ctx)
    //不斷的迴圈進行進行租約的更新,保證鎖一直被當前進行持有
    le.renew(ctx)
}

le.acquirele.renew 內部都是調用了 le.tryAcquireOrRenew 函數,只是對於返回結果的處理不一樣。

le.acquire 對於 le.tryAcquireOrRenew 返回成功則退出,失敗則繼續。

le.renew 則相反,成功則繼續,失敗則退出。

我們來看看 tryAcquireOrRenew 方法:

func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    //鎖資源對象內容
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),//唯一標識
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    // 第一步:從k8s資源中獲取原有的鎖
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        //資源對象不存在,進行鎖資源創建
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }

    // 2. Record obtained, check the Identity & Time
    // 第二步,對比存儲在k8s中的鎖資源與上一次獲取的鎖資源是否一致
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedRawRecord = oldLeaderElectionRawRecord
        le.observedTime = le.clock.Now()
    }
    //判斷持有的鎖是否到期以及是否被自己持有
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    //第三步:自己現在是leader,但是分兩組情況,上一次也是leader和首次變為leader
    if le.IsLeader() {
        //自己本身就是leader則不需要更新AcquireTime和LeaderTransitions
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        //首次自己變為leader則更新leader的更換次數
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    //更新鎖資源,這裡如果在 Get 和 Update 之間有變化,將會更新失敗
    // update the lock itself
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}

在這一步如果發生併發操作怎麼樣?

這裡很重要一點就是利用到了k8s api操作的原子性:

le.config.Lock.Get() 中會獲取到鎖的對象,其中有一個 resourceVersion 欄位用於標識一個資源對象的內部版本,每次更新操作都會更新其值。如果一個更新操作附加上了 resourceVersion 欄位,那麼 apiserver 就會通過驗證當前 resourceVersion 的值與指定的值是否相匹配來確保在此次更新操作周期內沒有其他的更新操作,從而保證了更新操作的原子性。

總結

leaderelection 主要是利用了k8s API操作的原子性實現了一個分散式鎖,在不斷的競爭中進行選舉。選中為leader的進行才會執行具體的業務代碼,這在k8s中非常的常見,而且我們很方便的利用這個包完成組件的編寫,從而實現組件的高可用,比如部署為一個多副本的Deployment,當leader的pod退出後會重新啟動,可能鎖就被其他pod獲取繼續執行。

完整代碼:https://github.com/go-demo/leaderelection

關註"學點程式"公眾號,瞭解更多乾貨內容 學點程式


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、問題剖析 看到這個問題,我想吹水兩句再做推薦。一般發出這個疑問都處在初學編程階段,編程語言都是相通的,只要你領悟了一門語言的“任督二脈”,以後你學哪一門語言都會輕易上手。學語言嘛,當你工作一兩年了,你還真會覺得像當初老師說的那樣,語言只是工具罷了。工作期間,可能要你接觸到其它語言,而且要你能快速 ...
  • 生產者是如何生產消息 如何創建生產者 發送消息到Kafka 生產者配置 分區 ...
  • GUI編程 組建 視窗 彈窗 面板 文本框 列表框 按鈕 圖片 監聽事件 滑鼠 鍵盤事件 破解工具 1、簡介 GUI的核心技術:Swing AWT 為什麼不流行? 界面不美觀。 需要jre環境。(沒必要為一個5M的小游戲下載幾百M的jre) 但是學了java的GUI編程,有助於瞭解MVC架構,瞭解監 ...
  • 字元和字元串是最常用的信息 1:char表示字元 字元常量-兩個單引號中間的字元表示字元常量'A' 2:字元串和String 字元串常量-雙引號中間的字元序列"Java" 字元串常量是String型的實例的引用。String s="ABC"與String s=new String("ABC"); 3 ...
  • javaSE學習筆記(16) 網路編程 基本概念 如今,電腦已經成為人們學習、工作、生活必不可少的工具。我們利用電腦可以和親朋好友網上聊天,也可以玩網游、發郵件等等,這些功能實現都離不開電腦網路。電腦網路實現了不同電腦之間的通信,這必須依靠編寫網路程式來實現。下麵,我們將教大家如何編寫網路 ...
  • 通常,我們在寫java程式的時候,似乎很少關註記憶體分配和垃圾回收的問題。因為,這部分工作,JVM已經幫我們自動實現了。 這樣看起來,好像很美好,但是任何事情都有兩面性。雖然JVM會自動的進行垃圾回收,但是,如果遇到有些問題,JVM自己也處理不了呢? 因此,我們需要瞭解一下JVM垃圾回收是怎樣運作的, ...
  • 併發編程之無鎖 6.2 CAS與volatile 源碼之LongAdder 6.8 Unsafe 6.2 CAS與volatile 其中的關鍵是compareAndSet,它的簡稱就是CAS(也有Compare And Swap的說法),它必須是原子操作。註意其實CAS的底層是lock cmpxch ...
  • 這一篇討論了使用 const 、volatile 和 typedef 使變數更容易地使用 ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...