> 本文首發於公眾號:Hunter後端 > 原文鏈接:[Python連接es筆記四之創建和刪除操作](https://mp.weixin.qq.com/s/ZCe0JT9TDEiZI7M5dxC9qA) 這一篇筆記介紹一下索引和數據的創建和刪除。 其實對於索引來說,如果可以接觸到 kibana 的話 ...
1. 簡介
本文將介紹 Go 語言中的 Weighted
併發原語,包括 Weighted
的基本使用方法、實現原理、使用註意事項等內容。能夠更好地理解和應用 Weighted
來實現資源的管理,從而提高程式的穩定性。
2. 問題引入
在微服務架構中,我們的服務節點負責接收其他節點的請求,並提供相應的功能和數據。比如賬戶服務,其他服務需要獲取賬戶信息,都會通過rpc請求向賬戶服務發起請求。
這些服務節點通常以集群的方式部署在伺服器上,用於處理大量的併發請求。每個伺服器都有其處理能力的上限,超過該上限可能導致性能下降甚至崩潰。
在部署服務時,通常會評估服務的併發量,併為其分配適當的資源以處理預期的請求負載。然而,在微服務架構中,存在著上游服務請求下游服務的場景。如果上游服務在某些情況下沒有正確考慮併發量,或者由於某些異常情況導致大量請求發送給下游服務,那麼下游服務可能面臨超過其處理能力的問題。這可能導致下游服務的響應時間增加,甚至無法正常處理請求,進而影響整個系統的穩定性和可用性。下麵用一個簡單的代碼來說明一下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
// 啟動下游服務,用於處理請求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 模擬下游服務的處理邏輯
// ...
// 完成請求處理後,從等待組中刪除一個等待
wg.Done()
})
// 啟動下游服務的 HTTP 伺服器
http.ListenAndServe(":8080", nil)
}
這裡啟動一個簡單的HTTP伺服器,由其來模擬下游服務,來接收上游服務的請求。下麵我們啟動一個簡單的程式,由其來模擬上游服務發送請求:
func main() {
// 創建一個等待組,用於等待所有請求完成
var wg sync.WaitGroup
// 模擬上游服務發送大量請求給下游服務
go func() {
for i := 0; i < 1000000; i++ {
wg.Add(1)
go sendRequest(&wg)
}
}()
// 等待所有請求完成
wg.Wait()
}
func sendRequest(wg *sync.WaitGroup) {
// 模擬上游服務發送請求給下游服務
resp, err := http.Get("http://localhost:8080/")
if err != nil {
fmt.Println("請求失敗:", err)
} else {
fmt.Println("請求成功:", resp.Status)
}
// 請求完成後,通知等待組
wg.Done()
}
這裡,我們同時啟動了1000000個協程同時往HTTP伺服器發送請求,如果伺服器配置不夠高,亦或者是請求量更多的情況下,已經超過了伺服器的處理上限,伺服器沒有主夠的資源去處理這些請求,此時將有可能直接將伺服器打掛掉,服務直接不可用。在這種情況下,如果由於上游服務的問題,導致下游服務,甚至整個鏈路的系統都直接崩潰,這個是不合理的,此時需要有一些手段保護下游服務由於異常流量導致整個系統的崩潰。
這裡對上面的場景進行分析,可以發現,此時是由於上游服務大量請求的過來,而當前服務並沒有足夠的資源去處理這些請求,但是並沒有對其加以限制,而是繼續處理,最終導致了整個系統的不可用。那麼此時就應該進行限流,對併發請求量進行控制,對伺服器能夠處理的併發數進行合理評估,當併發請求數超過了限制,此時應該直接拒絕其訪問,避免整個系統的不可用。
那問題來了,go語言中,有什麼方法能夠實現資源的管理,如果沒有足夠的資源,此時將直接返回,不對請求進行處理呢?其實go語言中有Weighted
類型,在這種場景還挺合適的。下麵我們將對其進行介紹。
3. 基本使用
3.1 基本介紹
Weighted
是 Go 語言中 golang.org/x/sync
包中的一種類型,用於限制併發訪問某個資源的數量。它提供了一種機制,允許調用者以不同的權重請求訪問資源,併在資源可用時進行授予。
Weighted
的定義如下,提供了Acquire
,TryAcquire
,Release
三個方法:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error{}
func (s *Weighted) TryAcquire(n int64) bool{}
func (s *Weighted) Release(n int64) {}
Acquire
: 以權重n
請求獲取資源,阻塞直到資源可用或上下文ctx
結束。TryAcquire
: 嘗試以權重n
獲取信號量,如果成功則返回true
,否則返回false
,並保持信號量不變。Release
:釋放具有權重n
的信號量。
3.2 權重說明
有時候,不同請求對資源的消耗是不同的。通過設置權重,你可以更好地控制不同請求對資源的使用情況。例如,某些請求可能需要更多的計算資源或更長的處理時間,你可以設置較高的權重來確保它們能夠獲取到足夠的資源。
其次就是權重大隻是代表著請求需要使用到的資源多,對於優先順序並不會有作用。在Weighted
中,資源的許可是以先進先出(FIFO)的順序分配的,而不是根據權重來決定獲取的優先順序。當有多個請求同時等待獲取資源時,它們會按照先後順序依次獲取資源的許可。
假設先請求權重為 1 的資源,然後再請求權重為 2 的資源。如果當前可用的資源許可足夠滿足兩個請求的總權重,那麼先請求的權重為 1 的資源會先獲取到許可,然後是後續請求的權重為 2 的資源。
w.Acquire(context.Background(), 1) // 權重為 1 的請求先獲取到資源許可
w.Acquire(context.Background(), 2) // 權重為 2 的請求在權重為 1 的請求之後獲取到資源許可
3.3 基本使用
當使用Weighted
來控制資源的併發訪問時,通常需要以下幾個步驟:
- 創建
Weighted
實例,定義好最大資源數 - 當需要資源時,調用
Acquire
方法占據資源 - 當處理完成之後,調用
Release
方法釋放資源
下麵是一個簡單的代碼的示例,展示瞭如何使用Weighted
實現資源控制:
func main() {
// 1. 創建一個信號量實例,設置最大併發數
sem := semaphore.NewWeighted(10)
// 具體處理請求的函數
handleRequest := func(id int) {
// 2. 調用Acquire嘗試獲取資源
err := sem.Acquire(context.Background(), 1)
if err != nil {
fmt.Printf("Goroutine %d failed to acquire resource\n", id)
}
// 3. 成功獲取資源,使用defer,在任務執行完之後,自動釋放資源
defer sem.Release(1)
// 執行業務邏輯
return
}
// 模擬併發請求
for i := 0; i < 20; i++ {
go handleRequest(i)
}
time.Sleep(20 * time.Second)
}
首先,調用NewWeighted
方法創建一個信號量實例,設置最大併發數為10。然後在每次請求處理前調用Acquire
方法嘗試獲取資源,成功獲取資源後,使用defer
關鍵字,在任務執行完後自動釋放資源,調用Release
方法釋放一個資源。
保證最多同時有10個協程獲取資源。如果有更多的協程嘗試獲取資源,它們會等待其他協程釋放資源後再進行獲取。
4. 實現原理
4.1 設計初衷
Weighted
類型的設計初衷是為了在併發環境中實現對資源的控制和限制。它提供了一種簡單而有效的機制,允許在同一時間內只有一定數量的併發操作可以訪問或使用特定的資源。
4.2 基本原理
Weighted
類型的基本實現原理是基於計數信號量的概念。計數信號量是一種用於控制併發訪問的同步原語,它維護一個可用資源的計數器。在Weighted
中,該計數器表示可用的資源數量。
當一個任務需要獲取資源時,它會調用Acquire
方法。該方法首先會檢查當前可用資源的數量,如果大於零,則表示有可用資源,並將計數器減一,任務獲取到資源,並繼續執行。如果當前可用資源的數量為零,則任務會被阻塞,直到有其他任務釋放資源。
當一個任務完成對資源的使用後,它會調用Release
方法來釋放資源。該方法會將計數器加一,表示資源已經可用,其他被阻塞的任務可以繼續獲取資源並執行。
通過這種方式,Weighted
實現了對資源的限制和控制。它確保在同一時間內只有一定數量的併發任務可以訪問資源,超過限制的任務會被阻塞,直到有其他任務釋放資源。這樣可以有效地避免資源過度使用和競爭,保證系統的穩定性和性能。
4.3 代碼實現
4.3.1 結構體定義
Weighted
的結構體定義如下:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
size
:表示資源的總數量,即可以同時獲取的最大資源數量。cur
:表示當前已經被獲取的資源數量。mu
:用於保護Weighted
類型的互斥鎖,確保併發安全性。waiters
:使用雙向鏈表來存儲等待獲取資源的任務。
4.3.2 Acquire方法
Acquire
方法將獲取指定數量的資源。如果當前可用資源數量不足,調用此方法的任務將被阻塞,並加入到等待隊列中。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
// 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保併發安全性。
s.mu.Lock()
// size - cur 代表剩餘可用資源數,如果大於請求資源數n, 此時代表剩餘可用資源 大於 需要的資源數
// 其次,Weighted資源分配的順序是FIFO,如果等待隊列不為空,當前請求就需要自動放到隊列最後面
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// s.size 代表最大資源數,如果需要的資源數 大於 最大資源數,此時直接返回錯誤
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 這裡代表著當前暫時獲取不到資源,此時將創建一個waiter對象放到等待隊列最後
ready := make(chan struct{})
// waiter對象中包含需要獲取的資源數量n和通知通道ready。
w := waiter{n: n, ready: ready}
// 將waiter對象放到隊列最後
elem := s.waiters.PushBack(w)
// 釋放鎖,讓其他請求進來
s.mu.Unlock()
select {
// 如果ctx.Done()通道被關閉,表示上下文已取消,任務需要返回錯誤。
case <-ctx.Done():
err := ctx.Err()
// 新獲取鎖,檢查是否已經成功獲取資源。如果成功獲取資源,將錯誤置為nil,表示獲取成功;
s.mu.Lock()
select {
// 通過判斷ready channel是否接收到信號,從而來判斷是否成功獲取資源
case <-ready:
err = nil
default:
// 判斷是否是等待隊列中第一個元素
isFront := s.waiters.Front() == elem
// 將該請求從等待隊列中移除
s.waiters.Remove(elem)
// 如果是第一個等待對象,同時還有剩餘資源,喚醒後面的waiter。說不定後面的waiter剛好符合條件
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// ready通道接收到數據,代表此時已經成功占據到資源了
case <-ready:
return nil
}
}
Weighted
對象用來控制可用資源的數量。它有兩個重要的欄位,cur和size,分別表示當前可用的資源數量和總共可用的資源數量。
當一個請求通過Acquire
方法請求資源時,首先會檢查剩餘資源數量是否足夠,並且等待隊列中沒有其他請求在等待資源。如果滿足這兩個條件,請求就可以成功獲取到資源。
如果剩餘資源數量不足以滿足請求,那麼一個waiter
的對象會被創建並放入等待隊列中。waiter
對象包含了請求需要的資源數量n和一個用於通知的通道ready。當其他請求調用Release
方法釋放資源時,它們會檢查等待隊列中的waiter
對象是否滿足資源需求,如果滿足,就會將資源分配給該waiter
對象,並通過ready
通道來通知它可以執行業務邏輯了。
即使剩餘資源數量大於請求所需數量,如果等待隊列中存在等待的請求,新的請求也會被放入等待隊列中,而不管資源是否足夠。這可能導致一些請求長時間等待資源,導致資源的浪費和延遲。因此,在使用Weighted
進行資源控制時,需要謹慎評估資源配額,並避免資源饑餓的情況發生,以免影響系統的性能和響應能力。
4.3.3 Release方法
Release
方法將釋放指定數量的資源。當資源被釋放時,會檢查等待隊列中的任務。它從隊頭開始逐個檢查等待的元素,並嘗試為它們分配資源,直到最後一個不滿足資源條件的元素為止。
func (s *Weighted) Release(n int64) {
// 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保併發安全性。
s.mu.Lock()
// 2. 釋放資源
s.cur -= n
// 3. 異常情況處理
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 4. 喚醒等待任務
s.notifyWaiters()
s.mu.Unlock()
}
可以看到,Release
方法實現相對比較簡單,釋放資源後,便直接調用notifyWaiters
方法喚醒處於等待狀態的任務。下麵來看看notifyWaiters
方法的具體實現:
func (s *Weighted) notifyWaiters() {
for {
// 獲取隊頭元素
next := s.waiters.Front()
// 已經沒有處於等待狀態的協程,此時直接返回
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
// 如果資源不滿足要求 當前waiter的要求,此時直接返回
if s.size-s.cur < w.n {
break
}
// 否則占據waiter需要的資源數
s.cur += w.n
// 移除等待元素
s.waiters.Remove(next)
// 喚醒處於等待狀態的任務,Acquire方法會 <- ready 來等待信號的到來
close(w.ready)
}
}
notifyWaiters
方法會從隊頭開始獲取元素,判斷當前資源的剩餘數,是否滿足waiter
的要求,如果滿足的話,此時先占據該waiter
需要的資源,之後再將其從等待隊列中移除,最後調用close
方法,喚醒處於等待狀態的任務。 之後,再繼續隊列中取出元素,判斷是否滿足條件,迴圈反覆,直到不滿足waiter
的條件為止。
4.3.4 TryAcquire方法
TryAcquire
方法將嘗試獲取指定數量的資源,但不會阻塞。如果可用資源不足,它會立即返回一個錯誤,而不是阻塞等待。實現比較簡單,只是簡單檢查當前資源數是否滿足要求而已,具體如下:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
5. 註意事項
5.1 及時釋放資源
當使用Weighted
來管理資源時,確保在使用完資源後,及時調用Release
方法釋放資源。如果不這樣做,將會導致資源泄漏,最終導致所有的請求都將無法被處理。下麵展示一個簡單的代碼說明:
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
sem := semaphore.NewWeighted(5) // 創建一個最大併發數為5的Weighted實例
// 模擬使用資源的任務
task := func(id int) {
//1. 成功獲取資源
if err := sem.Acquire(context.Background(), 1); err != nil {
fmt.Printf("Task %d failed to acquire resource: %s\n", id, err)
return
}
// 2. 任務處理完成之後,資源沒有被釋放
// defer sem.Release(1) // 使用defer確保在任務完成後釋放資源
}
// 啟動多個任務併發執行
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task(id)
}(i)
}
wg.Wait() // 等待所有任務完成
}
在上面的代碼中,我們使用Weighted
來控制最大併發數為5。我們在任務中沒有調用sem.Release(1)
釋放資源,這些資源將一直被占用,後面啟動的5個任務將永遠無法獲取到資源,此時將永遠不會繼續執行下去。因此,務必在使用完資源後及時調用Release
方法釋放資源,以確保資源的正確回收和釋放,保證系統的穩定性和性能。
而且這裡最好使用defer
語句來實現資源的釋放,避免Release
函數在某些異常場景下無法被執行到。
5.2 合理設置併發數
Weighted
只是提供了一種管理資源的手段,具體的併發數還需要開發人員自行根據系統的實際需求和資源限制,合理設置Weighted
實例的最大併發數。過大的併發數可能導致資源過度競爭,而過小的併發數可能限制了系統的吞吐量。
具體操作可以到線上預發佈環境,不斷調整觀察,獲取到一個最合適的併發數。
5.3 考慮Weighted是否適用於當前場景
Weighted
類型可以用於限制併發訪問資源的數量,但它也存在一些潛在的缺點,需要根據具體的應用場景和需求權衡利弊。
首先是記憶體開銷,Weighted
類型使用一個 sync.Mutex
以及一個 list.List
來管理等待隊列,這可能會占用一定的記憶體開銷。對於大規模的併發處理,特別是在限制極高的情況下,可能會影響系統的記憶體消耗。
其次是Weighted
類型一旦初始化,最大併發數是固定的,無法在運行時動態調整。如果你的應用程式需要根據負載情況動態調整併發限制,可能需要使用其他機制或實現。
而且Weighted
是嚴格按照FIFO請求順序來分配資源的,當某些請求的權重過大時,可能會導致其他請求饑餓,即長時間等待資源。
最後,則是由於 Weighted
類型使用了互斥鎖來保護共用狀態,因此在高併發情況下,爭奪鎖可能成為性能瓶頸,影響系統的吞吐量。
因此,在使用 Weighted
類型時,需要根據具體的應用場景和需求權衡利弊,從而來決定是否使用Weighted
來實現資源的管理控制。
6. 總結
本文介紹了一種解決系統中資源管理問題的解決方案Weighted
。本文從問題引出,詳細介紹了Weighted
的特點和使用方法。通過瞭解Weighted
的設計初衷和實現原理,讀者可以更好地理解其工作原理。
同時,文章提供了使用Weighted
時需要註意的事項,如及時釋放資源、合理設置併發數等,從而幫助讀者避免潛在的問題,以及能夠在比較合適的場景下使用到Weighted
類型實現資源管理。基於此,我們完成了對Weighted
的介紹,希望對你有所幫助。你的點贊和收藏將是我最大的動力,比心~