go併發 - channel

来源:https://www.cnblogs.com/asdfzxv/archive/2023/11/19/17841629.html
-Advertisement-
Play Games

公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 在這篇文章中,我們將通過示例來學習 Java 函數式介面。 函數式介面的特點 只包含一個抽象方法的介面稱為函數式介面。 它可以有任意數量的預設靜態方法,但只能包含一個抽象方法。它還可以聲明對象類的方法。 函數介面也稱為單一 ...


概述

併發編程是利用多核心能力,提升程式性能,而多線程之間需要相互協作、共用資源、線程安全等。任何併發模型都要解決線程間通訊問題,毫不誇張的說線程通訊是併發編程的主要問題。go使用著名的CSP(Communicating Sequential Process,通訊順序進程)併發模型,從設計之初 Go 語言就註重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程式員專註於分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。channel是線程簡通訊的具體實現之一,本質就是一個線程安全的 FIFO 阻塞隊列(先進先出),向隊列中寫入數據,在另一個線程從隊列讀取數據。很多語言都有類似實現,比如 Java 的線程池任務隊列。

基本使用

通道是引用類型,需要使用 make 創建,格式如下

通道實例 := make(chan 數據類型, 通道長度)
  • 數據類型:通道內傳輸的元素類型,可以基本數據類型,也可以使自定義數據類型。
  • 通道實例:通過make創建的通道句柄,與函數名稱一樣,指向通道的記憶體首地址。
  • 通道長度:通道本質是隊列,創建時候可指定長度,預設為0

創建通道

ch1 := make(chan int)                 // 創建一個整型類型的通道
ch2 := make(chan interface{})         // 創建一個空介面類型的通道, 可以存放任意格式
ch3 := make(chan *Equip)             // 創建Equip指針類型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10)         // 創建Equip指針類型的通道, 並指定隊列長度

通道本質就是線程安全的隊列,創建時候可以指定隊列長度,預設為0。

向通道寫入數據,使用語法非常形象,寫入channel <-,讀取<-channel

ch2 := make(chan interface{}, 10)
ch2<- 10			// 向隊列寫入
n := <-ch2 			// 從隊列讀取
fmt.Println(n)		// 10

箭頭語法雖然很形象,但是有些奇怪,也不利於擴展。使用函數方式感覺更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,擴展性也更強,通過參數可增加超時、同步、非同步等技能。

箭頭符號並沒有規定位置,與C指針一樣,如下兩個語句等效

ch1 := make(chan int)
i := <-ch1			
i := <- ch1

箭頭語法的讀寫有相對性,可讀性一般,有時候無法分辨是讀或寫,看起來很奇怪,如下偽代碼

func main() {
	input := make(chan int, 2)
	output := make(chan int, 2)

	go func() {
		input <- 10
	}()
	output<- <-input
	fmt.Println(<-output)
}

管道是用於協程之間通訊,主流使用方式如下

ch2 := make(chan interface{}, 10)

go func() {
    data := <-ch2			// 用戶協程讀取
    fmt.Println(data)
}()
     
ch2 <- "hello"				// 主協程寫入
time.Sleep(time.Second)

管道也支持遍歷,與箭頭符號一樣,無數據時候迴圈將被阻塞,迴圈永遠不會結束,除非關閉管道

chanInt := make(chan int, 10)

for chanInt, ok := range chanInts {
    fmt.Println(chanInt)
}

管道也支持關閉,關閉後的管道不允許寫入,panic 異常

chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1		// panic: send on closed channel

讀取則不同,已有數據可繼續讀取,無數據時返回false,不阻塞

if value, ok := <-chanInts; ok {			// 從管道讀取數據不在阻塞
    fmt.Println("從管讀取=", value)
} else {
    fmt.Println("從管道讀取失敗", ok)
    return
}

單向管道

管道也支持單向模式,僅允許讀取、或者寫入

var queue <-chan string = make(chan string)

函數形參也可以定義定向管道

func customer(channel <-chan string) {		// 形參為只讀管道
    for {		
        message := <-channel				// 只允許讀取數據
        fmt.Println(message)
    }
}
channel := make(chan string)
go customer(channel)

管道阻塞

Go管道的讀寫都是同步模式,當管道容量還有空間,則寫入成功,否則將阻塞直到寫入成功。從管道讀取也一樣,有數據直接讀取,否則將阻塞直到讀取成功。

var done = make(chan bool)

func aGoroutine() {
    fmt.Println("hello")
    done <- true			// 寫管道
}

func main() {
    go aGoroutine()
    <-done					// 讀阻塞
}

主協程從管道讀取數據時將被阻塞,直到用戶協程寫入數據。管道非常適合用於生產者消費者模式,需要平滑兩者的性能差異,可通過管道容量實現緩衝,所以除非特定場景,都建議管道容量大於零。

有些場景可以使用管道控制線程併發數

// 待補充

阻塞特性也帶來了些問題,程式無法控制超時(箭頭函數語法的後遺症),go 也提供瞭解決方案, 使用select關鍵,與網路編程的select函數類似,監測多個通道是否可讀狀態,都可讀隨機選擇一個,都不可讀進入Default分支,否則阻塞

select {
    case n := <-input:
        fmt.Println(n)
    case m := <-output:
        fmt.Println(m)
    default:
        fmt.Println("default")
}

當然也可以使用select向管道寫入數據,只要不關閉管道總是可寫入,此時加入default分支永遠不會被執行到,如下隨機石頭剪刀布

ch := make(chan string)
go func() {
    for {
        select {
            case ch <- "石頭":
            case ch <- "剪刀":
            case ch <- "布":
        }
    }
}()

for value := range ch {
    log.Println(value)
    time.Sleep(time.Second)
}

模擬線程池

由於go的管道非常輕量且簡潔,大部分直接使用,封裝線程池模式並不常見。案例僅作為功能演示,非常簡單幾十行代碼即可實現線程池的基本功能,體現了go併發模型的簡潔、高效。

type Runnable interface {
	Start()
}

// 線程池對象
type ThreadPool struct {
	queueSize int
	workSize  int
	channel   chan Runnable
	wait      sync.WaitGroup
}

// 工作線程, 執行非同步任務
func (pool *ThreadPool) doWorker(name string) {
	log.Printf("%s 啟動工作協程", name)
	for true {
		if runnable, ok := <-pool.channel; ok {
			log.Printf("%s 獲取任務, %v\n", name, runnable)
			runnable.Start()
			log.Printf("%s 任務執行成功, %v\n", name, runnable)
		} else {
			log.Printf("%s 線程池關閉, 退出工作協程\n", name)
			pool.wait.Done()
			return
		}
	}
}

// 啟動工作線程
func (pool *ThreadPool) worker() {
	pool.wait.Add(pool.workSize)
	for i := 0; i < pool.workSize; i++ {
		go pool.doWorker(fmt.Sprintf("work-%d", i))
	}
}

// Submit 提交任務
func (pool *ThreadPool) Submit(task Runnable) bool {
	defer func() { recover() }()
	pool.channel <- task
	return true
}

// Close 關閉線程池
func (pool *ThreadPool) Close() {
	defer func() { recover() }()
	close(pool.channel)
}

// Wait 等待線程池任務完成
func (pool *ThreadPool) Wait() {
	pool.Close()
	pool.wait.Wait()
}

// NewThreadPool 工廠函數,創建線程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
	pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
	pool.worker()
	return pool
}

使用線程池

type person struct {
	name string
}

func (p *person) Start() {
	fmt.Println(p.name)
}

func main() {
	threadPool := executor.NewThreadPool(10, 2)		// 創建線程池, 隊列長度10, 工作線程2

	for i := 0; i < 5; i++ {
		threadPool.Submit(&person{name: "xx"})		// 提交十個任務
	}
        
	threadPool.Wait()								// 阻塞等待所有任務完成
}

模擬管道

任何線程之間的通訊都依賴底層鎖機制,channel是對鎖機制封裝後的實現對象,與Java中線程池任務隊列機制幾乎一樣,但要簡潔很多。使用切片簡單模擬
介面聲明

type Queue interface {
	// Put 向隊列添加任務, 添加成功返回true, 添加失敗返回false, 隊列滿了則阻塞直到添加成功
	Put(task interface{}) bool

	// Get 從隊列獲取任務, 一直阻塞直到獲取任務, 隊列關閉且沒有任務則返回false
	Get() (interface{}, bool)

	// Size 查看隊列中的任務數
	Size() int

	// Close 關閉隊列, 關閉後將無法添加任務, 已有的任務可以繼續獲取
	Close()
}

基於切片實現

// SliceQueue 使用切片實現, 自動擴容屬性隊列永遠都不會滿, 擴容時候會觸發數據複製, 性能一般
type SliceQueue struct {
	sync.Mutex
	cond  *sync.Cond
	queue []interface{}
	close atomic.Bool
}

func (q *SliceQueue) Get() (data interface{}, ok bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if len(q.queue) == 0 {
			if q.close.Load() == true {
				return nil, false
			}
			q.cond.Wait()
		}
		if data := q.doGet(); data != nil {
			return data, true
		}
	}
	return
}

func (q *SliceQueue) doGet() interface{} {
	if len(q.queue) >= 1 {
		data := q.queue[0]
		q.queue = q.queue[1:]
		return data
	}
	return nil
}

func (q *SliceQueue) Put(task interface{}) bool {
	q.Lock()
	defer func() {
		q.cond.Signal()
		q.Unlock()
	}()

	if q.close.Load() == true {
		return false
	}
	q.queue = append(q.queue, task)
	return true
}

func (q *SliceQueue) Size() int {
	return len(q.queue)
}

func (q *SliceQueue) Close() {
	if q.close.Load() == true {
		return
	}

	q.Lock()
	defer q.Unlock()

	q.close.Store(true)
	q.cond.Broadcast()
}

func NewSliceQueue() Queue {
	sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
	sliceQueue.cond = sync.NewCond(sliceQueue)
	return sliceQueue
}

基於環行數組實現

type ArrayQueue struct {
	sync.Mutex
	readCond     *sync.Cond
	writeCond    *sync.Cond
	readIndex    int
	writeIndex   int
	queueMaxSize int
	close        atomic.Bool
	queue        []interface{}
}

func (q *ArrayQueue) Put(task interface{}) bool {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.close.Load() == true {
			return false
		}
		if q.IsFull() {
			q.writeCond.Wait()
			if q.IsFull() {
				continue
			}
		}
		q.queue[q.writeIndex] = task
		q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
		q.readCond.Signal()
		return true
	}
	return true
}

func (q *ArrayQueue) Get() (interface{}, bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.IsEmpty() {
			if q.close.Load() == true {
				return nil, false
			}
			q.readCond.Wait()
			if q.IsEmpty() {
				continue
			}
		}
		task := q.queue[q.readIndex]
		q.readIndex = (q.readIndex + 1) % q.queueMaxSize
		q.writeCond.Signal()
		return task, true
	}
	return nil, true
}

func (q *ArrayQueue) Size() int {
	return q.queueMaxSize
}

func (q *ArrayQueue) Close() {
	if q.close.Load() == true {
		return
	}
	q.Lock()
	q.Unlock()
	q.close.Store(true)
	q.readCond.Broadcast()
}

func (q *ArrayQueue) IsFull() bool {
	return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}

func (q *ArrayQueue) IsEmpty() bool {
	return q.readIndex == q.writeIndex
}

func NewArrayQueue(size int) Queue {
	queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
	queue.readCond = sync.NewCond(queue)
	queue.writeCond = sync.NewCond(queue)
	return queue
}

測試用例

func TestWith(t *testing.T) {
	q := NewSliceQueue()
	go func() {
		time.Sleep(time.Second * 2)
		q.Put(true)  // 向隊列寫入數據, 與 chan<- 功能相同
	}()

	q.Get()			// 阻塞直到讀取數據, 與 <-chan 功能相同
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • python 版本:3.6 win32 版本(因為一些特殊原因必須使用3.6) pymssql 版本:2.2.0 連接資料庫: import pymssql** def InitMssql(self): try: host = self.IniConfig.get('default','dbhost ...
  • 學習完基礎的圖像演算法,開始接觸OpenCV學習: 灰度圖中,一個像素點上的灰度級需要一個位元組(byte,2^8,8 bit)進行存儲,此時的灰度圖是二維的。而當我們需要轉換為彩色圖時,即三維,便會產生顏色通道(Channel),這個時候,一個像素點上的灰度級便會需要三個位元組來進行存儲。 可以藉助笛卡 ...
  • 本文只發佈於利用OpenCV實現尺度不變性與角度不變性的特征找圖演算法和知乎 一般來說,利用OpenCV實現找圖功能,用的比較多的是模板匹配(matchTemplate)。筆者比較喜歡裡面的NCC演算法。但是模板有個很明顯的短板,面對尺度改變,角度改變的目標就無能為力了。因此本文旨在做到模板匹配做不到的 ...
  • 目錄: Redis是什麼? Redis優缺點? Redis為什麼這麼快? 講講Redis的線程模型? Redis應用場景有哪些? Memcached和Redis的區別? 為什麼要用 Redis 而不用 map/guava 做緩存? Redis 數據類型有哪些? SortedSet和List異同點? ...
  • 14.1、概述 在實際工作中,一般使用配置類和註解代替web.xml和SpringMVC配置文件的功能; 在 Servlet3.0 環境中,容器會在類路徑中查找實現了 javax.servlet.ServletContainerInitializer 介面的類, 如果找到了的話,就會用它來配置 Se ...
  • int i=1; i=i++; int j=i++; int k=i + ++i * i++; System.out.println("i="+i); System.out.println("j="+j); System.out.println("k="+k); ...
  • 寫在前面 今天狀態很不好,我發現學這部分知識的時候,會出現溜號或者註意力無法集中的情況。 我能想到的是,大概率是這部分知識,應該是超出了我現在的水平了,也就是說我存在知識斷層了,整體感覺真的是一知半解。 那有同學會問了,那你能說明白嗎? 我理解的肯定能呀,來往下看! Flask的使用 1、消息閃現的 ...
  • rust中的枚舉有什麼用?枚舉可以嵌入類型的好處是什麼 你可以在同一個枚舉中既有單個值,也有元組或結構體。 枚舉的每個變體可以擁有不同數量和類型的關聯數據。 這增加了類型的靈活性和表達力,使你能夠更精確地建模你的數據。 我知道rust可以為枚舉創建方法,那在哪種場景下枚舉會比結構體會有優勢 表示多個 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 微服務架構已經成為搭建高效、可擴展系統的關鍵技術之一,然而,現有許多微服務框架往往過於複雜,使得我們普通開發者難以快速上手並體驗到微服務帶了的便利。為瞭解決這一問題,於是作者精心打造了一款最接地氣的 .NET 微服務框架,幫助我們輕鬆構建和管理微服務應用。 本框架不僅支持 Consul 服務註 ...
  • 先看一下效果吧: 如果不會寫動畫或者懶得寫動畫,就直接交給Blend來做吧; 其實Blend操作起來很簡單,有點類似於在操作PS,我們只需要設置關鍵幀,滑鼠點來點去就可以了,Blend會自動幫我們生成我們想要的動畫效果. 第一步:要創建一個空的WPF項目 第二步:右鍵我們的項目,在最下方有一個,在B ...
  • Prism:框架介紹與安裝 什麼是Prism? Prism是一個用於在 WPF、Xamarin Form、Uno 平臺和 WinUI 中構建鬆散耦合、可維護和可測試的 XAML 應用程式框架 Github https://github.com/PrismLibrary/Prism NuGet htt ...
  • 在WPF中,屏幕上的所有內容,都是通過畫筆(Brush)畫上去的。如按鈕的背景色,邊框,文本框的前景和形狀填充。藉助畫筆,可以繪製頁面上的所有UI對象。不同畫筆具有不同類型的輸出( 如:某些畫筆使用純色繪製區域,其他畫筆使用漸變、圖案、圖像或繪圖)。 ...
  • 前言 嗨,大家好!推薦一個基於 .NET 8 的高併發微服務電商系統,涵蓋了商品、訂單、會員、服務、財務等50多種實用功能。 項目不僅使用了 .NET 8 的最新特性,還集成了AutoFac、DotLiquid、HangFire、Nlog、Jwt、LayUIAdmin、SqlSugar、MySQL、 ...
  • 本文主要介紹攝像頭(相機)如何採集數據,用於類似攝像頭本地顯示軟體,以及流媒體數據傳輸場景如傳屏、視訊會議等。 攝像頭採集有多種方案,如AForge.NET、WPFMediaKit、OpenCvSharp、EmguCv、DirectShow.NET、MediaCaptre(UWP),網上一些文章以及 ...
  • 前言 Seal-Report 是一款.NET 開源報表工具,擁有 1.4K Star。它提供了一個完整的框架,使用 C# 編寫,最新的版本採用的是 .NET 8.0 。 它能夠高效地從各種資料庫或 NoSQL 數據源生成日常報表,並支持執行複雜的報表任務。 其簡單易用的安裝過程和直觀的設計界面,我們 ...
  • 背景需求: 系統需要對接到XXX官方的API,但因此官方對接以及管理都十分嚴格。而本人部門的系統中包含諸多子系統,系統間為了穩定,程式間多數固定Token+特殊驗證進行調用,且後期還要提供給其他兄弟部門系統共同調用。 原則上:每套系統都必須單獨接入到官方,但官方的接入複雜,還要官方指定機構認證的證書 ...
  • 本文介紹下電腦設備關機的情況下如何通過網路喚醒設備,之前電源S狀態 電腦Power電源狀態- 唐宋元明清2188 - 博客園 (cnblogs.com) 有介紹過遠程喚醒設備,後面這倆天瞭解多了點所以單獨加個隨筆 設備關機的情況下,使用網路喚醒的前提條件: 1. 被喚醒設備需要支持這WakeOnL ...
  • 前言 大家好,推薦一個.NET 8.0 為核心,結合前端 Vue 框架,實現了前後端完全分離的設計理念。它不僅提供了強大的基礎功能支持,如許可權管理、代碼生成器等,還通過採用主流技術和最佳實踐,顯著降低了開發難度,加快了項目交付速度。 如果你需要一個高效的開發解決方案,本框架能幫助大家輕鬆應對挑戰,實 ...