**註:**本文所有函數名為中文名,並不符合代碼規範,僅供讀者理解參考。 Goroutine Go程不是OS線程,也不是綠色線程(語言運行時管理的線程),而是更高級別的抽象,一種特殊的協程。是一種非搶占式的簡單併發子goroutine(函數、閉包、方法)。不能被中斷,但有多個point可以暫停或重新 ...
註:本文所有函數名為中文名,並不符合代碼規範,僅供讀者理解參考。
Goroutine
Go程不是OS線程,也不是綠色線程(語言運行時管理的線程),而是更高級別的抽象,一種特殊的協程。是一種非搶占式的簡單併發子goroutine(函數、閉包、方法)。不能被中斷,但有多個point可以暫停或重新進入。
goroutine 在它們所創建的相同地址空間內執行,特別是在迴圈創建go程的時候,推薦將變數顯式映射到閉包(引用外部作用域變數的函數)中。
fork-join 併發模型
Fork 在程式中的任意節點,子節支可以與父節點同時運行。join 在將來某個時候這些併發分支會合併在一起,這是保持程式正確性和消除競爭條件的關鍵。Go語言遵循 fork-join併發模型。
使用 go func 其實就是在創建 fork point,為了創建 join point,我們需要解決競爭條件。
sync.WaitGroup
func 競爭條件_解決() {
var wg sync.WaitGroup
var data int
wg.Add(1)
go func() {
defer wg.Done()
data++
}()
wg.Wait()
if data == 0 {
fmt.Println("Value", data)
} else {
fmt.Println("Value 不是 0")
}
}
通過 sync.WaitGroup 我們阻塞 main 直到 go 程退出後再讓 main 繼續執行,實現了 join point。可以理解為併發-安全計數器,經常配合迴圈使用。
這是一個同步訪問共用記憶體的例子。使用前提是你不關心併發操作的結果,或者你有其他方法來收集它們的結果。
wg.Add(1) 是在幫助跟蹤的goroutine之外完成的,如果放在匿名函數內部,會產生競爭條件。因為你不知道go程什麼時候被調度。
sync.Mutex 互斥鎖
type state struct {
lock sync.Mutex
count int
}
func 結構體修改狀態_互斥鎖() {
s := state{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
// s.lock.Lock()
defer wg.Done()
// defer s.lock.Unlock()
s.count++
}()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
// s.lock.Lock()
defer wg.Done()
// defer s.lock.Unlock()
s.count--
}()
}
wg.Wait()
fmt.Println(s.count)
}
沒有互斥鎖的時候,會導致發生競爭現象,取消互斥鎖的註釋,最終結果為理想的0。
進入和退出一個臨界區是有消耗的,所以一般人會儘量減少在臨界區的時間。
sync.RWMutex 讀寫鎖
本質和普通的互斥鎖相同,但是可以保證在未鎖的情況允許多個讀消費者持有一個讀鎖,在讀消費者非常多的情況下可以提高性能。
在多個讀消費者的情況下,通常使用 RWMutex ,讀消費者較少時,Mutex和RWMutex兩者都可用。
Cond 同步多個go程
cond : 一個goroutine的集合點,等待或發佈一個event。
多個go程暫停在某個point上,等待一個事件信號再繼續執行。沒有cond的時候是怎麼做的,當然是for迴圈,但是這有個大問題。
func 無cond() {
isOK := false
go func() {
for isOK == false {
// time.Sleep(time.Microsecond) // bad method
// do something
}
fmt.Println("OK I finished")
}()
go func() {
for isOK == false {
// time.Sleep(time.Microsecond) // bad method
// do something
}
fmt.Println("OK I finished")
}()
time.Sleep(time.Second * 5)
isOK = true
select {}
}
這會消耗一整個CPU核心的所有周期,有些人會引入 time.Sleep 實際上這會讓演算法低效,這時候我們可以使用 cond。
func 有cond() {
var wg sync.WaitGroup
cond := sync.NewCond(&sync.Mutex{})
test := func() {
defer wg.Done()
defer cond.L.Unlock()
cond.L.Lock()
cond.Wait()
fmt.Println("something work...OK finished")
}
wg.Add(2)
go test()
go test()
time.Sleep(time.Second * 5)
cond.Broadcast() // 通知所有go程
// cond.Signal() // 通知等待時間最久的一個go程
wg.Wait()
}
cond運行時內部維護一個FIFO列表。與利用channel相比,cond類型性能要高很多。
Once 只允許一次
可以配合單例模式使用,將判斷對象是否為null改為sync.Once用於創建唯一對象。
sync.Once只計算調用Do方法的次數,而不是多少次唯一調用Do方法。所以在必要情況下聲明多個sync.Once變數而不是用一個。下麵的例子輸出 1
func 只調用一次() {
var once sync.Once
count := 0
once.Do(func() {
count++
})
once.Do(func() {
count--
})
fmt.Println(count)
}
Pool 池子
對象池模式是一種創建和提供可供使用的固定數量實例或Pool實例的方法。通常用於約束創建昂貴的場景,比如資料庫連接,以便只創建固定數量的實例,但不確定數量的操作仍然可以請求訪問這些場景。
使用pool的另一個原因是實例化的對象會被GC自動清理,而pool不會
- 可以通過限制創建的對象數量來節省主機記憶體。
- 提前載入獲取引用到另一個對象所需的時間,比如建立伺服器連接。
你的併發進程需要請求一個對象,但是在實例化之後很快地處理它們,或者在這些對象的構造可能會對記憶體產生負面影響,這時最好使用Pool設計模式。但是必須確保pool中對象是同質的,否則性能大打折扣。
註意事項
- 實例化 sync.Pool ,調用 New 方法創建成員變數是線程安全的。
- 收到來自Get的實例,不要對所接受的對象的狀態做出任何假設。(同質,不需要做if判斷)
- 當你用完了一個從Pool取出的對象時,一定要調用put,否則無法復用這個實例。通常情況下用defer完成。
- Pool內的分佈必須大致均勻
type conn struct{}
func 對象池() {
pool := &sync.Pool{New: func() any {
time.Sleep(time.Millisecond * 250)
fmt.Println("創建連接對象")
return &conn{}
}}
for i := 0; i < 10; i++ {
pool.Put(pool.New())
}
fmt.Println("初始化結束")
c1 := pool.Get()
c2 := pool.Get()
pool.Put(c1)
pool.Put(c2)
}
Channel 通道
channel也可以用來同步記憶體訪問,但最好用於在goroutine之間傳遞消息(channel是將goroutine綁定在一起的粘合劑)。雙向 chan 變數名尾碼加 Stream
帶緩存的channel和不帶緩存的channel聲明是一樣的
var dataStream chan interface{}
雙向channel可以隱式轉換成單向channel,這對函數返回單向通道很有用
var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})
receiveChan = dataStream
sendChan = datraStream
go語言中channel是阻塞的,意味著channel內的數據被消費後,新的數據才可以寫入。通過 <- 操作符的接受形式可以選擇返回兩個值。
salutation,ok := <-dataStream
當channel未關閉時,ok返回true,關閉後返回false。即使channel關閉了,也能讀取到預設值,為了支持一個channel有單個上游寫入,有多個下游讀取。
模擬之前WaitGroup的例子
func 競爭條件_通道() {
var data int
var Stream chan interface{} = make(chan interface{})
go func() {
data++
Stream <- struct{}{}
}()
<-Stream
if data == 0 {
fmt.Println("Value", data)
} else {
fmt.Println("Value 不是 0")
}
}
模擬之前cond同步多個go程的例子
func channel代替cond() {
var wg sync.WaitGroup
Stream := make(chan interface{})
test := func() {
defer wg.Done()
<-Stream
fmt.Println("something work...OK finished")
}
wg.Add(1)
go test()
go test()
time.Sleep(time.Second * 5)
close(Stream)
wg.Wait()
}
在同一時間打開或關閉多個goroutine可以考慮用channel。
channel操作結果
操作 | Channel狀態 | 結果 |
---|---|---|
Read | nil | 阻塞 |
打開且非空 | 輸出值 | |
打開但空 | 阻塞 | |
關閉的 | 預設值,false | |
只寫 | 編譯錯誤 | |
Write | nil | 阻塞 |
打開但填滿 | 阻塞 | |
打開但不滿 | 寫入 | |
關閉的 | panic | |
只讀 | 編譯錯誤 | |
close | nil | panic |
打開且非空 | 關閉Channel;仍然能讀取通道數據,直到讀取完畢返回預設值 | |
打開但空 | 關閉Channel;返回預設值 | |
關閉的 | panic | |
只讀 | 編譯錯誤 |
Channel 使用哲學
在正確的環境中配置Channel,分配channel的所有權。這裡的所有權被定義為 實例化、寫入和關閉channel的goroutine。重要的是弄清楚哪個goroutine擁有channel。
單向channel聲明的是一種工具,允許我們區分所有者和使用者。一旦我們將channel所有者和非channel所有者區分開來,前面的表的結果會非常清晰。可以開始講責任分配給哪些擁有channel的goroutine和不擁有channel的goroutine。
擁有channel的goroutine
- 實例化channel
- 執行寫操作,或將所有權傳遞個另一個goroutine
- 關閉channel
- 執行這三件事,並通過只讀channel把它們暴露出來。
使用channel的goroutine
- 知道channel是何時關閉的 => 檢查第二個返回值
- 正確處理阻塞 =>取決於你的演算法
儘量保持channel的所有權很小,消費者函數只能執行channel的讀取方法,因此只需要知道它應該如何處理阻塞和channel的關閉。
func 通道使用哲學() {
// 所有權範圍足夠小,職責明確
chanOwner := func() <-chan int {
resultStream := make(chan int, 5)
go func() {
defer close(resultStream)
for i := 0; i < 5; i++ {
resultStream <- i
}
}()
return resultStream // 傳遞單向通道給另一個 goroutine
}
resultStream := chanOwner()
for result := range resultStream {
fmt.Println(result)
}
fmt.Println("Done")
}
Select 選擇語句
Go語言運行時將在一組case語句中執行偽隨機選擇。
var c<-chan int // 註意是 nil,永遠阻塞
select{
case <-c:
case <- time.After(1 * time.Second):
fmt.Println("Timed out.")
}
time.After函數通過傳入time.Duration參數返回一個數值並寫入channel。select允許加default語句,通常配合for-select迴圈一起使用,允許go程在等待另一個go程結果的同時,自己乾一些事情。
GOMAXPROCS
通過修改 runtime.GOMAXPROCS 允許你修改OS線程的數量。一般是為了調試,添加OS線程來更頻繁觸發競爭條件。
參考資料
-
《Go語言併發之道》Katherine CoxBuday
-
《Go語言核心編程》李文塔
-
《Go語言高級編程》柴樹彬、曹春輝