go併發 - channel

来源:https://www.cnblogs.com/asdfzxv/archive/2023/11/19/17841629.html
-Advertisement-
Play Games

公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 在這篇文章中,我們將通過示例來學習 Java 函數式介面。 函數式介面的特點 只包含一個抽象方法的介面稱為函數式介面。 它可以有任意數量的預設靜態方法,但只能包含一個抽象方法。它還可以聲明對象類的方法。 函數介面也稱為單一 ...


概述

併發編程是利用多核心能力,提升程式性能,而多線程之間需要相互協作、共用資源、線程安全等。任何併發模型都要解決線程間通訊問題,毫不誇張的說線程通訊是併發編程的主要問題。go使用著名的CSP(Communicating Sequential Process,通訊順序進程)併發模型,從設計之初 Go 語言就註重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程式員專註於分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。channel是線程簡通訊的具體實現之一,本質就是一個線程安全的 FIFO 阻塞隊列(先進先出),向隊列中寫入數據,在另一個線程從隊列讀取數據。很多語言都有類似實現,比如 Java 的線程池任務隊列。

基本使用

通道是引用類型,需要使用 make 創建,格式如下

通道實例 := make(chan 數據類型, 通道長度)
  • 數據類型:通道內傳輸的元素類型,可以基本數據類型,也可以使自定義數據類型。
  • 通道實例:通過make創建的通道句柄,與函數名稱一樣,指向通道的記憶體首地址。
  • 通道長度:通道本質是隊列,創建時候可指定長度,預設為0

創建通道

ch1 := make(chan int)                 // 創建一個整型類型的通道
ch2 := make(chan interface{})         // 創建一個空介面類型的通道, 可以存放任意格式
ch3 := make(chan *Equip)             // 創建Equip指針類型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10)         // 創建Equip指針類型的通道, 並指定隊列長度

通道本質就是線程安全的隊列,創建時候可以指定隊列長度,預設為0。

向通道寫入數據,使用語法非常形象,寫入channel <-,讀取<-channel

ch2 := make(chan interface{}, 10)
ch2<- 10			// 向隊列寫入
n := <-ch2 			// 從隊列讀取
fmt.Println(n)		// 10

箭頭語法雖然很形象,但是有些奇怪,也不利於擴展。使用函數方式感覺更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,擴展性也更強,通過參數可增加超時、同步、非同步等技能。

箭頭符號並沒有規定位置,與C指針一樣,如下兩個語句等效

ch1 := make(chan int)
i := <-ch1			
i := <- ch1

箭頭語法的讀寫有相對性,可讀性一般,有時候無法分辨是讀或寫,看起來很奇怪,如下偽代碼

func main() {
	input := make(chan int, 2)
	output := make(chan int, 2)

	go func() {
		input <- 10
	}()
	output<- <-input
	fmt.Println(<-output)
}

管道是用於協程之間通訊,主流使用方式如下

ch2 := make(chan interface{}, 10)

go func() {
    data := <-ch2			// 用戶協程讀取
    fmt.Println(data)
}()
     
ch2 <- "hello"				// 主協程寫入
time.Sleep(time.Second)

管道也支持遍歷,與箭頭符號一樣,無數據時候迴圈將被阻塞,迴圈永遠不會結束,除非關閉管道

chanInt := make(chan int, 10)

for chanInt, ok := range chanInts {
    fmt.Println(chanInt)
}

管道也支持關閉,關閉後的管道不允許寫入,panic 異常

chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1		// panic: send on closed channel

讀取則不同,已有數據可繼續讀取,無數據時返回false,不阻塞

if value, ok := <-chanInts; ok {			// 從管道讀取數據不在阻塞
    fmt.Println("從管讀取=", value)
} else {
    fmt.Println("從管道讀取失敗", ok)
    return
}

單向管道

管道也支持單向模式,僅允許讀取、或者寫入

var queue <-chan string = make(chan string)

函數形參也可以定義定向管道

func customer(channel <-chan string) {		// 形參為只讀管道
    for {		
        message := <-channel				// 只允許讀取數據
        fmt.Println(message)
    }
}
channel := make(chan string)
go customer(channel)

管道阻塞

Go管道的讀寫都是同步模式,當管道容量還有空間,則寫入成功,否則將阻塞直到寫入成功。從管道讀取也一樣,有數據直接讀取,否則將阻塞直到讀取成功。

var done = make(chan bool)

func aGoroutine() {
    fmt.Println("hello")
    done <- true			// 寫管道
}

func main() {
    go aGoroutine()
    <-done					// 讀阻塞
}

主協程從管道讀取數據時將被阻塞,直到用戶協程寫入數據。管道非常適合用於生產者消費者模式,需要平滑兩者的性能差異,可通過管道容量實現緩衝,所以除非特定場景,都建議管道容量大於零。

有些場景可以使用管道控制線程併發數

// 待補充

阻塞特性也帶來了些問題,程式無法控制超時(箭頭函數語法的後遺症),go 也提供瞭解決方案, 使用select關鍵,與網路編程的select函數類似,監測多個通道是否可讀狀態,都可讀隨機選擇一個,都不可讀進入Default分支,否則阻塞

select {
    case n := <-input:
        fmt.Println(n)
    case m := <-output:
        fmt.Println(m)
    default:
        fmt.Println("default")
}

當然也可以使用select向管道寫入數據,只要不關閉管道總是可寫入,此時加入default分支永遠不會被執行到,如下隨機石頭剪刀布

ch := make(chan string)
go func() {
    for {
        select {
            case ch <- "石頭":
            case ch <- "剪刀":
            case ch <- "布":
        }
    }
}()

for value := range ch {
    log.Println(value)
    time.Sleep(time.Second)
}

模擬線程池

由於go的管道非常輕量且簡潔,大部分直接使用,封裝線程池模式並不常見。案例僅作為功能演示,非常簡單幾十行代碼即可實現線程池的基本功能,體現了go併發模型的簡潔、高效。

type Runnable interface {
	Start()
}

// 線程池對象
type ThreadPool struct {
	queueSize int
	workSize  int
	channel   chan Runnable
	wait      sync.WaitGroup
}

// 工作線程, 執行非同步任務
func (pool *ThreadPool) doWorker(name string) {
	log.Printf("%s 啟動工作協程", name)
	for true {
		if runnable, ok := <-pool.channel; ok {
			log.Printf("%s 獲取任務, %v\n", name, runnable)
			runnable.Start()
			log.Printf("%s 任務執行成功, %v\n", name, runnable)
		} else {
			log.Printf("%s 線程池關閉, 退出工作協程\n", name)
			pool.wait.Done()
			return
		}
	}
}

// 啟動工作線程
func (pool *ThreadPool) worker() {
	pool.wait.Add(pool.workSize)
	for i := 0; i < pool.workSize; i++ {
		go pool.doWorker(fmt.Sprintf("work-%d", i))
	}
}

// Submit 提交任務
func (pool *ThreadPool) Submit(task Runnable) bool {
	defer func() { recover() }()
	pool.channel <- task
	return true
}

// Close 關閉線程池
func (pool *ThreadPool) Close() {
	defer func() { recover() }()
	close(pool.channel)
}

// Wait 等待線程池任務完成
func (pool *ThreadPool) Wait() {
	pool.Close()
	pool.wait.Wait()
}

// NewThreadPool 工廠函數,創建線程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
	pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
	pool.worker()
	return pool
}

使用線程池

type person struct {
	name string
}

func (p *person) Start() {
	fmt.Println(p.name)
}

func main() {
	threadPool := executor.NewThreadPool(10, 2)		// 創建線程池, 隊列長度10, 工作線程2

	for i := 0; i < 5; i++ {
		threadPool.Submit(&person{name: "xx"})		// 提交十個任務
	}
        
	threadPool.Wait()								// 阻塞等待所有任務完成
}

模擬管道

任何線程之間的通訊都依賴底層鎖機制,channel是對鎖機制封裝後的實現對象,與Java中線程池任務隊列機制幾乎一樣,但要簡潔很多。使用切片簡單模擬
介面聲明

type Queue interface {
	// Put 向隊列添加任務, 添加成功返回true, 添加失敗返回false, 隊列滿了則阻塞直到添加成功
	Put(task interface{}) bool

	// Get 從隊列獲取任務, 一直阻塞直到獲取任務, 隊列關閉且沒有任務則返回false
	Get() (interface{}, bool)

	// Size 查看隊列中的任務數
	Size() int

	// Close 關閉隊列, 關閉後將無法添加任務, 已有的任務可以繼續獲取
	Close()
}

基於切片實現

// SliceQueue 使用切片實現, 自動擴容屬性隊列永遠都不會滿, 擴容時候會觸發數據複製, 性能一般
type SliceQueue struct {
	sync.Mutex
	cond  *sync.Cond
	queue []interface{}
	close atomic.Bool
}

func (q *SliceQueue) Get() (data interface{}, ok bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if len(q.queue) == 0 {
			if q.close.Load() == true {
				return nil, false
			}
			q.cond.Wait()
		}
		if data := q.doGet(); data != nil {
			return data, true
		}
	}
	return
}

func (q *SliceQueue) doGet() interface{} {
	if len(q.queue) >= 1 {
		data := q.queue[0]
		q.queue = q.queue[1:]
		return data
	}
	return nil
}

func (q *SliceQueue) Put(task interface{}) bool {
	q.Lock()
	defer func() {
		q.cond.Signal()
		q.Unlock()
	}()

	if q.close.Load() == true {
		return false
	}
	q.queue = append(q.queue, task)
	return true
}

func (q *SliceQueue) Size() int {
	return len(q.queue)
}

func (q *SliceQueue) Close() {
	if q.close.Load() == true {
		return
	}

	q.Lock()
	defer q.Unlock()

	q.close.Store(true)
	q.cond.Broadcast()
}

func NewSliceQueue() Queue {
	sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
	sliceQueue.cond = sync.NewCond(sliceQueue)
	return sliceQueue
}

基於環行數組實現

type ArrayQueue struct {
	sync.Mutex
	readCond     *sync.Cond
	writeCond    *sync.Cond
	readIndex    int
	writeIndex   int
	queueMaxSize int
	close        atomic.Bool
	queue        []interface{}
}

func (q *ArrayQueue) Put(task interface{}) bool {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.close.Load() == true {
			return false
		}
		if q.IsFull() {
			q.writeCond.Wait()
			if q.IsFull() {
				continue
			}
		}
		q.queue[q.writeIndex] = task
		q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
		q.readCond.Signal()
		return true
	}
	return true
}

func (q *ArrayQueue) Get() (interface{}, bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.IsEmpty() {
			if q.close.Load() == true {
				return nil, false
			}
			q.readCond.Wait()
			if q.IsEmpty() {
				continue
			}
		}
		task := q.queue[q.readIndex]
		q.readIndex = (q.readIndex + 1) % q.queueMaxSize
		q.writeCond.Signal()
		return task, true
	}
	return nil, true
}

func (q *ArrayQueue) Size() int {
	return q.queueMaxSize
}

func (q *ArrayQueue) Close() {
	if q.close.Load() == true {
		return
	}
	q.Lock()
	q.Unlock()
	q.close.Store(true)
	q.readCond.Broadcast()
}

func (q *ArrayQueue) IsFull() bool {
	return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}

func (q *ArrayQueue) IsEmpty() bool {
	return q.readIndex == q.writeIndex
}

func NewArrayQueue(size int) Queue {
	queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
	queue.readCond = sync.NewCond(queue)
	queue.writeCond = sync.NewCond(queue)
	return queue
}

測試用例

func TestWith(t *testing.T) {
	q := NewSliceQueue()
	go func() {
		time.Sleep(time.Second * 2)
		q.Put(true)  // 向隊列寫入數據, 與 chan<- 功能相同
	}()

	q.Get()			// 阻塞直到讀取數據, 與 <-chan 功能相同
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • python 版本:3.6 win32 版本(因為一些特殊原因必須使用3.6) pymssql 版本:2.2.0 連接資料庫: import pymssql** def InitMssql(self): try: host = self.IniConfig.get('default','dbhost ...
  • 學習完基礎的圖像演算法,開始接觸OpenCV學習: 灰度圖中,一個像素點上的灰度級需要一個位元組(byte,2^8,8 bit)進行存儲,此時的灰度圖是二維的。而當我們需要轉換為彩色圖時,即三維,便會產生顏色通道(Channel),這個時候,一個像素點上的灰度級便會需要三個位元組來進行存儲。 可以藉助笛卡 ...
  • 本文只發佈於利用OpenCV實現尺度不變性與角度不變性的特征找圖演算法和知乎 一般來說,利用OpenCV實現找圖功能,用的比較多的是模板匹配(matchTemplate)。筆者比較喜歡裡面的NCC演算法。但是模板有個很明顯的短板,面對尺度改變,角度改變的目標就無能為力了。因此本文旨在做到模板匹配做不到的 ...
  • 目錄: Redis是什麼? Redis優缺點? Redis為什麼這麼快? 講講Redis的線程模型? Redis應用場景有哪些? Memcached和Redis的區別? 為什麼要用 Redis 而不用 map/guava 做緩存? Redis 數據類型有哪些? SortedSet和List異同點? ...
  • 14.1、概述 在實際工作中,一般使用配置類和註解代替web.xml和SpringMVC配置文件的功能; 在 Servlet3.0 環境中,容器會在類路徑中查找實現了 javax.servlet.ServletContainerInitializer 介面的類, 如果找到了的話,就會用它來配置 Se ...
  • int i=1; i=i++; int j=i++; int k=i + ++i * i++; System.out.println("i="+i); System.out.println("j="+j); System.out.println("k="+k); ...
  • 寫在前面 今天狀態很不好,我發現學這部分知識的時候,會出現溜號或者註意力無法集中的情況。 我能想到的是,大概率是這部分知識,應該是超出了我現在的水平了,也就是說我存在知識斷層了,整體感覺真的是一知半解。 那有同學會問了,那你能說明白嗎? 我理解的肯定能呀,來往下看! Flask的使用 1、消息閃現的 ...
  • rust中的枚舉有什麼用?枚舉可以嵌入類型的好處是什麼 你可以在同一個枚舉中既有單個值,也有元組或結構體。 枚舉的每個變體可以擁有不同數量和類型的關聯數據。 這增加了類型的靈活性和表達力,使你能夠更精確地建模你的數據。 我知道rust可以為枚舉創建方法,那在哪種場景下枚舉會比結構體會有優勢 表示多個 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...