如果多個客戶端同時請求修改同一個kubernetes資源,那麼很有可能收到apiserver返回失敗,本篇就來分析並復現這個問題,然後再實戰client-go官方的解決手段 ...
歡迎訪問我的GitHub
這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos
本篇概覽
- 本文是《client-go實戰》系列的第七篇,來瞭解一個常見的錯誤:版本衝突,以及client-go官方推薦的處理方式
- 本篇由以下部分組成
- 什麼是版本衝突(from kubernetes官方)
- 編碼,復現版本衝突
- 版本衝突的解決思路(from kubernetes官方)
- 版本衝突的實際解決手段(from client-go官方)
- 編碼,演示如何解決版本衝突
- 自定義入參,對抗更高的併發
什麼是版本衝突(from kubernetes官方)
- 簡單的說,就是同時出現多個修改請求,針對同一個kubernetes資源的時候,會出現一個請求成功其餘請求都失敗的情況
- 這裡有kubernetes官方對版本衝突的描述:https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
- 以下是個人的理解
- 首先,在邏輯上來說,提交衝突是肯定存在的,多人同時獲取到同一個資源的信息(例如同一個pod),然後各自在本地修改後提交,就有可能出現A的提交把B的提交覆蓋的情況,這一個點就不展開了,資料庫的樂觀鎖和悲觀鎖都可以用來處理併發衝突
- kubernetes應對提交衝突的方式是資源版本號,屬於樂觀鎖類型(Kubernetes leverages the concept of resource versions to achieve optimistic concurrency)
- 基於版本實現併發控制是常見套路,放在kubernetes也是一樣,基本原理如下圖所示,按照序號看一遍即可理解:左右兩人從後臺拿到的資源都是1.0版本,然而右側提交的1.1的時候,伺服器上已經被左側更新到1.1了,於是伺服器不接受右側提交
編碼,復現版本衝突
- 接下來,咱們將上述衝突用代碼復現出來,具體的功能如下
- 創建一個deployment資源,該資源帶有一個label,名為biz-version,值為101
- 啟動5個協程,每個協程都做同樣的事情:讀取deployment,得到label的值後,加一,再提交保存
- 正常情況下,label的值被累加了5次,那麼最終的值應該等於101+5=106
- 等5個協程都執行完畢後,再讀讀取一次deployment,看label值是都等於106
- 接下來就寫代碼實現上述功能
- 為了後續文章的實戰代碼能統一管理,這裡繼續使用前文《client-go實戰之七:準備一個工程管理後續實戰的代碼
》創建的client-go-tutorials工程,將代碼寫在這個工程中 - 在client-go-tutorials工程中新增名為的conflict.go的文件,整個工程結構如下圖所示
$ tree client-go-tutorials
client-go-tutorials
├── action
│ ├── action.go
│ ├── conflict.go
│ └── list_pod.go
├── client-go-tutorials
├── go.mod
├── go.sum
└── main.go
- 接下來的代碼都寫在conflict.go中
- 首先是新增兩個常量
const (
// deployment的名稱
DP_NAME string = "demo-deployment"
// 用於更新的標簽的名字
LABEL_CUSTOMIZE string = "biz-version"
)
- 然後是輔助方法,返回32位整型的指針,後面會用到
func int32Ptr(i int32) *int32 { return &i }
- 創建deployment的方法,要註意的是增加了一個label,名為LABEL_CUSTOMIZE,其值為101
// 創建deployment
func create(clientset *kubernetes.Clientset) error {
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: DP_NAME,
Labels: map[string]string{LABEL_CUSTOMIZE: "101"},
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "web",
Image: "nginx:1.12",
Ports: []apiv1.ContainerPort{
{
Name: "http",
Protocol: apiv1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
// Create Deployment
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
return err
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
return nil
}
- 按照名稱刪除deployment的方法,實戰的最後會調用,將deployment清理掉
// 按照名稱刪除
func delete(clientset *kubernetes.Clientset, name string) error {
deletePolicy := metav1.DeletePropagationBackground
err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
return err
}
return nil
}
- 再封裝一個get方法,用於所有更新操作完成後,獲取最新的deployment,檢查其label值是否符合預期
// 按照名稱查找deployment
func get(clientset *kubernetes.Clientset, name string) (*v1.Deployment, error) {
deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return deployment, nil
}
- 接下來是最重要的更新方法,這裡用的是常見的先查詢再更新的方式,查詢deployment,取得標簽值之後加一再提交保存
// 查詢指定名稱的deployment對象,得到其名為biz-version的label,加一後保存
func updateByGetAndUpdate(clientset *kubernetes.Clientset, name string) error {
deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
// 取出當前值
currentVal, ok := deployment.Labels[LABEL_CUSTOMIZE]
if !ok {
return errors.New("未取得自定義標簽")
}
// 將字元串類型轉為int型
val, err := strconv.Atoi(currentVal)
if err != nil {
fmt.Println("取得了無效的標簽,重新賦初值")
currentVal = "101"
}
// 將int型的label加一,再轉為字元串
deployment.Labels[LABEL_CUSTOMIZE] = strconv.Itoa(val + 1)
_, err = clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Update(context.TODO(), deployment, metav1.UpdateOptions{})
return err
}
- 最後,是主流程代碼,為了能在現有工程框架下運行,這裡新增一個struct,並實現了action介面的DoAction方法,這個DoAction方法中就是主流程
type Confilct struct{}
func (conflict Confilct) DoAction(clientset *kubernetes.Clientset) error {
fmt.Println("開始創建deployment")
// 開始創建deployment
err := create(clientset)
if err != nil {
return err
}
// 如果不延時,就會導致下麵的更新過早,會報錯
<-time.NewTimer(1 * time.Second).C
// 一旦創建成功,就一定到刪除再返回
defer delete(clientset, DP_NAME)
testNum := 5
waitGroup := sync.WaitGroup{}
waitGroup.Add(testNum)
fmt.Println("在協程中併發更新自定義標簽")
startTime := time.Now().UnixMilli()
for i := 0; i < testNum; i++ {
go func(clientsetA *kubernetes.Clientset, index int) {
// 避免進程卡死
defer waitGroup.Done()
err := updateByGetAndUpdate(clientsetA, DP_NAME)
// var retryParam = wait.Backoff{
// Steps: 5,
// Duration: 10 * time.Millisecond,
// Factor: 1.0,
// Jitter: 0.1,
// }
// err := retry.RetryOnConflict(retryParam, func() error {
// return updateByGetAndUpdate(clientset, DP_NAME)
// })
if err != nil {
fmt.Printf("err: %v\n", err)
}
}(clientset, i)
}
// 等待協程完成全部操作
waitGroup.Wait()
// 再查一下,自定義標簽的最終值
deployment, err := get(clientset, DP_NAME)
if err != nil {
fmt.Printf("查詢deployment發生異常: %v\n", err)
return err
}
fmt.Printf("自定義標簽的最終值為: %v,耗時%v毫秒\n", deployment.Labels[LABEL_CUSTOMIZE], time.Now().UnixMilli()-startTime)
return nil
}
- 最後還要修改main.go,增加一個action的處理,新增的內容如下
- 這裡給出完整main.go
package main
import (
"client-go-tutorials/action"
"flag"
"fmt"
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
var actionFlag *string
// 試圖取到當前賬號的家目錄
if home := homedir.HomeDir(); home != "" {
// 如果能取到,就把家目錄下的.kube/config作為預設配置文件
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
// 如果取不到,就沒有預設配置文件,必須通過kubeconfig參數來指定
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
actionFlag = flag.String("action", "list-pod", "指定實際操作功能")
flag.Parse()
fmt.Println("解析命令完畢,開始載入配置文件")
// 載入配置文件
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 用clientset類來執行後續的查詢操作
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
fmt.Printf("載入配置文件完畢,即將執行業務 [%v]\n", *actionFlag)
var actionInterface action.Action
// 註意,如果有新的功能類實現,就在這裡添加對應的處理
switch *actionFlag {
case "list-pod":
listPod := action.ListPod{}
actionInterface = &listPod
case "conflict":
conflict := action.Confilct{}
actionInterface = &conflict
}
err = actionInterface.DoAction(clientset)
if err != nil {
fmt.Printf("err: %v\n", err)
} else {
fmt.Println("執行完成")
}
}
- 最後,如果您用的是vscode,可以修改launch.json,調整輸入參數
{
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}",
"args": ["-action=conflict"]
}
]
}
-
回顧上面的代碼,您會發現是5個協程並行執行先查詢再修改提交的邏輯,理論上會出現前面提到的衝突問題,5個協程併發更新,會出現併發衝突,因此最終標簽的值是小於101+5=106的,咱們來運行代碼試試
-
果然,經過更新後,lable的最終值等於102,也就是說過5個協程同時提交,只成功了一個
-
至此,咱們通過代碼證明瞭資源版本衝突問題確實存在,接下來就要想辦法解決此問題了
版本衝突的解決思路(from kubernetes官方)
- 來看看kubernetes的官方對於處理此問題是如何建議的,下麵是官方原話
In the case of a conflict, the correct client action at this point is to GET the resource again, apply the changes afresh, and try submitting again
- 很明顯,在更新因為版本衝突而失敗的時候,官方建議重新獲取最新版本的資源,然後再次修改並提交
- 聽起來很像CAS
- 在前面復現失敗的場景,如果是5個協程併發提交,總有一個會失敗多次,那豈不是要反覆重試,把代碼變得更複雜?
- 還好,client-go幫我們解決了這個問題,按照kubernetes官方的指導方向,將重試邏輯進行了封裝,讓使用者可以很方便的實現完成失敗重試
版本衝突的實際解決手段(from client-go官方)
- client-go提供的是方法,下麵是該方法的源碼
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return OnError(backoff, errors.IsConflict, fn)
}
- 從上述方法有兩個入參,backoff用於控制重試相關的細節,如重試次數、間隔時間等,fn則是常規的先查詢再更新的自定義方法,由調用方根據自己的業務自行實現,總之,只要fn返回錯誤,並且該錯誤是可以通過重試來解決的,RetryOnConflict方法就會按照backoff的配置進行等待和重試
- 可見經過client-go的封裝,對應普通開發者來說已經無需關註重試的實現了,只要調用RetryOnConflict即可確保版本衝突問題會被解決
- 接下來咱們改造前面有問題的代碼,看看能否解決併發衝突的問題
編碼,演示如何解決版本衝突
- 改成client-go提供的自動重試代碼,整體改動很小,如下圖所示,原來是直接調用updateByGetAndUpdate方法,現在註釋掉,改為調用RetryOnConflict,並且將updateByGetAndUpdate作為入參使用
- 再次運行代碼,如下圖,這次五個協程都更新成功了,不過耗時也更長,畢竟是靠著重試來實現最終提交成功的
自定義入參,對抗更高的併發
- 前面的驗證過程中,併發數被設置為5,現在加大一些試試,改成10,如下圖紅色箭頭位置
- 執行結果如下圖所示,10個併發請求,只成功了5個,其餘5個就算重試也還是失敗了
- 出現這樣的問題,原因很明顯:下麵是咱們調用方法時的入參,每個併發請求最多重試5次,顯然即便是重試5次,也只能確保每一次有個協程提交成功,所以5次過後沒有重試機會,導致只成功了5個
var retryParam = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
- 找到了原因就好處理了,把上面的Steps參數調大,改為10,再試試
- 如下圖,這一次結果符合預期,不過耗時更長了
- 最後留下一個問題:Steps參數到底該設置成多少呢?這個當然沒有固定值了,5是client-go官方推薦的值,結果在併發為10的時候依然不夠用,所以具體該設置成多少還是要依照您的實際情況來決定,需要大於最大的瞬間併發數,才能保證所有併發衝突都能通過重試解決,當然了,實際場景中,大量併發同時修改同一個資源對象的情況並不多見,所以大多數時候可以直接使用client-go官方的推薦值
- 至此,kubernetes資源更新時的版本衝突問題,經過實戰咱們都已經瞭解了,並且掌握瞭解決方法,基本的增刪改查算是沒問題了,接下來的文章,咱們要聚焦的是client-go另一個極其重要的能力:List&Watch
- 敬請期待,欣宸原創必不會辜負您
源碼下載
- 上述完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 鏈接 | 備註 |
---|---|---|
項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,本篇的源碼在tutorials/client-go-tutorials文件夾下,如下圖紅框所示: