GO 語言中 chan 的理解 ### chan 的底層實現是怎麼樣的? > chan 是 Go 語言中的一個關鍵字,用於實現併發通信。chan 可以用於在不同的 goroutine 之間傳遞數據,實現數據的同步和非同步傳輸。 在底層實現上,chan 是通過一個結構體來表示的,這個結構體包含了一個指向 ...
GO 語言中 chan 的理解
chan 的底層實現是怎麼樣的?
chan 是 Go 語言中的一個關鍵字,用於實現併發通信。chan 可以用於在不同的 goroutine 之間傳遞數據,實現數據的同步和非同步傳輸。
在底層實現上,chan 是通過一個結構體來表示的,這個結構體包含了一個指向數據的指針和兩個指向通道的指針。其中,一個指針用於發送數據,另一個指針用於接收數據。
下麵是 chan 的底層實現代碼:
type hchan struct {
qcount uint // 當前隊列中的元素數量
dataqsiz uint // 隊列的容量
buf unsafe.Pointer // 指向隊列的指針
elemsize uint16 // 元素的大小
closed uint32 // 是否關閉
elemtype *_type // 元素的類型
sendx uint // 發送的位置
recvx uint // 接收的位置
recvq waitq // 接收等待隊列
sendq waitq // 發送等待隊列
lock mutex // 鎖
}
chan 的發送和接收操作的底現
當我們向 chan 發送數據時,會先檢查 chan 是否已經關閉。如果 chan 已經關閉,那麼發送操作會直接返回一個 panic。否則,會將數據複製到隊列中,並更新發送位置。
下麵是 chan 發送操作的底層實現代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 檢查 chan 是否已經關閉
if c.closed != 0 {
panic("send on closed channel")
}
// 計算發送位置
i := c.sendx
// 計算隊列中的元素數量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果隊列已滿,需要擴容
grow(c)
}
// 更新發送位置
c.sendx++
// 將數據複製到隊列中
qput(c, i, ep)
return true
}
當我們從 chan 接收數據時,也會先檢查 chan 是否已經關閉。如果 chan 已經關閉並且隊列中沒有數據,那麼接收操作會直接返回一個零值。否則,會從隊列中取出數據,並更新接收位置。
下麵是 chan 接收操作的底層實現代碼:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
return false, false
}
// 計算接收位置
i := c.recvx
// 如果隊列中沒有數據,需要阻塞等待
for c.qcount <= 0 {
if !block {
return false, false
}
gopark(chanparkcommit, unsafe.Pointer(c), "chan receive", traceEvGoBlockRecv, 1)
}
// 從隊列中取出數據
qget(c, i, ep)
// 更新接收位置
c.recvx++
// 更新隊列中的元素數量
c.qcount--
return true, true
}
chan 是如何實現多個 gorouting 併發安全訪問的?
如上 hchan 結構中的 recvq 和 sendq 分別表示接收等待隊列和發送等待隊列,它們的定義如下:
type waitq struct {
first *sudog // 等待隊列的第一個元素
last *sudog // 等待隊列的最後一個元素
}
sudog 表示等待隊列中的一個元素,它的定義如下:
type sudog struct {
// 等待的 goroutine
g *g
// 是否是 select 操作
isSelect bool
// 等待隊列中的下一個元素
next *sudog
// 等待隊列中的上一個元素
prev *sudog
// 等待的元素
elem unsafe.Pointer
// 獲取鎖的時間
acquiretime int64
// 保留欄位
release2 uint32
// 等待的 ticket
ticket uint32
// 父 sudog
parent *sudog
// 等待鏈表
waitlink *sudog
// 等待鏈表的尾部
waittail *sudog
// 關聯的 chan
c *hchan
// 喚醒時間
releasetime int64
}
當 chan 的隊列已滿或為空時,當前 goroutine 會被加入到發送等待隊列或接收等待隊列中,並釋放鎖。當另一個 goroutine 從 chan 中取出數據或向 chan 發送數據時,它會重新獲取鎖,並從等待隊列中取出一個 goroutine,將其喚醒。這樣,多個 goroutine 就可以通過等待隊列來實現併發訪問 chan。
sudog 是 Go 中非常重要的數據結構,因為 g 與同步對象關係是多對多的。
一個 g 可以出現在許多等待隊列上,因此一個 g 可能有很多sudog:在 select 操作中,一個 goroutine 可以等待多個 chan 中的任意一個就緒, sudog 中的 isSelect 欄位被用來標記它是否是 select 操作。當一個 chan 就緒時,它會喚醒對應的 sudog,並將其從等待隊列中移除。如果一個 sudog 是 select 操作,它會在喚醒後返回一個特殊的值,表示哪個 chan 就緒了
多個 g 可能正在等待同一個同步對象,因此一個對象可能有許多 sudog:chan 在不同的 gorouting 中傳遞等待
完整的發送和接受方法實現如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 計算發送位置
i := c.sendx
// 計算隊列中的元素數量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果隊列已滿,需要將當前 goroutine 加入到發送等待隊列中
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false
}
// 創建一個 sudog,表示當前 goroutine 等待發送
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到發送等待隊列中
c.sendq.enqueue(sg)
// 釋放鎖,並將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanSend, traceEvGoBlockSend, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 從發送等待隊列中取出 sudog
sg = c.sendq.dequeue()
if sg == nil {
throw("chan send inconsistency")
}
// 將數據複製到隊列中
qput(c, i, ep)
}
// 更新發送位置
c.sendx++
// 釋放鎖
unlock(&c.lock)
return true
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 計算接收位置
i := c.recvx
// 如果隊列中沒有數據,需要將當前 goroutine 加入到接收等待隊列中
if c.qcount <= 0 {
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false, false
}
// 創建一個 sudog,表示當前 goroutine 等待接收
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到接收等待隊列中
c.recvq.enqueue(sg)
// 釋放鎖,並將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanReceive, traceEvGoBlockRecv, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 從接收等待隊列中取出 sudog
sg = c.recvq.dequeue()
if sg == nil {
throw("chan receive inconsistency")
}
// 從隊列中取出數據
qget(c, i, ep)
} else {
// 從隊列中取出數據
qget(c, i, ep)
}
// 更新接收位置
c.recvx++
// 更新隊列中的元素數量
c.qcount--
// 釋放鎖
unlock(&c.lock)
return true, true
}