多核處理器日益普及的現在很多代碼都得和併發/並行打交道,對於內置了併發支持(goroutine)的golang來說併發編程是必不可少的一環。 鏈表是我們再熟悉不過的數據結構,在併發編程中我們也時長需要用到,今天我們就來看兩種帶鎖的併發安全的單項鏈表。 方案一:粗粒度鎖,完全鎖住鏈表 方案一的做法是將 ...
多核處理器日益普及的現在很多代碼都得和併發/並行打交道,對於內置了併發支持(goroutine)的golang來說併發編程是必不可少的一環。
鏈表是我們再熟悉不過的數據結構,在併發編程中我們也時長需要用到,今天我們就來看兩種帶鎖的併發安全的單項鏈表。
方案一:粗粒度鎖,完全鎖住鏈表
方案一的做法是將所有操作用鎖——Mutex串列化處理。串列化處理是指鎖和鏈表相關聯,當需要修改或讀取鏈表時就獲取鎖,只要該goroutine持有鎖,那麼其他goroutine就無法修改或讀取鏈表,直到鎖的持有者將其釋放。
這樣可以保證任何時間都只有一個goroutine能處理鏈表,如此一來也就避免了數據競爭。下麵是鏈表結構的定義:
type MutexList struct { locker *sync.Mutex head, tail *Node size int64 }
size表示當前list的長度,head是一個哨兵節點,不存儲實際的數值。
下麵是節點的定義:
type Node struct { Value interface{} Next *Node } func NewNode(v interface{}) *Node { n := &Node{Value: v} n.Next = nil return n }
節點和它的初始化不用多說,因為數據訪問通過list來控制,所以節點里不需要再有mutex的存在。
好了我們進入正題,在粗粒度的解決方案里Enq方法負責將數據插入list的末尾,這是O(1)時間的操作,將list鎖住然後更新tail即可,註意我們不允許插入nil:
func (l *MutexList) Enq(v interface{}) bool { if v == nil { return false } l.locker.Lock() node := NewNode(v) l.tail.Next = node l.tail = node l.size++ l.locker.Unlock() return true }
然後是insert,它將數據插入在給出的index處,index從0開始,同樣我們不允許插入nil,同時會檢查index,index不能超過size,當list只有size-1個節點時,新數據會插入在list的末尾:
func (l *MutexList) Insert(index int64, v interface{}) bool { l.locker.Lock() defer l.locker.Unlock() if index > l.size || index < 0 || v == nil { return false } current := l.head for i := int64(0); i <= index-1; i++ { // index - 1是最後一個節點時 if current.Next == nil { break } current = current.Next } node := NewNode(v) node.Next = current.Next current.Next = node l.size++ return true }
這裡我們使用了defer,那是因為只有一個mutex,而且函數有多個出口,容易在編碼過程中漏掉對鎖的釋放。
節點的刪除和查找也是類似的步驟,先給列表上鎖,然後修改/讀取,最後解鎖,這裡就不多講解了。
然後是獲取size的函數,後面的測試中要用,雖然我們以原子操作來獲取了長度,但是仍然可能存在獲得size之後其他goroutine進行了remove導致size改變進而引發Insert返回false,所幸的是我們的測試里並不會讓remove和Insert同時出現,因此不會出現insert返回失敗的問題,在實際使用時需要註意Insert的返回值,這一點在第二種方案中也是一樣的:
func (l *MutexList) Length() int64 { return atomic.LoadInt64(&l.size) }
因為方案二的該函數並無什麼變化,因此就省略了。
如你所見,方案一的優點在於實現起來簡單,確點在於一次只有一個goroutine能處理list,幾乎所有對list的操作都被串列化了。
方案一無疑能很好地工作,但是它的性能十分有限,所以我們來看看方案二。
方案二:細粒度鎖,鎖住需要修改的節點
方案二的做法是將鎖放到node里,每次需要修改的僅僅是部分節點,而不用把整個list鎖住,這樣能保證互不幹擾的goroutine們可以同時處理list,而會互相干擾的goroutine則會被節點的mutex阻塞,以保證不存在竟態數據。
當然,為了保證不會有多個goroutine同時處理一個節點,我們需要在取得要修改節點的鎖之前先取得前項節點的鎖,然後才能取得修改節點的鎖。這個步驟很像交叉手,它被稱為鎖耦合。
另外一個需要註意的地方是加鎖的順序,所有操作的加鎖順序/方向必須相同,比如從head開始鎖定到tail,如果不按統一的順序加鎖將會出現死鎖。考慮如下情況,goroutine A鎖住了節點1,正準備鎖定節點2,這時goroutine B沿反方向加鎖,它要鎖住節點2然後再鎖住節點1,如果B運氣很好先於A取得了節點2的鎖,那麼它將一直等待鎖住節點1,而A則會始終等待鎖住節點2,出現了A等B,B等A的死鎖。但是只要統一了加鎖的順序/方向,那麼這種問題就不復存在了。
這是list和node的定義,可以看見鎖已經移動到node結構里了:
type List struct { head, tail *MutexNode size int64 } type MutexNode struct { Locker *sync.Mutex Value interface{} Next *MutexNode } func NewMutexNode(v interface{}) *MutexNode { n := &MutexNode{Value: v} n.Locker = &sync.Mutex{} n.Next = nil return n } func NewList() *List { l := &List{} l.head = NewMutexNode(nil) l.tail = l.head return l }list和node的定義
下麵我們來看Enq,功能與方案一一致,只是在處理鎖的地方有所不同,因為tail節點總是在list末尾的元素,符合我們從head開始的加鎖順序,又因為l.tail的位置始終是確定的,所以可以省略鎖住前項節點的步驟;然而l.tail會在我們等待鎖的時間段里被更新,所以我們需要處理l.tail被更新的情況:
func (l *List) Enq(v interface{}) bool { if v == nil { return false } tail := l.tail tail.Locker.Lock() for tail.Next != nil { next := tail.Next next.Locker.Lock() tail.Locker.Unlock() tail = next } node := NewMutexNode(v) tail.Next = node l.tail = node l.size++ tail.Locker.Unlock() return true }
如果tail的next在取得鎖時不為nil,說明tail被更新,在tail被更新之後我們需要找到當前的末尾節點,這時不能直接使用l.tail,有兩點原因,一是因為這時的l.tail可能也已經被更新,二是在臨時變數tail可能是非前驅節點時給l.tail加鎖不能保證其一致性,而且如此一來會破壞加鎖的順序,會造成意想不到的問題。所以我們遵循加鎖的順序原則不斷後推,直到找到真正的末尾節點。由此可見方案二的Enq操作最壞情況下是O(n)最好情況下是O(1),而只要仔細想一想,在併發壓力較大時這個操作幾乎總是O(n)的時間開銷(不過實際情況是方案二花費的時間與方案一差不多,原因在於方案一要鎖住整個list開銷實在太大了)
Insert的功能也與方案一一樣,因為不是鎖住整個list,所以光判斷size是無意義的,需要處理list被中途修改的情況;而且因為是從head開始加鎖,然後鎖住節點1再解鎖head,以此類推,所以不會有競爭,但同樣存在remove和insert一起使用時insert會失敗的情況,需要註意其返回值:
func (l *List) Insert(index int64, v interface{}) bool { if index < 0 || v == nil { return false } current := l.head current.Locker.Lock() for i := int64(0); i <= index-1; i++ { next := current.Next if next == nil { // 如果index前的某個節點為nil,那麼說明鏈表可能被修改了,沒有index個節點,insert失敗 if index < index - 1 { current.Locker.Unlock() return false } break } next.Locker.Lock() current.Locker.Unlock() current = next } node := NewMutexNode(v) node.Next = current.Next current.Next = node l.size++ current.Locker.Unlock() return true }
remove的做法和insert類似,不再贅述。
我們可以看到方案二鎖的粒度確實變小了,但是實現變得十分複雜,而且需要同時考慮多個邊界情況,對開發增加了很大的難度,而且分散的鎖也會對調試帶來一定的負面影響。
方案二的另一個缺點是每個節點都帶有自己的mutex,當節點增多時記憶體的開銷也會增大。
性能對比
說了這麼多,方案一粗粒度鎖和方案二細粒度鎖在性能上孰優孰劣呢?畢竟方案二需要多次獲取和釋放鎖而且需要額外處理很多邊界情況,仔細想一下的話可能也是一筆不菲的開銷,感謝golang自帶的測試套件,我們可以方便的測試。
測試我們採用一組單goroutine+一組多goroutine測試一個功能的做法,測試機器是intel i5 6500 4核,為了模擬一般的工作負載,在多goroutine組我統一使用6個goroutine來併發操作list。
測試代碼:
import ( "math/rand" "sync" "testing" "time" ) func init() { rand.Seed(time.Now().Unix()) } func BenchmarkEnq(b *testing.B) { list := NewMutexList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq failed") } } } func BenchmarkGoroutineEnq(b *testing.B) { list := NewMutexList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("Insert failed") } } } func BenchmarkGoroutineInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }方案一
import ( "math/rand" "sync" "testing" ) func BenchmarkMutexNodeEnq(b *testing.B) { list := NewList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq failed") } } } func BenchmarkGoroutineMutexNodeEnq(b *testing.B) { list := NewList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert failed") } } } func BenchmarkGoroutineMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }方案二
測試內容是將隨機數插入兩種不同的鏈表,然後對比插入性能。
這是測試結果,因為testing不能跟蹤goroutine內部的操作,所以多goroutine組的單個op看上去比較嚇人,其實這是一個goroutine運行完所有插入調用的時間:
go test -bench=. -benchmem
可以看到,在Enq也就是在末尾插入上兩者相差不多,方案二在所有多goroutine測試用例的表現都優於方案一;
對於有大量隨機訪問發生的Inser操作,方案二在性能上可以說是碾壓的存在,這可能是方案二可以運行多個goroutine同時修改list而方案一隻能同時有一個goroutine修改的原因;
而在併發的情況下方案二仍然比方案一快不少,但是差距縮小了,原因很可能是頻繁的加鎖加上goroutine之間互相干擾增多導致了性能的部分下降。
總結
如果追求性能,可以考慮方案二,或者使用第三方的無鎖隊列,不建議自己去實現無鎖數據結構,因為 太 復 雜 !如果你覺得方案二已經想不明白了,那麼無鎖編程將會是天書一般的存在,不如復用大神們的勞動成果吧。如果鏈表並不是你程式的性能熱點,那麼就可以考慮方案一,穩定且易於開發和維護的代碼永遠都是好東西。
最後如果有疑問和建議或者勘誤,歡迎評論指出,祝玩得愉快!