淄博燒烤爆紅出了圈,當你坐在八大局的燒烤攤,面前是火爐、烤串、小餅和蘸料,音樂響起,啤酒倒滿,燒烤靈魂的party即將開場的時候,你系統中的Scheduler(調試器),也自動根據設定的Trigger(觸發器),從容優雅的啟動了一系列的Job(後臺定時任務)。工作一切早有安排,又何須費心勞神呢?因為 ...
本文講解的是
golang.org/x/sync
這個包中的errgroup
1、errgroup 的基礎介紹
學習過 Go 的朋友都知道 Go 實現併發編程是比較容易的事情,只需要使用go
關鍵字就可以開啟一個 goroutine。那對於併發場景中,如何實現goroutine
的協調控制呢?常見的一種方式是使用sync.WaitGroup
來進行協調控制。
使用過sync.WaitGroup
的朋友知道,sync.WaitGroup
雖然可以實現協調控制,但是不能傳遞錯誤,那該如何解決呢?聰明的你可能馬上想到使用 chan 或者是 context
來傳遞錯誤,確實是可以的。那接下來,我們一起看看官方是怎麼實現上面的需求的呢?
1.1 errgroup的安裝
安裝命令:
go get golang.org/x/sync
//下麵的案例是基於v0.1.0 演示的
go get golang.org/x/[email protected]
1.2 errgroup的基礎例子
這裡我們需要請求3個url來獲取數據,假設請求url2時報錯,url3耗時比較久,需要等一秒。
package main
import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"strings"
"time"
)
func main() {
queryUrls := map[string]string{
"url1": "http://localhost/url1",
"url2": "http://localhost/url2",
"url3": "http://localhost/url3",
}
var eg errgroup.Group
var results []string
for _, url := range queryUrls {
url := url
eg.Go(func() error {
result, err := query(url)
if err != nil {
return err
}
results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
return nil
})
}
// group 的wait方法,等待上面的 eg.Go 的協程執行完成,並且可以接受錯誤
err := eg.Wait()
if err != nil {
fmt.Println("eg.Wait error:", err)
return
}
for k, v := range results {
fmt.Printf("%v ---> %v\n", k, v)
}
}
func query(url string) (ret string, err error) {
// 假設這裡是發送請求,獲取數據
if strings.Contains(url, "url2") {
// 假設請求 url2 時出現錯誤
fmt.Printf("請求 %s 中....\n", url)
return "", errors.New("請求超時")
} else if strings.Contains(url, "url3") {
// 假設 請求 url3 需要1秒
time.Sleep(time.Second*1)
}
fmt.Printf("請求 %s 中....\n", url)
return "success", nil
}
執行結果:
請求 http://localhost/url2 中....
請求 http://localhost/url1 中....
請求 http://localhost/url3 中....
eg.Wait error: 請求超時
果然,當其中一個goroutine
出現錯誤時,會把goroutine
中的錯誤傳遞出來。
我們自己運行一下上面的代碼就會發現這樣一個問題,請求 url2 出錯了,但是依舊在請求 url3 。因為我們需要聚合 url1、url2、url3 的結果,所以當其中一個出現問題時,我們是可以做一個優化的,就是當其中一個出現錯誤時,取消還在執行的任務,直接返回結果,不用等待任務執行結果。
那應該如何做呢?
這裡假設 url1 執行1秒,url2 執行報錯,url3執行3秒。所以當url2報錯後,就不用等url3執行結束就可以返回了。
package main
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"strings"
"time"
)
func main() {
queryUrls := map[string]string{
"url1": "http://localhost/url1",
"url2": "http://localhost/url2",
"url3": "http://localhost/url3",
}
var results []string
ctx, cancel := context.WithCancel(context.Background())
eg, errCtx := errgroup.WithContext(ctx)
for _, url := range queryUrls {
url := url
eg.Go(func() error {
result, err := query(errCtx, url)
if err != nil {
//其實這裡不用手動取消,看完源碼就知道為啥了
cancel()
return err
}
results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
return nil
})
}
err := eg.Wait()
if err != nil {
fmt.Println("eg.Wait error:", err)
return
}
for k, v := range results {
fmt.Printf("%v ---> %v\n", k, v)
}
}
func query(errCtx context.Context, url string) (ret string, err error) {
fmt.Printf("請求 %s 開始....\n", url)
// 假設這裡是發送請求,獲取數據
if strings.Contains(url, "url2") {
// 假設請求 url2 時出現錯誤
time.Sleep(time.Second*2)
return "", errors.New("請求出錯")
} else if strings.Contains(url, "url3") {
// 假設 請求 url3 需要1秒
select {
case <- errCtx.Done():
ret, err = "", errors.New("請求3被取消")
return
case <- time.After(time.Second*3):
fmt.Printf("請求 %s 結束....\n", url)
return "success3", nil
}
} else {
select {
case <- errCtx.Done():
ret, err = "", errors.New("請求1被取消")
return
case <- time.After(time.Second):
fmt.Printf("請求 %s 結束....\n", url)
return "success1", nil
}
}
}
執行結果:
請求 http://localhost/url2 開始....
請求 http://localhost/url3 開始....
請求 http://localhost/url1 開始....
請求 http://localhost/url1 結束....
eg.Wait error: 請求出錯
2、errgroup源碼分析
看了上面的例子,我們對errgroup
有了一定瞭解,接下來,我們一起看看errgroup
做了那些封裝。
2.1 errgroup.Group
errgroup.Group
源碼如下:
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
// context 的 cancel 方法
cancel func()
wg sync.WaitGroup
//傳遞信號的通道,這裡主要是用於控制併發創建 goroutine 的數量
//通過 SetLimit 設置過後,同時創建的goroutine 最大數量為n
sem chan token
// 保證只接受一次錯誤
errOnce sync.Once
// 最先返回的錯誤
err error
}
看結構體中的內容,發現比原生的sync.WaitGroup
多了下麵的內容:
cancel func()
sem chan token
errOnce sync.Once
err error
2.2 WithContext 方法
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
方法邏輯還是比較簡單的,主要做了兩件事:
- 使用
context
的WithCancel()
方法創建一個可取消的Context
- 將
context.WithCancel(ctx)
創建的cancel
賦值給 Group中的cancel
2.3 Go
1.2 最後一個例子說,不用手動去執行 cancel 的原因就在這裡。
g.cancel() //這裡就是為啥不用手動執行 cancel的原因
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
//往 sem 通道中發送空結構體,控制併發創建 goroutine 的數量
g.sem <- token{}
}
g.wg.Add(1)
go func() {
// done()函數的邏輯就是當 f 執行完後,從 sem 取一條數據,並且 g.wg.Done()
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() { // 這裡就是確保 g.err 只被賦值一次
g.err = err
if g.cancel != nil {
g.cancel() //這裡就是為啥不用手動執行 cancel的原因
}
})
}
}()
}
2.4 TryGo
看註釋,知道此函數的邏輯是:當正在執行的goroutine數量小於通過SetLimit()
設置的數量時,可以啟動成功,返回 true,否則啟動失敗,返回false。
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}: // 當g.sem的緩衝區滿了過後,就會執行default,也代表著未啟動成功
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
//----主要看上面的邏輯,下麵的邏輯和Go中的一樣-------
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
return true
}
2.5 Wait
代碼邏輯很簡單,這裡主要註意這裡:
//我看這裡的時候,有點疑惑,為啥這裡會去調用 cancel()方法呢?
//這裡是為了代碼的健壯性,用 context.WithCancel() 創建得到的 cancel,在代碼執行完畢之前取消是一個好習慣
g.cancel()
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait() //通過 g.wg.Wait() 阻塞等待所有的 goroutine 執行完
if g.cancel != nil {
//我看這裡的時候,有點疑惑,為啥這裡會去調用 cancel()方法呢?
//這裡是為了代碼的健壯性,用 context.WithCancel() 創建得到的 cancel,在代碼執行完畢之前取消是一個好習慣
g.cancel()
}
return g.err
}
2.6 SetLimit
看代碼的註釋,我們知道:SetLimit
的邏輯主要是限制同時執行的 goroutines 的數量為n,當n小於0時,沒有限制。如果有運行的 goroutine,調用此方法會報錯。
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
3、errgroup 容易忽視的坑
這個坑是看別人的記錄看到的,對errgroup
不太熟悉時,是不小心確實容易掉進去,所以摘抄了過來,如果侵權,請聯繫刪除,謝謝!
原文鏈接:併發編程包之 errgroup
需求:
開啟多個
Goroutine
去緩存中設置數據,同時開啟一個Goroutine
去非同步寫日誌,很快我的代碼就寫出來了:
package main
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
// 單獨開一個協程去做其他的事情,不參與waitGroup
go WriteChangeLog(ctx)
for i:=0 ; i< 3; i++{
g.Go(func() error {
return errors.New("訪問redis失敗\n")
})
}
if err := g.Wait();err != nil{
fmt.Printf("appear error and err is %s",err.Error())
}
time.Sleep(1 * time.Second)
}
func WriteChangeLog(ctx context.Context) error {
select {
case <- ctx.Done():
return nil
case <- time.After(time.Millisecond * 50):
fmt.Println("write changelog")
}
return nil
}
結果:
appear error and err is 訪問redis失敗
代碼看著沒有問題,但是日誌一直沒有寫入。這是為什麼呢?
其實原因就是因為這個ctx
是errgroup.WithContext
方法返回的一個帶取消的ctx
,我們把這個ctx
當作父context
傳入WriteChangeLog
方法中了,如果errGroup
取消了,也會導致上下文的context
都取消了,所以WriteChangelog
方法就一直執行不到。
這個點是我們在日常開發中想不到的,所以需要註意一下~。
解決方法:
解決方法就是在 go WriteChangeLog(context.Background()) 傳入新的ctx
參考資料:
上面這個案例中講了一個容易忽視的坑,大家可以看看