In Kubernetes, kube-scheduler and kube-controller-manager are usually deployed with multiple replicas for high availability, yet only one instance is actually doing the work at any time. They rely on the leaderelection mechanism to make sure one leader is working and, when the leader dies, to elect a new leader from the other nodes so the component keeps running.
These two components are not the only users: the package also shows up in other services, such as cluster-autoscaler. This article looks at how to use the package and how it is implemented internally.
Usage
Below is a simple usage example. After building it, start several processes at once: only one of them does any work, and when the leader process is killed a new leader is elected and takes over, i.e. runs the run function:
/*
This example comes from the example package in client-go.
*/
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)

	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
	// Specify the resource object backing the lock. A Lease is used here;
	// ConfigMaps, Endpoints, or a multilock (a combination of types) are also supported.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second, // how long the lease is valid
		RenewDeadline:   15 * time.Second, // deadline for the leader to renew the lease
		RetryPeriod:     5 * time.Second,  // how often non-leader candidates retry
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// business code to run once we become the leader
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
				// the process exits
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// called when a new leader is elected
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}
Key startup flags:
kubeconfig: absolute path to the kubeconfig file
lease-lock-name: name of the lock
lease-lock-namespace: namespace in which the lock is stored
id: an identifier provided by the example to tell the instances apart
logtostderr: a klog flag that sends log output to the console
v: log verbosity level
Start two processes at the same time.
Start process 1:
go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0215 19:56:37.049658 48045 leaderelection.go:242] attempting to acquire leader lease default/example...
I0215 19:56:37.080368 48045 leaderelection.go:252] successfully acquired lease default/example
I0215 19:56:37.080437 48045 main.go:87] Controller loop...
Start process 2:
➜ leaderelection git:(master) ✗ go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0215 19:57:35.870051 48791 leaderelection.go:242] attempting to acquire leader lease default/example...
I0215 19:57:35.894735 48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:35.894769 48791 leaderelection.go:247] failed to acquire lease default/example
I0215 19:57:35.894790 48791 main.go:151] new leader elected: 1
I0215 19:57:44.532991 48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:44.533028 48791 leaderelection.go:247] failed to acquire lease default/example
Here you can see that the process with id=1 holds the lock and runs the program, while the process with id=2 cannot acquire the lock and keeps retrying.
Now kill the id=1 process. Once the lock is released (which takes up to LeaseDuration), the process with id=2 becomes the leader and starts working:
I0215 20:01:41.489300 48791 leaderelection.go:252] successfully acquired lease default/example
I0215 20:01:41.489577 48791 main.go:87] Controller loop...
Deep dive
The basic principle is to implement a distributed lock on top of a Kubernetes configmap, endpoints, or lease resource. The node that acquires the lock becomes the leader and renews the lease periodically. The other processes keep trying to acquire the lock, and if they fail they wait for the next round. When the leader node dies and its lease expires, another node becomes the new leader.
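Conceptually, every candidate runs the same loop. The sketch below is only a toy illustration of the idea (tryAcquireOrRenew and retryPeriod stand in for the real client-go logic and configuration), not the actual implementation:
for {
	// tryAcquireOrRenew stands for: read the lock record, check whether the
	// current holder's lease has expired, and if it has (or we already hold
	// the lock) write our own identity and a fresh renew time back in a
	// single atomic update.
	if tryAcquireOrRenew() {
		// we hold the lock: do the leader work and keep renewing the lease
	} else {
		// someone else holds an unexpired lease: wait and try again
	}
	time.Sleep(retryPeriod)
}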
Entry point
Everything starts with leaderelection.RunOrDie:
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}
The LeaderElectionConfig parameter passed in:
type LeaderElectionConfig struct {
	// type of the lock
	Lock rl.Interface
	// how long the lock is held (lease duration)
	LeaseDuration time.Duration
	// deadline for renewing the lease
	RenewDeadline time.Duration
	// interval between attempts to acquire the lock
	RetryPeriod time.Duration
	// callbacks executed on state changes; three are supported:
	// 1. OnStartedLeading: the business code run when we start leading
	// 2. OnStoppedLeading: run when the leader stops leading
	// 3. OnNewLeader: run when a new leader is elected
	Callbacks LeaderCallbacks
	// health checking (see the sketch after this struct)
	// WatchDog is the associated health checker
	// WatchDog may be null if its not needed/configured.
	WatchDog *HealthzAdaptor
	// whether to release the lock when the leader steps down (context cancelled)
	ReleaseOnCancel bool
	// Name is the name of the resource lock for debugging
	Name string
}
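For the WatchDog field, client-go provides a HealthzAdaptor (RunOrDie wires the elector into it via SetLeaderElection, as shown above). A minimal sketch of exposing it as an HTTP health check, assuming the lock, ctx, id and run from the earlier example and an arbitrary :8080 port chosen for illustration:
// The adaptor reports unhealthy when this process believes it is the leader
// but has failed to renew the lease for longer than the given grace period
// (20s here is an illustrative value).
watchdog := leaderelection.NewLeaderHealthzAdaptor(20 * time.Second)

http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
	if err := watchdog.Check(r); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write([]byte("ok"))
})
go http.ListenAndServe(":8080", nil)

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock:            lock,
	WatchDog:        watchdog,
	ReleaseOnCancel: true,
	LeaseDuration:   60 * time.Second,
	RenewDeadline:   15 * time.Second,
	RetryPeriod:     5 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: run,
		OnStoppedLeading: func() { klog.Infof("leader lost: %s", id) },
	},
})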
LeaderElectionConfig.Lock supports storing the lock in any of three resource types:
configmap
endpoint
lease
The package also provides a multilock, which lets you combine two of them: when writing to the first fails, the second is used.
This can be seen in interface.go (a usage sketch follows the snippet):
switch lockType {
case EndpointsResourceLock: // stored in an Endpoints object
	return endpointsLock, nil
case ConfigMapsResourceLock: // stored in a ConfigMap
	return configmapLock, nil
case LeasesResourceLock: // stored in a Lease
	return leaseLock, nil
case EndpointsLeasesResourceLock: // try Endpoints first, fall back to Lease on failure
	return &MultiLock{
		Primary:   endpointsLock,
		Secondary: leaseLock,
	}, nil
case ConfigMapsLeasesResourceLock: // try ConfigMap first, fall back to Lease on failure
	return &MultiLock{
		Primary:   configmapLock,
		Secondary: leaseLock,
	}, nil
default:
	return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}
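Instead of building a resourcelock.LeaseLock struct by hand as in the usage example, the lock can also be constructed through the resourcelock.New factory, which is what drives the switch above. A minimal sketch, assuming a client-go release from roughly the same era as the example and reusing its client, leaseLockNamespace, leaseLockName and id variables:
// The lock type string selects the branch in the switch above; LeasesResourceLock
// keeps election traffic on the dedicated coordination.k8s.io API.
lock, err := resourcelock.New(
	resourcelock.LeasesResourceLock,
	leaseLockNamespace,
	leaseLockName,
	client.CoreV1(),         // used by the configmaps/endpoints based locks
	client.CoordinationV1(), // used by the leases based lock
	resourcelock.ResourceLockConfig{
		Identity: id,
	},
)
if err != nil {
	klog.Fatal(err)
}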
Taking the lease resource as an example, you can inspect what gets stored:
➜ ~ kubectl get lease example -n default -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-02-15T11:56:37Z"
  name: example
  namespace: default
  resourceVersion: "210675"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: a3470a06-6fc3-42dc-8242-9d6cebdf5315
spec:
  acquireTime: "2020-02-15T12:01:41.476971Z"  # when the lock was acquired
  holderIdentity: "2"                         # identity of the process holding the lock
  leaseDurationSeconds: 60                    # lease duration
  leaseTransitions: 1                         # number of leader changes
  renewTime: "2020-02-15T12:05:37.134655Z"    # when the lease was last renewed
Note the fields under spec, annotated above; they correspond to the following struct (a sketch of how these fields feed the expiry check follows):
type LeaderElectionRecord struct {
	HolderIdentity       string      `json:"holderIdentity"`       // identity of the process holding the lock, often the hostname
	LeaseDurationSeconds int         `json:"leaseDurationSeconds"` // lease duration of the lock
	AcquireTime          metav1.Time `json:"acquireTime"`          // when the lock was acquired
	RenewTime            metav1.Time `json:"renewTime"`            // when it was last renewed
	LeaderTransitions    int         `json:"leaderTransitions"`    // number of leader changes
}
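These fields are exactly what the expiry check in tryAcquireOrRenew (shown further below) is based on. A simplified sketch of that decision, not the exact client-go code:
// A candidate treats the lock as free if the record has no holder, or if the
// candidate has not observed the record change for longer than LeaseDuration.
// Note that client-go compares against the time it last observed a change
// locally (observedTime), not the RenewTime written by the holder, so the
// check does not rely on clocks being synchronized across nodes.
func lockExpired(holder string, observedTime time.Time, leaseDuration time.Duration, now time.Time) bool {
	if holder == "" {
		return true // nobody holds the lock
	}
	return observedTime.Add(leaseDuration).Before(now)
}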
Acquiring and renewing the lock
The Run method contains the entry points for both acquiring the lock and renewing it:
// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		// run on exit
		runtime.HandleCrash()
		// invoke the callback for when we stop leading
		le.config.Callbacks.OnStoppedLeading()
	}()
	// keep trying to acquire the lock; on success continue below,
	// otherwise keep retrying until the context is cancelled
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// the lock was acquired and this process is now the leader:
	// run the business code from the callback
	go le.config.Callbacks.OnStartedLeading(ctx)
	// keep renewing the lease in a loop so the lock stays held by this process
	le.renew(ctx)
}
Both le.acquire and le.renew call le.tryAcquireOrRenew internally; they only differ in how they handle the result.
le.acquire returns once le.tryAcquireOrRenew succeeds and keeps retrying while it fails.
le.renew does the opposite: it keeps going while renewal succeeds and returns as soon as it fails (a simplified sketch of the two wrappers follows).
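Roughly, the two wrappers behave like the sketch below (simplified: the real code also adds jitter to the retry period and enforces RenewDeadline with a timeout):
// acquire: call try every retryPeriod until it succeeds (we became the leader)
// or the context is cancelled.
func acquire(ctx context.Context, try func() bool, retryPeriod time.Duration) bool {
	for {
		if try() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(retryPeriod):
		}
	}
}

// renew: call try every retryPeriod and return as soon as it fails, i.e. as
// soon as the lease can no longer be refreshed in time.
func renew(ctx context.Context, try func() bool, retryPeriod time.Duration) {
	for {
		if !try() {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(retryPeriod):
		}
	}
}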
Now let's look at the tryAcquireOrRenew method:
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	// the record to store in the lock resource
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(), // unique identity
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	// Step 1: fetch the existing lock from the Kubernetes resource
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// the resource object does not exist yet, so create the lock resource
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	// Step 2: compare the lock stored in Kubernetes with the one observed last time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedRawRecord = oldLeaderElectionRawRecord
		le.observedTime = le.clock.Now()
	}
	// check whether the lock is held by someone else and has not expired yet
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	// Step 3: we are now the leader, but there are two cases:
	// we were already the leader, or we just became the leader
	if le.IsLeader() {
		// if we were already the leader, AcquireTime and LeaderTransitions stay unchanged
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// if we just became the leader, bump the number of leader transitions
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock resource; if it changed between the Get and this Update,
	// the update will fail
	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}
What happens if there is a concurrent update at this point?
The crucial point is that the code relies on the atomicity of Kubernetes API operations:
le.config.Lock.Get() returns the lock object, which carries a resourceVersion field identifying the internal version of the resource object; every update operation changes its value. When an update request includes a resourceVersion, the apiserver verifies that it matches the current value, which guarantees that no other update happened within this update cycle and thus makes the update atomic.
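The same optimistic concurrency can be observed directly with the typed client. The following is only an illustrative sketch (it uses the context-aware client methods of newer client-go releases and targets the default/example Lease from the demo):
// apierrors is "k8s.io/apimachinery/pkg/api/errors".
// The Lease returned by Get carries a resourceVersion; an Update that reuses a
// stale resourceVersion is rejected by the apiserver with a Conflict error.
lease, err := client.CoordinationV1().Leases("default").Get(ctx, "example", metav1.GetOptions{})
if err != nil {
	klog.Fatal(err)
}
lease.Spec.HolderIdentity = &id // try to take the lock for ourselves
_, err = client.CoordinationV1().Leases("default").Update(ctx, lease, metav1.UpdateOptions{})
if apierrors.IsConflict(err) {
	// another process updated the Lease between our Get and Update; the write
	// is rejected and we have to re-read the record and retry later
	klog.Info("conflict: someone else updated the lock first")
}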
Summary
leaderelection implements a distributed lock on top of the atomicity of Kubernetes API operations and elects a leader through continuous competition. Only the process elected as leader runs the actual business code. This pattern is very common in Kubernetes, and the package makes it easy to write highly available components: deploy the component as a multi-replica Deployment, for example, and when the leader pod exits and restarts, the lock may already have been picked up by another pod, which simply carries on with the work.
Full code: https://github.com/go-demo/leaderelection