深度解密Go語言之channel

来源:https://www.cnblogs.com/qcrao-2018/archive/2019/07/21/11220651.html
-Advertisement-
Play Games

Goroutine 和 Channel 是 Go 語言併發編程的兩大基石,本文深度解密 channel 的底層原理。 ...


目錄

大家好!“深度解密 Go 語言”系列好久未見,我們今天講 channel,預祝閱讀愉快!在開始正文之前,我們先說些題外話。

上一篇關於 Go 語言的文章講 Go 程式的整個編碼、編譯、運行、退出的全過程。文章發出後,反響強烈,在各大平臺的閱讀量都不錯。例如博客園登上了 48 小時閱讀排行榜,並且受到了編輯推薦,占據首頁頭條位置整整一天;在開發者頭條首頁精選的位置霸榜一周時間……

博客園頭條推薦

開發者頭條精選

熟悉碼農桃花源的朋友們都知道,這裡每篇文章都很長,要花很長時間才能讀完。但長並不是目的,把每個問題都講深、講透才是最重要的。首先我自己得完全理解才行,所以寫每篇文章時我都會看很多參考資料,看源碼,請教大牛,自己還要去寫樣例代碼跑結果……從創建文稿到真正完成寫作需要很長時間。

寫作時間

做這些事情,無非是想力求我寫出來的文字,都是我目前所能理解的最深層次。如果我暫時理解不了,我會說出來,或者不寫進文章裡面去,留到以後有能力的時候再來寫。

我自己平時有這種體會:看微信公眾號的文章都是想快速地看完,快速地拉到最後,目的快點開始看下一篇,新鮮感才能不斷刺激大腦。有時候碰到長文很花時間,可能就沒耐心看下去了,裡面說的東西也覺得很難理解,可能直接就放棄了。但是,如果我知道一篇文章價值很高,就會選一個精力比較充沛的時間段,花整塊時間看完,這時候反倒很容易看進去。這種情況下,潛意識裡就會知道我今天是一定要讀完這篇文章的,並且要把裡面有價值的東西都吸收進來。

所以,對於碼農桃花源的文章,我建議你收藏之後,找個空閑時間再好好看。

上周,我把 GitHub 項目 Go-Question 的內容整合成了開源電子書,閱讀體驗提升 N 倍,建議關註項目,現在已經 400 star 了,年底目標是 1k star。項目地址列在了參考資料里。

GitBook

另外,公眾號的文章也可以使用微信讀書看,體驗也非常贊,並且可以放到書架上,每個公眾號就是一本書,簡直酷炫。

微信讀書

閑話最後,一直“吹”了很久的曹大,新書《Go 語言高級編程》出版了!書的另一位作者是柴樹杉老師,這是給 Go 語言提交 pull 的人,他在 Go 語言上面的研究不用我多說了吧。我第一時間下了單,並且到曹大工位要了簽名。

Go 語言高級編程

這本書的推薦人有很多大佬,像許世偉,郝林,雨痕等,評價非常高。重點給大家看下雨痕老師對這本書的評價(上圖第二排左側圖):

本書闡明瞭官方文檔某些語焉不詳的部分,有助於 Gopher 瞭解更多內在實現,以及日常工作中需要用到的 RPC、Web、分散式應用等內容。我認識本書作者之一曹春暉,對他的學習態度和能力頗為欽佩,因此推薦大家閱讀本書。

大家可能不知道,出書一點都不賺錢,但投入的精力卻很大。但是像曹大在給讀者的書簽名時所說的:書籍是時代的生命。多少知識都是通過書本一代代傳承!

搬過幾次家就知道,紙質書太多,過程會比較痛苦。所以,我現在買紙書都會考慮再三。但是,這次我還是在第一時間下單了《Go 語言高級編程》。我也強烈推薦你買一本,支持原創者。

柴老師在武漢,我接觸不多。但和曹大卻是經常能見面(在同一個公司工作)。他本人經常活躍在各種微信群,社區,也非常樂於解答各種疑難雜症。上周還和曹大一起吃了個飯,請教了很多問題,我總結了一些對家都有用的東西,放在我的朋友圈:

曹大交流總結

如果你想圍觀我的朋友圈,想和我交流,可以長按下麵的二維碼加我好友,備註下來自公眾號。

wechat-QR

好了,下麵開始我們的正文。

併發模型

併發與並行

大家都知道著名的摩爾定律。1965 年,時任仙童公司的 Gordon Moore 發表文章,預測在未來十年,半導體晶元上的晶體管和電阻數量將每年增加一倍;1975 年,Moore 再次發表論文,將“每年”修改為“每兩年”。這個預測在 2012 年左右基本是正確的。

但隨著晶體管電路逐漸接近性能極限,摩爾定律終將走到盡頭。靠增加晶體管數量來提高電腦的性能不靈了。於是,人們開始轉換思路,用其他方法來提升電腦的性能,這就是多核電腦產生的原因。

這一招看起來還不錯,但是人們又遇到了一個另一個定律的限制,那就是 Amdahl's Law,它提出了一個模型用來衡量在並行模式下程式運行效率的提升。這個定律是說,一個程式能從並行上獲得性能提升的上限取決於有多少代碼必須寫成串列的。

舉個例子,對於一個和用戶打交道的界面程式,它必須和用戶打交道。用戶點一個按鈕,然後才能繼續運行下一步,這必須是串列執行的。這種程式的運行效率就取決於和用戶交互的速度,你有多少核都白瞎。用戶就是不按下一步,你怎麼辦?

2000 年左右雲計算興起,人們可以方便地獲取計算雲上的資源,方便地水平擴展自己的服務,可以輕而易舉地就調動多台機器資源甚至將計算任務分發到分佈在全球範圍的機器。但是也因此帶來了很多問題和挑戰。例如怎樣在機器間進行通信、聚合結果等。最難的一個挑戰是如何找到一個模型能用來描述 concurrent。

我們都知道,要想一段併發的代碼沒有任何 bug,是非常困難的。有些併發 bug 是在系統上線數年後才發現的,原因常常是很詭異的,比如用戶數增加到了某個界限。

併發問題一般有下麵這幾種:

數據競爭。簡單來說就是兩個或多個線程同時讀寫某個變數,造成了預料之外的結果。

原子性。在一個定義好的上下文里,原子性操作不可分割。上下文的定義非常重要。有些代碼,你在程式里看起來是原子的,如最簡單的 i++,但在機器層面看來,這條語句通常需要幾條指令來完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可以讓我們放心地構造併發安全的程式。

記憶體訪問同步。代碼中需要控制同時只有一個線程訪問的區域稱為臨界區。Go 語言中一般使用 sync 包里的 Mutex 來完成同步訪問控制。鎖一般會帶來比較大的性能開銷,因此一般要考慮加鎖的區域是否會頻繁進入、鎖的粒度如何控制等問題。

死鎖。在一個死鎖的程式里,每個線程都在等待其他線程,形成了一個首尾相連的尷尬局面,程式無法繼續運行下去。

活鎖。想象一下,你走在一條小路上,一個人迎面走來。你往左邊走,想避開他;他做了相反的事情,他往右邊走,結果兩個都過不了。之後,兩個人又都想從原來自己相反的方向走,還是同樣的結果。這就是活鎖,看起來都像在工作,但工作進度就是無法前進。

饑餓。併發的線程不能獲取它所需要的資源以進行下一步的工作。通常是有一個非常貪婪的線程,長時間占據資源不釋放,導致其他線程無法獲得資源。

關於併發和並行的區別,引用一個經典的描述:

併發是同一時間應對(dealing with)多件事情的能力。
並行是同一時間動手(doing)做多件事情的能力。

雨痕老師《Go 語言學習筆記》上的解釋:

併發是指邏輯上具備同時處理多個任務的能力;並行則是物理上同時執行多個任務。

而根據《Concurrency in Go》這本書,電腦的概念都是抽象的結果,併發和並行也不例外。它這樣描述併發和並行的區別:

Concurrency is a property of the code; parallelism is a property of the running program.

併發是代碼的特性,並行是正在運行的程式的特性。先忽略我拙劣的翻譯。很新奇,不是嗎?我也是第一次見到這樣的說法,細想一下,還是很有道理的。

我們一直說寫的代碼是併發的或者是並行的,但是我們能提供什麼保證嗎?如果在只有一個核的機器上跑並行的代碼,它還能並行嗎?你就是再天才,也無法寫出並行的程式。充其量也就是代碼上看起來“併發”的,如此而已。

當然,錶面上看起來還是並行的,但那不過 CPU 的障眼法,多個線程在分時共用 CPU 的資源,在一個粗糙的時間隔里看起來就是“並行”。

所以,我們實際上只能編寫“併發”的代碼,而不能編寫“並行”的代碼,而且只是希望併發的代碼能夠並行地執行。併發的代碼能否並行,取決於抽象的層級:代碼里的併發原語、runtime,操作系統(虛擬機、容器)。層級越來越底層,要求也越來越高。因此,我們談併發或並行實際上要指定上下文,也就是抽象的層級。

《Concurrency in Go》書里舉了一個例子:假如兩個人同時打開電腦上的計算器程式,這兩個程式肯定不會影響彼此,這就是並行。在這個例子中,上下文就是兩個人的機器,而兩個計算器進程就是並行的元素。

隨著抽象層次的降低,併發模型實際上變得更難也更重要,而越低層次的併發模型對我們也越重要。要想併發程式正確地執行,就要深入研究併發模型。

在 Go 語言發佈前,我們寫併發代碼時,考慮到的最底層抽象是:系統線程。Go 發佈之後,在這條抽象鏈上,又加一個 goroutine。而且 Go 從著名的電腦科學家 Tony Hoare 那借來一個概念:channel。Tony Hoare 就是那篇著名文章《Communicating Sequential Processes》的作者。

看起來事情變得更加複雜,因為 Go 又引入了一個更底層的抽象,但事實並不是這樣。因為 goroutine 並不是看起來的那樣又抽象了一層,它其實是替代了系統線程。Gopher 在寫代碼的時候,並不會去關心系統線程,大部分時候只需要考慮到 goroutine 和 channel。當然有時候會用到一些共用記憶體的概念,一般就是指 sync 包里的東西,比如 sync.Mutex。

什麼是 CSP

CSP 經常被認為是 Go 在併發編程上成功的關鍵因素。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發表在 ACM 的一篇論文。論文里指出一門編程語言應該重視 input 和 output 的原語,尤其是併發編程的代碼。

在那篇文章發表的時代,人們正在研究模塊化編程的思想,該不該用 goto 語句在當時是最激烈的議題。彼時,面向對象編程的思想正在崛起,幾乎沒什麼人關心併發編程。

在文章中,CSP 也是一門自定義的編程語言,作者定義了輸入輸出語句,用於 processes 間的通信(communicatiton)。processes 被認為是需要輸入驅動,並且產生輸出,供其他 processes 消費,processes 可以是進程、線程、甚至是代碼塊。輸入命令是:!,用來向 processes 寫入;輸出是:?,用來從 processes 讀出。這篇文章要講的 channel 正是借鑒了這一設計。

Hoare 還提出了一個 -> 命令,如果 -> 左邊的語句返回 false,那它右邊的語句就不會執行。

通過這些輸入輸出命令,Hoare 證明瞭如果一門編程語言中把 processes 間的通信看得第一等重要,那麼併發編程的問題就會變得簡單。

Go 是第一個將 CSP 的這些思想引入,並且發揚光大的語言。僅管記憶體同步訪問控制(原文是 memory access synchronization)在某些情況下大有用處,Go 里也有相應的 sync 包支持,但是這在大型程式很容易出錯。

Go 一開始就把 CSP 的思想融入到語言的核心裡,所以併發編程成為 Go 的一個獨特的優勢,而且很容易理解。

大多數的編程語言的併發編程模型是基於線程和記憶體同步訪問控制,Go 的併發編程的模型則用 goroutine 和 channel 來替代。Goroutine 和線程類似,channel 和 mutex (用於記憶體同步訪問控制)類似。

Goroutine 解放了程式員,讓我們更能貼近業務去思考問題。而不用考慮各種像線程庫、線程開銷、線程調度等等這些繁瑣的底層問題,goroutine 天生替你解決好了。

Channel 則天生就可以和其他 channel 組合。我們可以把收集各種子系統結果的 channel 輸入到同一個 channel。Channel 還可以和 select, cancel, timeout 結合起來。而 mutex 就沒有這些功能。

Go 的併發原則非常優秀,目標就是簡單:儘量使用 channel;把 goroutine 當作免費的資源,隨便用。

說明一下,前面這兩部分的內容來自英文開源書《Concurrency In Go》,強烈推薦閱讀。

引入結束,我們正式開始今天的主角:channel。

什麼是 channel

Goroutine 和 channel 是 Go 語言併發編程的 兩大基石。Goroutine 用於執行併發任務,channel 用於 goroutine 之間的同步、通信。

Channel 在 gouroutine 間架起了一條管道,在管道里傳輸數據,實現 gouroutine 間的通信;由於它是線程安全的,所以用起來非常方便;channel 還提供“先進先出”的特性;它還能影響 goroutine 的阻塞和喚醒。

相信大家一定見過一句話:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共用記憶體來通信,而要通過通信來實現記憶體共用。

這就是 Go 的併發哲學,它依賴 CSP 模型,基於 channel 實現。

簡直是一頭霧水,這兩句話難道不是同一個意思?

通過前面兩節的內容,我個人這樣理解這句話:前面半句說的是通過 sync 包里的一些組件進行併發編程;而後面半句則是說 Go 推薦使用 channel 進行併發編程。兩者其實都是必要且有效的。實際上看完本文後面對 channel 的源碼分析,你會發現,channel 的底層就是通過 mutex 來控制併發的。只是 channel 是更高一層次的併發編程原語,封裝了更多的功能。

關於是選擇 sync 包里的底層併發編程原語還是 channel,《Concurrency In Go》這本書的第 2 章 “Go's Philosophy on Concurrency” 里有一張決策樹和詳細的論述,再次推薦你去閱讀。我把圖貼出來:

concurrency code decision tree

channel 實現 CSP

Channel 是 Go 語言中一個非常重要的類型,是 Go 里的第一對象。通過 channel,Go 實現了通過通信來實現記憶體共用。Channel 是在多個 goroutine 之間傳遞數據和同步的重要手段。

使用原子函數、讀寫鎖可以保證資源的共用訪問安全,但使用 channel 更優雅。

channel 字面意義是“通道”,類似於 Linux 中的管道。聲明 channel 的語法如下:

chan T // 聲明一個雙向通道
chan<- T // 聲明一個只能用於發送的通道
<-chan T // 聲明一個只能用於接收的通道

單向通道的聲明,用 <- 來表示,它指明通道的方向。你只要明白,代碼的書寫順序是從左到右就馬上能掌握通道的方向是怎樣的。

因為 channel 是一個引用類型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函數進行初始化。可以向它傳遞一個 int 值,代表 channel 緩衝區的大小(容量),構造出來的是一個緩衝型的 channel;不傳或傳 0 的,構造的就是一個非緩衝型的 channel。

兩者有一些差別:非緩衝型 channel 無法緩衝元素,對它的操作一定順序是“發送-> 接收 -> 發送 -> 接收 -> ……”,如果連續向一個非緩衝 chan 發送 2 個元素,並且沒有接收的話,第二次一定會被阻塞;對於緩衝型 channel 的操作,則要“寬鬆”一些,畢竟是帶了“緩衝”光環。

為什麼要 channel

Go 通過 channel 實現 CSP 通信模型,主要用於 goroutine 之間的消息傳遞和事件通知。

有了 channel 和 goroutine 之後,Go 的併發編程變得異常容易和安全,得以讓程式員把註意力留到業務上去,實現開發效率的提升。

要知道,技術並不是最重要的,它只是實現業務的工具。一門高效的開發語言讓你把節省下來的時間,留著去做更有意義的事情,比如寫寫文章。

channel 實現原理

對 chan 的發送和接收操作都會在編譯期間轉換成為底層的發送接收函數。

Channel 分為兩種:帶緩衝、不帶緩衝。對不帶緩衝的 channel 進行的操作實際上可以看作“同步模式”,帶緩衝的則稱為“非同步模式”。

同步模式下,發送方和接收方要同步就緒,只有在兩者都 ready 的情況下,數據才能在兩者間傳輸(後面會看到,實際上就是記憶體拷貝)。否則,任意一方先行進行發送或接收操作,都會被掛起,等待另一方的出現才能被喚醒。

非同步模式下,在緩衝槽可用的情況下(有剩餘容量),發送和接收操作都可以順利進行。否則,操作的一方(如寫入)同樣會被掛起,直到出現相反操作(如接收)才會被喚醒。

小結一下:同步模式下,必須要使發送方和接收方配對,操作才會成功,否則會被阻塞;非同步模式下,緩衝槽要有剩餘容量,操作才會成功,否則也會被阻塞。

數據結構

直接上源碼(版本是 1.9.2):

type hchan struct {
    // chan 里元素數量
    qcount   uint
    // chan 底層迴圈數組的長度
    dataqsiz uint
    // 指向底層迴圈數組的指針
    // 只針對有緩衝的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被關閉的標誌
    closed   uint32
    // chan 中元素類型
    elemtype *_type // element type
    // 已發送元素在迴圈數組中的索引
    sendx    uint   // send index
    // 已接收元素在迴圈數組中的索引
    recvx    uint   // receive index
    // 等待接收的 goroutine 隊列
    recvq    waitq  // list of recv waiters
    // 等待發送的 goroutine 隊列
    sendq    waitq  // list of send waiters

    // 保護 hchan 中所有欄位
    lock mutex
}

關於欄位的含義都寫在註釋里了,再來重點說幾個欄位:

buf 指向底層迴圈數組,只有緩衝型的 channel 才有。

sendxrecvx 均指向底層迴圈數組,表示當前可以發送和接收的元素位置索引值(相對於底層數組)。

sendqrecvq 分別表示被阻塞的 goroutine,這些 goroutine 由於嘗試讀取 channel 或向 channel 發送數據而被阻塞。

waitqsudog 的一個雙向鏈表,而 sudog 實際上是對 goroutine 的一個封裝:

type waitq struct {
    first *sudog
    last  *sudog
}

lock 用來保證每個讀 channel 或寫 channel 的操作都是原子的。

例如,創建一個容量為 6 的,元素為 int 型的 channel 數據結構如下 :

chan data structure

創建

我們知道,通道有兩個方向,發送和接收。理論上來說,我們可以創建一個只發送或只接收的通道,但是這種通道創建出來後,怎麼使用呢?一個只能發的通道,怎麼接收呢?同樣,一個只能收的通道,如何向其發送數據呢?

一般而言,使用 make 創建一個能收能發的通道:

// 無緩衝通道
ch1 := make(chan int)
// 有緩衝通道
ch2 := make(chan int, 10)

通過彙編分析,我們知道,最終創建 chan 的函數是 makechan

func makechan(t *chantype, size int64) *hchan

從函數原型來看,創建的 chan 是一個指針。所以我們能在函數間直接傳遞 channel,而不用傳遞 channel 的指針。

具體來看下代碼:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 省略了檢查 channel size,align 的代碼
    // ……

    var c *hchan
    // 如果元素類型不含指針 或者 size 大小為 0(無緩衝類型)
    // 只進行一次記憶體分配
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // 如果 hchan 結構體中不含指針,GC 就不會掃描 chan 中的元素
        // 只分配 "hchan 結構體大小 + 元素大小*個數" 的記憶體
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 如果是緩衝型 channel 且元素大小不等於 0(大小等於 0的元素類型:struct{})
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            // 1. 非緩衝型的,buf 沒用,直接指向 chan 起始地址處
            // 2. 緩衝型的,能進入到這裡,說明元素無指針且元素類型為 struct{},也無影響
            // 因為只會用到接收和發送游標,不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內容)
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 進行兩次記憶體分配操作
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // 迴圈數組長度
    c.dataqsiz = uint(size)

    // 返回 hchan 指針
    return c
}

新建一個 chan 後,記憶體在堆上分配,大概長這樣:

make chan

說明一下,這張圖來源於 Gopher Con 上的一份 PPT,地址見參考資料。這份材料非常清晰易懂,推薦你去讀。

接下來,我們用一個來自參考資料【深入 channel 底層】的例子來理解創建、發送、接收的整個過程。

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先創建了一個無緩衝的 channel,接著啟動兩個 goroutine,並將前面創建的 channel 傳遞進去。然後,向這個 channel 中發送數據 3,最後 sleep 1 秒後程式退出。

程式第 14 行創建了一個非緩衝型的 channel,我們只看 chan 結構體中的一些重要欄位,來從整體層面看一下 chan 的狀態,一開始什麼都沒有:

unbuffered chan

接收

在繼續分析前面小節的例子前,我們先來看一下接收相關的源碼。在清楚了接收的具體過程之後,也就能輕鬆理解具體的例子了。

接收操作有兩種寫法,一種帶 "ok",反應 channel 是否關閉;一種不帶 "ok",這種寫法,當接收到相應類型的零值時無法知道是真實的發送者發送過來的值,還是 channel 被關閉後,返回給接收者的預設類型的零值。兩種寫法,都有各自的應用場景。

經過編譯器的處理後,這兩種寫法最後對應源碼里的這兩個函數:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv1 函數處理不帶 "ok" 的情形,chanrecv2 則通過返回 "received" 這個欄位來反應 channel 是否被關閉。接收值則比較特殊,會“放到”參數 elem 所指向的地址了,這很像 C/C++ 里的寫法。如果代碼里忽略了接收值,這裡的 elem 為 nil。

無論如何,最終轉向了 chanrecv 函數:

// 位於 src/runtime/chan.go

// chanrecv 函數接收 channel c 的元素並將其寫入 ep 所指向的記憶體地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數據可接收的情況下,返回 (false, false)
// 否則,如果 c 處於關閉狀態,將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的記憶體地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函數調用者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內容 …………

    // 如果是一個 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否則,接收一個 nil 的 channel,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會執行到這裡
        throw("unreachable")
    }

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
    // 當我們觀察到 channel 沒準備好接收:
    // 1. 非緩衝型,等待發送列隊 sendq 里沒有 goroutine 在等待
    // 2. 緩衝型,但 buf 里沒有元素
    // 之後,又觀察到 closed == 0,即 channel 未關閉。
    // 因為 channel 不可能被重覆打開,所以前一個觀測的時候 channel 也是未關閉的,
    // 因此在這種情況下可以直接宣佈接收失敗,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關閉,並且迴圈數組 buf 里沒有元素
    // 這裡可以處理非緩衝型關閉 和 緩衝型關閉但 buf 無元素的情況
    // 也就是說即使是關閉狀態,但在緩衝型的 channel,
    // buf 里有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個已關閉的 channel 執行接收操作,且未忽略返回值
            // 那麼接收的值將是一個該類型的零值
            // typedmemclr 根據類型清理相應地址的記憶體
            typedmemclr(c.elemtype, ep)
        }
        // 從一個已關閉的 channel 接收,selected 會返回true
        return true, false
    }

    // 等待發送隊列里有 goroutine 存在,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩衝型的 channel
    // 2. 緩衝型的 channel,但 buf 滿了
    // 針對 1,直接進行記憶體拷貝(從 sender goroutine -> receiver goroutine)
    // 針對 2,接收到迴圈數組頭部的元素,並將發送者的元素放到迴圈數組尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩衝型,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // 直接從迴圈數組裡找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉迴圈數組裡相應位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游標向前移動
        c.recvx++
        // 接收游標歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數組裡的元素個數減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構造一個 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收數據的地址保存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進入channel 的等待接收隊列
    c.recvq.enqueue(mysg)
    // 將當前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了,接著從這裡繼續執行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

上面的代碼註釋地比較詳細了,你可以對著源碼一行行地去看,我們再來詳細看一下。

  • 如果 channel 是一個空值(nil),在非阻塞模式下,會直接返回。在阻塞模式下,會調用 gopark 函數掛起 goroutine,這個會一直阻塞下去。因為在 channel 是 nil 的情況下,要想不阻塞,只有關閉它,但關閉一個 nil 的 channel 又會發生 panic,所以沒有機會被喚醒了。更詳細地可以在 closechan 函數的時候再看。

  • 和發送函數一樣,接下來搞了一個在非阻塞模式下,不用獲取鎖,快速檢測到失敗並且返回的操作。順帶插一句,我們平時在寫代碼的時候,找到一些邊界條件,快速返回,能讓代碼邏輯更清晰,因為接下來的正常情況就比較少,更聚焦了,看代碼的人也更能專註地看核心代碼邏輯了。

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

當我們觀察到 channel 沒準備好接收:

  1. 非緩衝型,等待發送列隊里沒有 goroutine 在等待
  2. 緩衝型,但 buf 里沒有元素

之後,又觀察到 closed == 0,即 channel 未關閉。

因為 channel 不可能被重覆打開,所以前一個觀測的時候, channel 也是未關閉的,因此在這種情況下可以直接宣佈接收失敗,快速返回。因為沒被選中,也沒接收到數據,所以返回值為 (false, false)。

  • 接下來的操作,首先會上一把鎖,粒度比較大。如果 channel 已關閉,並且迴圈數組 buf 里沒有元素。對應非緩衝型關閉和緩衝型關閉但 buf 無元素的情況,返回對應類型的零值,但 received 標識是 false,告訴調用者此 channel 已關閉,你取出來的值並不是正常由發送者發送過來的數據。但是如果處於 select 語境下,這種情況是被選中了的。很多將 channel 用作通知信號的場景就是命中了這裡。

  • 接下來,如果有等待發送的隊列,說明 channel 已經滿了,要麼是非緩衝型的 channel,要麼是緩衝型的 channel,但 buf 滿了。這兩種情況下都可以正常接收數據。

於是,調用 recv 函數:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果是非緩衝型的 channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // 未忽略接收的數據
        if ep != nil {
            // 直接拷貝數據,從 sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 緩衝型的 channel,但 buf 已滿。
        // 將迴圈數組 buf 隊首的元素拷貝到接收數據的地址
        // 將發送者的數據入隊。實際上這時 revx 和 sendx 值相等
        // 找到接收游標
        qp := chanbuf(c, c.recvx)
        // …………
        // 將接收游標處的數據拷貝給接收者
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }

        // 將發送者數據拷貝到 buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // 更新游標值
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx
    }
    sg.elem = nil
    gp := sg.g

    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }

    // 喚醒發送的 goroutine。需要等到調度器的光臨
    goready(gp, skip+1)
}

如果是非緩衝型的,就直接從發送者的棧拷貝到接收者的棧。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

否則,就是緩衝型 channel,而 buf 又滿了的情形。說明發送游標和接收游標重合了,因此需要先找到接收游標:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

將該處的元素拷貝到接收地址。然後將發送者待發送的數據拷貝到接收游標處。這樣就完成了接收數據和發送數據的操作。接著,分別將發送游標和接收游標向前進一,如果發生“環繞”,再從 0 開始。

最後,取出 sudog 里的 goroutine,調用 goready 將其狀態改成 “runnable”,待發送者被喚醒,等待調度器的調度。

  • 然後,如果 channel 的 buf 里還有數據,說明可以比較正常地接收。註意,這裡,即使是在 channel 已經關閉的情況下,也是可以走到這裡的。這一步比較簡單,正常地將 buf 里接收游標處的數據拷貝到接收數據的地址。

  • 到了最後一步,走到這裡來的情形是要阻塞的。當然,如果 block 傳進來的值是 false,那就不阻塞,直接返回就好了。

先構造一個 sudog,接著就是保存各種值了。註意,這裡會將接收數據的地址存儲到了 elem 欄位,當被喚醒時,接收到的數據就會保存到這個欄位指向的地址。然後將 sudog 添加到 channel 的 recvq 隊列里。調用 goparkunlock 函數將 goroutine 掛起。

接下來的代碼就是 goroutine 被喚醒後的各種收尾工作了。

我們繼續之前的例子。前面說到第 14 行,創建了一個非緩衝型的 channel,接著,第 15、16 行分別創建了一個 goroutine,各自執行了一個接收操作。通過前面的源碼分析,我們知道,這兩個 goroutine (後面稱為 G1 和 G2 好了)都會被阻塞在接收操作。G1 和 G2 會掛在 channel 的 recq 隊列中,形成一個雙向迴圈鏈表。

在程式的 17 行之前,chan 的整體數據結構如下:

chan struct at the runtime

buf 指向一個長度為 0 的數組,qcount 為 0,表示 channel 中沒有元素。重點關註 recvqsendq,它們是 waitq 結構體,而 waitq 實際上就是一個雙向鏈表,鏈表的元素是 sudog,裡面包含 g 欄位,g 表示一個 goroutine,所以 sudog 可以看成一個 goroutine。recvq 存儲那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲那些嘗試寫入 channel,但被阻塞的 goroutine。

此時,我們可以看到,recvq 里掛了兩個 goroutine,也就是前面啟動的 G1 和 G2。因為沒有 goroutine 接收,而 channel 又是無緩衝類型,所以 G1 和 G2 被阻塞。sendq 沒有被阻塞的 goroutine。

recvq 的數據結構如下。這裡直接引用文章中的一幅圖,用了三維元素,畫得很好:

recvq structure

再從整體上來看一下 chan 此時的狀態:

chan state

G1 和 G2 被掛起了,狀態是 WAITING。關於 goroutine 調度器這塊不是今天的重點,當然後面肯定會寫相關的文章。這裡先簡單說下,goroutine 是用戶態的協程,由 Go runtime 進行管理,作為對比,內核線程由 OS 進行管理。Goroutine 更輕量,因此我們可以輕鬆創建數萬 goroutine。

一個內核線程可以管理多個 goroutine,當其中一個 goroutine 阻塞時,內核線程可以調度其他的 goroutine 來運行,內核線程本身不會阻塞。這就是通常我們說的 M:N 模型:

M:N scheduling

M:N 模型通常由三部分構成:M、P、G。M 是內核線程,負責運行 goroutine;P 是 context,保存 goroutine 運行所需要的上下文,它還維護了可運行(runnable)的 goroutine 列表;G 則是待運行的 goroutine。M 和 P 是 G 運行的基礎。

MGP

繼續回到例子。假設我們只有一個 M,當 G1(go goroutineA(ch)) 運行到 val := <- a 時,它由本來的 running 狀態變成了 waiting 狀態(調用了 gopark 之後的結果):

G1 running

G1 脫離與 M 的關係,但調度器可不會讓 M 閑著,所以會接著調度另一個 goroutine 來運行:

G1 waiting

G2 也是同樣的遭遇。現在 G1 和 G2 都被掛起了,等待著一個 sender 往 channel 里發送數據,才能得到解救。

發送

接著上面的例子,G1 和 G2 現在都在 recvq 隊列里了。

ch <- 3

第 17 行向 channel 發送了一個元素 3。

發送操作最終轉化為 chansend 函數,直接上源碼,同樣大部分都註釋了,可以看懂主流程:

// 位於 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未發送成功
        if !block {
            return false
        }
        // 當前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關……

    // 對於不阻塞的 send,快速檢測失敗場景
    //
    // 如果 channel 未關閉且 channel 沒有多餘的緩衝空間。這可能是:
    // 1. channel 是非緩衝型的,且等待接收隊列里沒有 goroutine
    // 2. channel 是緩衝型的,但迴圈數組已經裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel,併發安全
    lock(&c.lock)

    // 如果 channel 關閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收隊列里有 goroutine,直接將要發送的數據拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對於緩衝型的 channel,如果還有緩衝空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將數據從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 發送游標值加 1
        c.sendx++
        // 如果發送游標值等於容量值,游標值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩衝區的元素數量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,則直接返回錯誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了,發送方會被阻塞。接下來會構造一個 sudog

    // 獲取當前 goroutine 的指針
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當前 goroutine 進入發送等待隊列
    c.sendq.enqueue(mysg)

    // 當前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這裡開始被喚醒了(channel 有機會可以發送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒後,channel 關閉了。坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上綁定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

上面的代碼註釋地比較詳細了,我們來詳細看看。

  • 如果檢測到 channel 是空的,當前 goroutine 會被掛起。

  • 對於不阻塞的發送操作,如果 channel 未關閉並且沒有多餘的緩衝空間(說明:a. channel 是非緩衝型的,且等待接收隊列里沒有 goroutine;b. channel 是緩衝型的,但迴圈數組已經裝滿了元素)

對於這一點,runtime 源碼里註釋了很多。這一條判斷語句是為了在不阻塞發送的場景下快速檢測到發送失敗,好快速返回。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
}

註釋里主要講為什麼這一塊可以不加鎖,我詳細解釋一下。if 條件里先讀了兩個變數:block 和 c.closed。block 是函數的參數,不會變;c.closed 可能被其他 goroutine 改變,因為沒加鎖嘛,這是“與”條件前面兩個表達式。

最後一項,涉及到三個變數:c.dataqsiz,c.recvq.first,c.qcount。c.dataqsiz == 0 && c.recvq.first == nil 指的是非緩衝型的 channel,並且 recvq 里沒有等待接收的 goroutine;c.dataqsiz > 0 && c.qcount == c.dataqsiz 指的是緩衝型的 channel,但迴圈數組已經滿了。這裡 c.dataqsiz 實際上也是不會被修改的,在創建的時候就已經確定了。不加鎖真正影響地是 c.qcountc.recvq.first

這一部分的條件就是兩個 word-sized read,就是讀兩個 word 操作:c.closedc.recvq.first(非緩衝型) 或者 c.qcount(緩衝型)。

當我們發現 c.closed == 0 為真,也就是 channel 未被關閉,再去檢測第三部分的條件時,觀測到 c.recvq.first == nil 或者 c.qcount == c.dataqsiz 時(這裡忽略 c.dataqsiz),就斷定要將這次發送操作作失敗處理,快速返回 false。

這裡涉及到兩個觀測項:channel 未關閉、channel not ready for sending。這兩項都會因為沒加鎖而出現觀測前後不一致的情況。例如我先觀測到 channel 未被關閉,再觀察到 channel not ready for sending,這時我以為能滿足這個 if 條件了,但是如果這時 c.closed 變成 1,這時其實就不滿足條件了,誰讓你不加鎖呢!

但是,因為一個 closed channel 不能將 channel 狀態從 'ready for sending' 變成 'not ready for sending',所以當我觀測到 'not ready for sending' 時,channel 不是 closed。即使 c.closed == 1,即 channel 是在這兩個觀測中間被關閉的,那也說明在這兩個觀測中間,channel 滿足兩個條件:not closednot ready for sending,這時,我直接返回 false 也是沒有問題的。

這部分解釋地比較繞,其實這樣做的目的就是少獲取一次鎖,提升性能。

  • 如果檢測到 channel 已經關閉,直接 panic。

  • 如果能從等待接收隊列 recvq 里出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的,沒有元素,所以才會有等待接收者。這時會調用 send 函數將元素直接從發送者的棧拷貝到接收者的棧,關鍵操作由 sendDirect 函數完成。

// send 函數處理向一個空的 channel 發送操作

// ep 指向被髮送的元素,會被直接拷貝到接收的 goroutine
// 之後,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待隊列里有 goroutine,肯定是空的)
// c 必須被上鎖,發送操作執行完後,會使用 unlockf 函數解鎖
// sg 必須已經從等待隊列里取出來了
// ep 必須是非空,並且它指向堆或調用者的棧

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……

    // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
    if sg.elem != nil {
        // 直接拷貝記憶體(從發送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上綁定的 goroutine
    gp := sg.g
    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒接收的 goroutine. skip 和列印棧相關,暫時不理會
    goready(gp, skip+1)
}

繼續看 sendDirect 函數:

// 向一個非緩衝型的 channel 發送數據、從一個無元素的(非緩衝型或緩衝型但空)的 channel
// 接收數據,都會導致一個 goroutine 直接操作另一個 goroutine 的棧
// 由於 GC 假設對棧的寫操作只能發生在 goroutine 正在運行中並且由當前 goroutine 來寫
// 所以這裡實際上違反了這個假設。可能會造成一些問題,所以需要用到寫屏障來規避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在當前 goroutine 的棧上,dst 是另一個 goroutine 的棧

    // 直接進行記憶體"搬遷"
    // 如果目標地址的棧發生了棧收縮,當我們讀出了 sg.elem 後
    // 就不能修改真正的 dst 位置的值了
    // 因此需要在讀和寫之前加上一個屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

這裡涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作,一般而言,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設。為了不出問題,寫的過程中增加了寫屏障,保證正確地完成寫操作。這樣做的好處是減少了一次記憶體 copy:不用先拷貝到 channel 的 buf,直接由發送者到接收者,沒有中間商賺差價,效率得以提高,完美。

然後,解鎖、喚醒接收者,等待調度器的光臨,接收者也得以重見天日,可以繼續執行接收操作之後的代碼了。

  • 如果 c.qcount < c.dataqsiz,說明緩衝區可用(肯定是緩衝型的 channel)。先通過函數取出待發送元素應該去到的位置:
qp := chanbuf(c, c.sendx)

// 返回迴圈隊列里第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx 指向下一個待發送元素在迴圈數組中的位置,然後調用 typedmemmove 函數將其拷貝到迴圈數組中。之後 c.sendx 加 1,元素總量加 1 :c.qcount++,最後,解鎖並返回。

  • 如果沒有命中以上條件的,說明 channel 已經滿了。不管這個 channel 是緩衝型的還是非緩衝型的,都要將這個 sender “關起來”(goroutine 被阻塞)。如果 block 為 false,直接解鎖,返回 false。

  • 最後就是真的需要被阻塞的情況。先構造一個 sudog,將其入隊(channel 的 sendq 欄位)。然後調用 goparkunlock 將當前 goroutine 掛起,並解鎖,等待合適的時機再喚醒。

喚醒之後,從 goparkunlock 下一行代碼開始繼續往下執行。

這裡有一些綁定操作,sudog 通過 g 欄位綁定 goroutine,而 goroutine 通過 waiting 綁定 sudog,sudog 還通過 elem 欄位綁定待發送元素的地址,以及 c 欄位綁定被“坑”在此處的 channel。

所以,待發送的元素地址其實是存儲在 sudog 結構體里,也就是當前 goroutine 里。

好了,看完源碼。我們接著來分析例子,相信大家已經把例子忘得差不多了,我再貼一下代碼:

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("goroutine A received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("goroutine B received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)

    ch1 := make(chan struct{})
}

在發送小節里我們說到 G1 和 G2 現在被掛起來了,等待 sender 的解救。在第 17 行,主協程向 ch 發送了一個元素 3,來看下接下來會發生什麼。

根據前面源碼分析的結果,我們知道,sender 發現 ch 的 recvq 里有 receiver 在等待著接收,就會出隊一個 sudog,把 recvq 里 first 指針的 sudo “推舉”出來了,並將其加入到 P 的可運行 goroutine 隊列中。

然後,sender 把發送元素拷貝到 sudog 的 elem 地址處,最後會調用 goready 將 G1 喚醒,狀態變為 runnable。

G1 runnable

當調度器光顧 G1 時,將 G1 變成 running 狀態,執行 goroutineA 接下來的代碼。G 表示其他可能有的 goroutine。

這裡其實涉及到一個協程寫另一個協程棧的操作。有兩個 receiver 在 channel 的一邊虎視眈眈地等著,這時 channel 另一邊來了一個 sender 準備向 channel 發送數據,為了高效,用不著通過 channel 的 buf “中轉”一次,直接從源地址把數據 copy 到目的地址就可以了,效率高啊!

send direct

上圖是一個示意圖,3 會被拷貝到 G1 棧上的某個位置,也就是 val 的地址處,保存在 elem 欄位。

關閉

關閉某個 channel,會執行函數 closechan

func closechan(c *hchan) {
    // 關閉一個 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上鎖
    lock(&c.lock)
    // 如果 channel 已經關閉
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改關閉狀態
    c.closed = 1

    var glist *g

    // 將 channel 所有等待接收隊列的里 sudog 釋放
    for {
        // 從接收隊列里出隊一個 sudog
        sg := c.recvq.dequeue()
        // 出隊完畢,跳出迴圈
        if sg == nil {
            break
        }

        // 如果 elem 不為空,說明此 receiver 未忽略接收數據
        // 給它賦一個相應類型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相連,形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 將 channel 等待發送隊列里的 sudog 釋放
    // 如果存在,這些 goroutine 將會 panic
    for {
        // 從發送隊列里出隊一個 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 發送者會 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍歷鏈表
    for glist != nil {
        // 取最後一個
        gp := glist
        // 向前走一步,下一個喚醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 喚醒相應 goroutine
        goready(gp, 3)
    }
}

close 邏輯比較簡單,對於一個 channel,recvq 和 sendq 中分別保存了阻塞的發送者和接收者。關閉 channel 後,對於等待接收者而言,會收到一個相應類型的零值。對於等待發送者,會直接 panic。所以,在不瞭解 channel 還有沒有接收者的情況下,不能貿然關閉 channel。

close 函數先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最後,再將所有的 sudog 全都喚醒。

喚醒之後,該幹嘛幹嘛。sender 會繼續執行 chansend 函數里 goparkunlock 函數之後的代碼,很不幸,檢測到 channel 已經關閉了,panic。receiver 則比較幸運,進行一些掃尾工作後,返回。這裡,selected 返回 true,而返回值 received 則要根據 channel 是否關閉,返回不同的值。如果 channel 關閉,received 為 false,否則為 true。這我們分析的這種情況下,received 返回 false。

channel 進階

總結一下操作 channel 的結果:

操作 nil channel closed channel not nil, not closed channel
close panic panic 正常關閉
讀 <- ch 阻塞 讀到對應類型的零值 阻塞或正常讀取數據。緩衝型 channel 為空或非緩衝型 channel 沒有等待發送者時會阻塞
寫 ch <- 阻塞 panic 阻塞或正常寫入數據。非緩衝型 channel 沒有等待接收者或緩衝型 channel buf 滿時會被阻塞

總結一下,發生 panic 的情況有三種:向一個關閉的 channel 進行寫操作;關閉一個 nil 的 channel;重覆關閉一個 channel。

讀、寫一個 nil channel 都會被阻塞。

發送和接收元素的本質

Channel 發送和接收元素的本質是什麼?參考資料【深入 channel 底層】里是這樣回答的:

Remember all transfer of value on the go channels happens with the copy of value.

就是說 channel 的發送和接收操作本質上都是 “值的拷貝”,無論是從 sender goroutine 的棧到 chan buf,還是從 chan buf 到 receiver goroutine,或者是直接從 sender goroutine 到 receiver goroutine。

這裡再引用文中的一個例子,我會加上更加詳細地解釋。順帶說一下,這是一篇英文的博客,寫得很好,沒有像我們這篇文章那樣大段的源碼分析,它是將代碼里情況拆開來各自描述的,各有利弊吧。推薦去讀下原文,閱讀體驗比較好。

type user struct {
    name string
    age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
    fmt.Println("modifyUser Received Vaule", pu)
    pu.name = "Anand"
}

func printUser(u <-chan *user) {
    time.Sleep(2 * time.Second)
    fmt.Println("printUser goRoutine called", <-u)
}

func main() {
    c := make(chan *user, 5)
    c <- g
    fmt.Println(g)
    // modify g
    g = &user{name: "Ankur Anand", age: 100}
    go printUser(c)
    go modifyUser(g)
    time.Sleep(5 * time.Second)
    fmt.Println(g)
}

運行結果:

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

這裡就是一個很好的 share memory by communicating 的例子。

output

一開始構造一個結構體 u,地址是 0x56420,圖中地址上方就是它的內容。接著把 &u 賦值給指針 g,g 的地址是 0x565bb0,它的內容就是一個地址,指向 u。

main 程式里,先把 g 發送到 c,根據 copy value 的本質,進入到 chan buf 里的就是 0x56420,它是指針 g 的值

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • SpringMVC是一個一種基於Java的實現MVC設計模型的請求驅動類型的輕量級web框架 SpringMVC的入門案例 1. 2.導入相關jar包 3,在web.xml中配置前端控制器 4.編寫控制器類 5.開啟掃描註解 6.編寫兩個JSP 首頁 成功頁面 入門案例的流程總結 1.伺服器啟動,加 ...
  • # 電影天堂爬蟲 **今日目標** 爬取電影天堂前30頁最新電影以及下載地址 ```python # 1. 列印程式執行時間 # 2. 數據爬下來後做處理(字元串),定義成字典 # 3. 一條龍: 獲取 -> 調用解析 -> 數據處理 from urllib import request impor... ...
  • 第七章 父 子關係文檔 打虎親兄弟,上陣父子兵。 本章作為 複雜搜索 的鋪墊,介紹父子文檔是為了更好的介紹複雜場景下的ES操作。 在非關係型資料庫資料庫中,我們常常會有表與表的關聯查詢。例如學生表和成績表的關聯查詢就能查出學會的信息和成績信息。在ES中,父子關係文檔就類似於表的關聯查詢。 背景 ES ...
  • 列表列表用中括弧[ ]把各種數據框起來,每一個數據叫作“元素”。每個元素之間都要用英文逗號隔開各種類型的數據(整數/浮點數/字元串)————————————————————————————從列表提取單個元素每個元素都有自己的位置編號(即偏移量) 1.偏移量是從0開始的2.列表名後加帶偏移量的中括弧, ...
  • 一、抽象工廠模式 1、生活場景 汽車生產根據用戶選擇的汽車類型,指定不同的工廠進行生產,選擇紅旗轎車,就要使用中國工廠,選擇奧迪轎車,就要使用德國工廠。 2、抽象工廠模式 1) 抽象工廠模式:定義了一個interface用於創建相關對象或相互依賴的對象,而無需指明具體的類; 2) 抽象工廠模式可以將 ...
  • 一.首先程式啟動,顯示下麵內容供用戶選擇: 1.請登錄 2.請註冊 3.進入文章頁面 4.進入評論頁面 5.進入日記頁面 6.進入收藏頁面 7.註銷賬號 8.退出整個程式 二.必須實現的功能: 1.註冊功能要求: a.用戶名、密碼要記錄在文件中。 b.用戶名要求:只能含有字母或者數字不能含有特殊字元 ...
  • 我喜歡直接了當, 這次主要是推薦蟒營大媽的 Python 入門課(https://py.101.camp), 還有不到一周就要開課了, 歡迎轉發推薦~ 點擊"夏日大作戰:從小白到小能手的 Python 學習之旅", 直接訪問課程的詳細信息, 以及課程的由來故事DM2: call back 下文為曾經 ...
  • 運用input函數搜集信息 input()函數結果的賦值name = input('請輸入你的名字:') #將input()函數的執行結果(收集的信息)賦值給變數name input()函數的使用場景1.函數結果賦值 name=input()2.搜集信息 name=input(xxx)3.輸出結果 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...