go channel

来源:https://www.cnblogs.com/studyios/archive/2023/12/01/17871024.html
-Advertisement-
Play Games

小市值選股策略的核心在於通過綜合分析公司的基本面、行業定位、財務健康狀況以及市場趨勢, 來尋找那些被市場低估但具備顯著成長潛力的股票,同時也要重視風險管理和投資組合的多樣化。 今天來給大家分享下小市值策略代碼如下: # 顯式導入 BigQuant 相關 SDK 模塊 from bigdatasour ...


模型

三部分組成:

發送等待隊列
接收等待隊列
管道 緩存區

定義

runtimechan.go

type hchan struct {
	qcount   uint          
	dataqsiz uint          
	buf      unsafe.Pointer 
	elemsize uint16
	elemtype *_type // element type
    // 上面的這幾個組成了一個環形緩存區

	closed   uint32 // 關閉狀態

    // 下麵4個組成了2個隊列 g的
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	lock mutex  // 鎖,用於在操作 channel的 元素時候,保證併發安全 
}

特殊緩存區

qcount  已經存儲的個數
dataqsiz  環形隊列緩存的容量,即允許緩存的消息最大個數
buf   指向這個緩存區的地址
elemsize 元素的大小
elemtype  元素的類型

設計成這樣,主要目的不需要gc來清理,因為環形機構,會自動把刪除數據記憶體給占了。

環形緩存可以大幅降低GC的開銷

兩個隊列

這裡還要看下 waitq 的定義

 type waitq struct {
  	first *sudog
  	last  *sudog
 }

// 在sema中也有用到,就g結構體的封裝
type sudog struct {
        g *g 
	next *sudog
	prev *sudog
	elem unsafe.Pointer // 接受參數的地址 
    // 當是接收g時候,如果有緩存中有數據,直接把數據拷貝到這個地址

}

註意:這裡的sendx 並不是指向發送隊列中的g,而且發送隊列應該寫入環形緩存區的index,
同理,recvx也是,指向接受數據的g,應該從緩衝區的那個index取數據

互斥鎖

lock mutex  

互斥鎖並不是排隊發送/接收數據

不是讓發送和接收隊列來排隊的,這些發送和接收數據的隊列,休眠也不是在鎖的sema里

互斥鎖保護的hchan結構體本身

所以, Channel並不是無鎖的

狀態值

	closed   uint32 // 關閉狀態

0為開啟、1為關閉

當一個關閉的channel,再往裡寫或者重覆關閉、就會panic。但是可以讀。

發送數據

c<- 關鍵字是一個語法糖

編譯階段,會把C<- 轉化為 ruintime.chansend1

chansend1 會調用 chansend0 方法

直接發送 (已經有接收隊列在等)

發送數據前,己經有G在休眠等待接收

此時緩存肯定是空的,不用考慮緩存

將數據直接拷貝給G的接收變數,喚醒G

實現

// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
// 部分源碼
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //有加鎖,如果這時候,再來一個g也發送,就會休眠去 sema隊列了
	lock(&c.lock)
	if c.closed != 0 { // 如果已經關閉,就會報錯,上面講過給一個關閉的chan發送會panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 從接收隊列裡面拿一個 g
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
        // 上面的註釋講的很清楚,直接把值給接收者,繞過 channel的buffer
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      //  和之前講的 sudog的elem對上了
      if sg.elem != nil {
        // 直接把數據拷貝到 接收者的 elem中
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
    // 喚醒這個協程 接收者的g
	goready(gp, skip+1)
}

步驟:

從隊列里取出一個等待接收的G
將數據直接拷貝到接收變數中
喚醒G,接收者的g

放入緩存

沒有G在休眠等待,但是有緩存空間
將數據放入緩存

實現

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 上面是直接發送的源碼,被截除
    // 當緩存隊列存入的數量 小於 緩存的容量,就是還有緩存空間
	if c.qcount < c.dataqsiz {
        // 緩存區接受數據的地址
		qp := chanbuf(c, c.sendx)
		// 將數據拷貝過去
		typedmemmove(c.elemtype, qp, ep)
        
        // 指示下一個發送數據,存在那個緩衝區,這裡有個邏輯下麵講
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
        // 元素個數加1,釋放鎖並返回
		c.qcount++
		unlock(&c.lock)
		return true
	}
 }

    qp := chanbuf(c, c.sendx)
    // chanbuf(c, i) is pointer to the i'th slot in the buffer
    // 返回緩衝區的 第幾個槽
    // 寫入緩存區
    c.sendx++
    意味著這裡使用  sendx 來指引下一個發送的數據,寫到幾個槽,
    所以才有下麵,如果滿了,就從0開始,形成環形
    if c.sendx == c.dataqsiz {
  		c.sendx = 0
    }

整個邏輯比較清晰:

  獲取可存入的緩存地址
  存入數據
  維護索引

休眠等待

沒有G在休眠等待,而旦沒有緩存或滿了

自己進入發送隊列,休眠等待

實現

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  
  // 接著 直接放入緩存的代碼

  // Block on the channel. Some receiver will complete our operation for us.
	gp := getg() 
	mysg := acquireSudog() // 把自己組裝成一個 sudog結構體
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep  // 要發送的數據 也是放到這個  elem中的
	mysg.waitlink = nil
	mysg.g = gp   // sudug 的g等於自己的g結構體
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 進入waitReasonChanSend 休眠
  // 然後去 gopark 休眠了,這個方法在將 協程切換的時候,講過
 // 協程到這裡就 休眠了,不繼續執行了,直到被喚醒。
}

步驟:

    把自己包裝成sudog

    sudog放入sendq隊列

    休眠並解鎖

    被喚醒後,數據已經被取走,維護其他數據 (下麵講 接收時候講)

發送小結

  1. 編譯階段,會把<-轉化為 runtime.chansend10

  2. 直接發送時,將數據直接拷貝到目標變數

  3. 放入緩存時,將數據放入環形緩存,成功返回

  4. 休眠等待時,將自己包裝後放入sendp, 休眠

接收

<-c 關鍵字

<-c 關鍵字是一個語法糖

編譯階段,i<-C轉化為 runtime.chanrecv()

編譯階段,i, ok<-c轉化為 runtime.chanrecv()

最後會調用 chanrecv() 方法

無緩存區、有發送協程在等待

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	
	if c == nil {  // 如果讀一個 block 為true的 channel ,協程會直接休眠,
                      // 正常讀channel這個 block都是 true
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}
  // 中間刪了一段  block 為 false的情況

	lock(&c.lock)

	if c.closed != 0 {  // 如果channel已經關閉
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)  // 有數據 放回數據
			}
			return true, false  // 沒數據,返回一個 false
		}
	} else {
		if sg := c.sendq.dequeue(); sg != nil { // 如果發送隊列中有g
			
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  // 調用這個 recv方法
			return true, true
		}
	}
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 { // 無緩存區
		if ep != nil {  
			// copy data from sender
			recvDirect(c.elemtype, sg, ep) // 直接把 傳過來的g 的數據取走
		}
	}
    
    // 給g跟新下參數
        sg.elem = nil
  	gp := sg.g
  	unlockf()
  	gp.param = unsafe.Pointer(sg)
  	sg.success = true
  	if sg.releasetime != 0 {
  		sg.releasetime = cputicks()
  	}
  	goready(gp, skip+1) // 喚醒g,這時候,發送的數據已經被取走了
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}

步驟:

  判斷有G 在發送隊列等待,進入recv()
  判斷此 Channel 無緩存
  直接從等待的 G 中取走數據,喚醒G

這裡有兩個地方要註意,代碼中也標記了:

如果channel為nil,再去讀會直接休眠阻塞。這裡只的是 block為 true的讀,block為false的情況後面講,正常channe都是true

如果channel close了, 去讀有值返回值,沒值返回 false

有等待的g,緩存區滿了

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 上面是緩存區為0的情況
        qp := chanbuf(c, c.recvx) // 取緩存區的數據,用的recvx 標記,和開頭的總結對應上了
        
        // ep 就是 a <-c ,這個a的地址,如果a是nil,說明傳遞的值,沒有用,只是要個時機
		// ep may be nil, in which case received data is ignored.
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
        // 將發送隊列中休眠的這個g的數據 拷貝到了 緩存去
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++ // 取數據的地址增加
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz  
        
        // 喚醒了這個 發送數據的g,因為這個g的數據已經放到了緩存區,不用休眠等待了
        sg.elem = nil
    	gp := sg.g
    	unlockf()
    	gp.param = unsafe.Pointer(sg)
    	sg.success = true
    	if sg.releasetime != 0 {
    		sg.releasetime = cputicks()
    	}
    	goready(gp, skip+1)  

}

步驟:

  接收數據前,已經有G 在休眠等待發送

  而且這個 Channel 有緩存

  從緩存取走一個數據

  將休眠G 的數據放進緩存,喚醒G

接收緩存,沒有發送g在等待

直接從 緩存區拿數據走就行

 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) // 取出數據
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
        // 上面講過 也許接收的變數為空
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // 數據拷貝過去
		}
		typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理緩存區已經取走的這個數據的空間
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
}

步驟:

  判斷沒有 G 在發送隊列等待

  判斷此 Channel 有緩存

  從緩存中取走一個數據

接收阻塞

比較多用在不讓協程退出,除非收到 context的cancel消息等。

沒有 G 在休眠等待,而旦沒有緩存或緩存空

自己進入接收隊列,休眠等待

代碼實現

  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

  // no sender available: block on this channel.
  	gp := getg()  
  	mysg := acquireSudog() //把自己包裝成 sudog

  	mysg.releasetime = 0
  	if t0 != 0 {
  		mysg.releasetime = -1
  	}
  	mysg.elem = ep // 接收數據的地址,拷貝到了elem
  	mysg.waitlink = nil
  	gp.waiting = mysg
  	mysg.g = gp
  	mysg.isSelect = false
  	mysg.c = c
  	gp.param = nil
  	c.recvq.enqueue(mysg) // 加入了等待接收隊列

  	gp.parkingOnChan.Store(true)
  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠
  }

這裡和上面的發送時候,已經有等待接收的g,對上了。

步驟:

判斷沒有 G 在發送隊列等待

判斷此 Channel 無緩存

將自己包裝成 sudog

sudog 放入接收等待隊列,休眠

喚醒時,發送的 G 已經把數據拷貝到位

接收總結:

編譯階段,<-C 會轉化為 chanrecv()

有等待的G,旦無緩存時,從G 接收

有等待的 G,且有緩存時,從緩存接收

無等待的 G,且緩存有數據,從緩存接收

無等待的 G,且緩存無數據,等待喂數據

看上面代碼時候,講過一般使用時候,那個blocktrue的,什麼情況下blockfalse,下篇聊。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近有個需求需要實現自定義首頁佈局,需要將屏幕按照 6 列 4 行進行等分成多個格子,然後將組件可拖拽對應格子進行渲染展示。 示例 對比一些已有的插件,發現想要實現產品的交互效果,沒有現成可用的。本身功能並不是太過複雜,於是決定自己基於 vue 手擼一個簡易的 Grid 拖拽佈局。 完整源碼在此,在 ...
  • 項目背景: vue 1.創建 backtop.vue 的回到頂部邏輯的組件 <template> <transition name="back-up-fade"> <div class="back-top" :style="{ bottom: bottom + 'px', right: right ...
  • 理解 async/await 的原理和使用方法是理解現代JavaScript非同步編程的關鍵。這裡我會提供一個詳細的實例,涵蓋原理、流程、使用方法以及一些註意事項。代碼註釋會儘量詳盡,確保你理解每個步驟。 實例:使用async/await進行非同步操作 <!DOCTYPE html> <html lan ...
  • 本文檔譯自 www.codeproject.com 的文章 "Calling Conventions Demystified",作者 Nemanja Trifunovic,原文參見此處 引言 - Introduction 在學習 Windows 編程的漫長、艱難而美妙的旅途中,你可能會對函數聲明前出 ...
  • 如何使用mysql實現可重入的分散式鎖 目錄 什麼是分散式鎖? 如何實現分散式鎖? 定義分散式表結構 定義鎖統一介面 使用mysql來實現分散式鎖 ① 生成線程標記ID ② 加鎖 ③ 解鎖 ④ 重置鎖 寫在最後 1. 什麼是分散式鎖? 百度百科:分散式鎖是控制分散式系統之間同步訪問共用資源的一種方式 ...
  • Scikit-learn是一個基於Python的開源機器學習庫,它提供了大量的機器學習演算法和工具,方便用戶進行數據挖掘、分析和預測。 Scikit-learn是基於另外兩個知名的庫 Scipy 和 Numpy的,關於 Scipy 和 Numpy 等庫,之前的系列文章中有介紹: Scipy 基礎系列 ...
  • 第二部分主要涵蓋了 SpringMVC 中作用域處理,介紹了 Request 作用域、Session 作用域和應用作用域的處理方式,以及 @ModelAttribute 註解的使用和 ModelAndView 的使用方法;最後,探討了靜態資源的處理方式,包括使用 DefaultServlet 或者 ... ...
  • acwing week2 基礎演算法3總結 總結點1:雙指針演算法 //常用模版框架 for (int i = 0, j = 0; i < n; i ++ ) { while (j < i && check(i, j)) j ++ ; } 常見問題分類: (1) 對於一個序列,用兩個指針維護一段區間 ( ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...