go併發 - channel

来源:https://www.cnblogs.com/asdfzxv/archive/2023/11/19/17841629.html
-Advertisement-
Play Games

公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 在這篇文章中,我們將通過示例來學習 Java 函數式介面。 函數式介面的特點 只包含一個抽象方法的介面稱為函數式介面。 它可以有任意數量的預設靜態方法,但只能包含一個抽象方法。它還可以聲明對象類的方法。 函數介面也稱為單一 ...


概述

併發編程是利用多核心能力,提升程式性能,而多線程之間需要相互協作、共用資源、線程安全等。任何併發模型都要解決線程間通訊問題,毫不誇張的說線程通訊是併發編程的主要問題。go使用著名的CSP(Communicating Sequential Process,通訊順序進程)併發模型,從設計之初 Go 語言就註重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程式員專註於分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。channel是線程簡通訊的具體實現之一,本質就是一個線程安全的 FIFO 阻塞隊列(先進先出),向隊列中寫入數據,在另一個線程從隊列讀取數據。很多語言都有類似實現,比如 Java 的線程池任務隊列。

基本使用

通道是引用類型,需要使用 make 創建,格式如下

通道實例 := make(chan 數據類型, 通道長度)
  • 數據類型:通道內傳輸的元素類型,可以基本數據類型,也可以使自定義數據類型。
  • 通道實例:通過make創建的通道句柄,與函數名稱一樣,指向通道的記憶體首地址。
  • 通道長度:通道本質是隊列,創建時候可指定長度,預設為0

創建通道

ch1 := make(chan int)                 // 創建一個整型類型的通道
ch2 := make(chan interface{})         // 創建一個空介面類型的通道, 可以存放任意格式
ch3 := make(chan *Equip)             // 創建Equip指針類型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10)         // 創建Equip指針類型的通道, 並指定隊列長度

通道本質就是線程安全的隊列,創建時候可以指定隊列長度,預設為0。

向通道寫入數據,使用語法非常形象,寫入channel <-,讀取<-channel

ch2 := make(chan interface{}, 10)
ch2<- 10			// 向隊列寫入
n := <-ch2 			// 從隊列讀取
fmt.Println(n)		// 10

箭頭語法雖然很形象,但是有些奇怪,也不利於擴展。使用函數方式感覺更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,擴展性也更強,通過參數可增加超時、同步、非同步等技能。

箭頭符號並沒有規定位置,與C指針一樣,如下兩個語句等效

ch1 := make(chan int)
i := <-ch1			
i := <- ch1

箭頭語法的讀寫有相對性,可讀性一般,有時候無法分辨是讀或寫,看起來很奇怪,如下偽代碼

func main() {
	input := make(chan int, 2)
	output := make(chan int, 2)

	go func() {
		input <- 10
	}()
	output<- <-input
	fmt.Println(<-output)
}

管道是用於協程之間通訊,主流使用方式如下

ch2 := make(chan interface{}, 10)

go func() {
    data := <-ch2			// 用戶協程讀取
    fmt.Println(data)
}()
     
ch2 <- "hello"				// 主協程寫入
time.Sleep(time.Second)

管道也支持遍歷,與箭頭符號一樣,無數據時候迴圈將被阻塞,迴圈永遠不會結束,除非關閉管道

chanInt := make(chan int, 10)

for chanInt, ok := range chanInts {
    fmt.Println(chanInt)
}

管道也支持關閉,關閉後的管道不允許寫入,panic 異常

chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1		// panic: send on closed channel

讀取則不同,已有數據可繼續讀取,無數據時返回false,不阻塞

if value, ok := <-chanInts; ok {			// 從管道讀取數據不在阻塞
    fmt.Println("從管讀取=", value)
} else {
    fmt.Println("從管道讀取失敗", ok)
    return
}

單向管道

管道也支持單向模式,僅允許讀取、或者寫入

var queue <-chan string = make(chan string)

函數形參也可以定義定向管道

func customer(channel <-chan string) {		// 形參為只讀管道
    for {		
        message := <-channel				// 只允許讀取數據
        fmt.Println(message)
    }
}
channel := make(chan string)
go customer(channel)

管道阻塞

Go管道的讀寫都是同步模式,當管道容量還有空間,則寫入成功,否則將阻塞直到寫入成功。從管道讀取也一樣,有數據直接讀取,否則將阻塞直到讀取成功。

var done = make(chan bool)

func aGoroutine() {
    fmt.Println("hello")
    done <- true			// 寫管道
}

func main() {
    go aGoroutine()
    <-done					// 讀阻塞
}

主協程從管道讀取數據時將被阻塞,直到用戶協程寫入數據。管道非常適合用於生產者消費者模式,需要平滑兩者的性能差異,可通過管道容量實現緩衝,所以除非特定場景,都建議管道容量大於零。

有些場景可以使用管道控制線程併發數

// 待補充

阻塞特性也帶來了些問題,程式無法控制超時(箭頭函數語法的後遺症),go 也提供瞭解決方案, 使用select關鍵,與網路編程的select函數類似,監測多個通道是否可讀狀態,都可讀隨機選擇一個,都不可讀進入Default分支,否則阻塞

select {
    case n := <-input:
        fmt.Println(n)
    case m := <-output:
        fmt.Println(m)
    default:
        fmt.Println("default")
}

當然也可以使用select向管道寫入數據,只要不關閉管道總是可寫入,此時加入default分支永遠不會被執行到,如下隨機石頭剪刀布

ch := make(chan string)
go func() {
    for {
        select {
            case ch <- "石頭":
            case ch <- "剪刀":
            case ch <- "布":
        }
    }
}()

for value := range ch {
    log.Println(value)
    time.Sleep(time.Second)
}

模擬線程池

由於go的管道非常輕量且簡潔,大部分直接使用,封裝線程池模式並不常見。案例僅作為功能演示,非常簡單幾十行代碼即可實現線程池的基本功能,體現了go併發模型的簡潔、高效。

type Runnable interface {
	Start()
}

// 線程池對象
type ThreadPool struct {
	queueSize int
	workSize  int
	channel   chan Runnable
	wait      sync.WaitGroup
}

// 工作線程, 執行非同步任務
func (pool *ThreadPool) doWorker(name string) {
	log.Printf("%s 啟動工作協程", name)
	for true {
		if runnable, ok := <-pool.channel; ok {
			log.Printf("%s 獲取任務, %v\n", name, runnable)
			runnable.Start()
			log.Printf("%s 任務執行成功, %v\n", name, runnable)
		} else {
			log.Printf("%s 線程池關閉, 退出工作協程\n", name)
			pool.wait.Done()
			return
		}
	}
}

// 啟動工作線程
func (pool *ThreadPool) worker() {
	pool.wait.Add(pool.workSize)
	for i := 0; i < pool.workSize; i++ {
		go pool.doWorker(fmt.Sprintf("work-%d", i))
	}
}

// Submit 提交任務
func (pool *ThreadPool) Submit(task Runnable) bool {
	defer func() { recover() }()
	pool.channel <- task
	return true
}

// Close 關閉線程池
func (pool *ThreadPool) Close() {
	defer func() { recover() }()
	close(pool.channel)
}

// Wait 等待線程池任務完成
func (pool *ThreadPool) Wait() {
	pool.Close()
	pool.wait.Wait()
}

// NewThreadPool 工廠函數,創建線程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
	pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
	pool.worker()
	return pool
}

使用線程池

type person struct {
	name string
}

func (p *person) Start() {
	fmt.Println(p.name)
}

func main() {
	threadPool := executor.NewThreadPool(10, 2)		// 創建線程池, 隊列長度10, 工作線程2

	for i := 0; i < 5; i++ {
		threadPool.Submit(&person{name: "xx"})		// 提交十個任務
	}
        
	threadPool.Wait()								// 阻塞等待所有任務完成
}

模擬管道

任何線程之間的通訊都依賴底層鎖機制,channel是對鎖機制封裝後的實現對象,與Java中線程池任務隊列機制幾乎一樣,但要簡潔很多。使用切片簡單模擬
介面聲明

type Queue interface {
	// Put 向隊列添加任務, 添加成功返回true, 添加失敗返回false, 隊列滿了則阻塞直到添加成功
	Put(task interface{}) bool

	// Get 從隊列獲取任務, 一直阻塞直到獲取任務, 隊列關閉且沒有任務則返回false
	Get() (interface{}, bool)

	// Size 查看隊列中的任務數
	Size() int

	// Close 關閉隊列, 關閉後將無法添加任務, 已有的任務可以繼續獲取
	Close()
}

基於切片實現

// SliceQueue 使用切片實現, 自動擴容屬性隊列永遠都不會滿, 擴容時候會觸發數據複製, 性能一般
type SliceQueue struct {
	sync.Mutex
	cond  *sync.Cond
	queue []interface{}
	close atomic.Bool
}

func (q *SliceQueue) Get() (data interface{}, ok bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if len(q.queue) == 0 {
			if q.close.Load() == true {
				return nil, false
			}
			q.cond.Wait()
		}
		if data := q.doGet(); data != nil {
			return data, true
		}
	}
	return
}

func (q *SliceQueue) doGet() interface{} {
	if len(q.queue) >= 1 {
		data := q.queue[0]
		q.queue = q.queue[1:]
		return data
	}
	return nil
}

func (q *SliceQueue) Put(task interface{}) bool {
	q.Lock()
	defer func() {
		q.cond.Signal()
		q.Unlock()
	}()

	if q.close.Load() == true {
		return false
	}
	q.queue = append(q.queue, task)
	return true
}

func (q *SliceQueue) Size() int {
	return len(q.queue)
}

func (q *SliceQueue) Close() {
	if q.close.Load() == true {
		return
	}

	q.Lock()
	defer q.Unlock()

	q.close.Store(true)
	q.cond.Broadcast()
}

func NewSliceQueue() Queue {
	sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
	sliceQueue.cond = sync.NewCond(sliceQueue)
	return sliceQueue
}

基於環行數組實現

type ArrayQueue struct {
	sync.Mutex
	readCond     *sync.Cond
	writeCond    *sync.Cond
	readIndex    int
	writeIndex   int
	queueMaxSize int
	close        atomic.Bool
	queue        []interface{}
}

func (q *ArrayQueue) Put(task interface{}) bool {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.close.Load() == true {
			return false
		}
		if q.IsFull() {
			q.writeCond.Wait()
			if q.IsFull() {
				continue
			}
		}
		q.queue[q.writeIndex] = task
		q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
		q.readCond.Signal()
		return true
	}
	return true
}

func (q *ArrayQueue) Get() (interface{}, bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.IsEmpty() {
			if q.close.Load() == true {
				return nil, false
			}
			q.readCond.Wait()
			if q.IsEmpty() {
				continue
			}
		}
		task := q.queue[q.readIndex]
		q.readIndex = (q.readIndex + 1) % q.queueMaxSize
		q.writeCond.Signal()
		return task, true
	}
	return nil, true
}

func (q *ArrayQueue) Size() int {
	return q.queueMaxSize
}

func (q *ArrayQueue) Close() {
	if q.close.Load() == true {
		return
	}
	q.Lock()
	q.Unlock()
	q.close.Store(true)
	q.readCond.Broadcast()
}

func (q *ArrayQueue) IsFull() bool {
	return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}

func (q *ArrayQueue) IsEmpty() bool {
	return q.readIndex == q.writeIndex
}

func NewArrayQueue(size int) Queue {
	queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
	queue.readCond = sync.NewCond(queue)
	queue.writeCond = sync.NewCond(queue)
	return queue
}

測試用例

func TestWith(t *testing.T) {
	q := NewSliceQueue()
	go func() {
		time.Sleep(time.Second * 2)
		q.Put(true)  // 向隊列寫入數據, 與 chan<- 功能相同
	}()

	q.Get()			// 阻塞直到讀取數據, 與 <-chan 功能相同
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • python 版本:3.6 win32 版本(因為一些特殊原因必須使用3.6) pymssql 版本:2.2.0 連接資料庫: import pymssql** def InitMssql(self): try: host = self.IniConfig.get('default','dbhost ...
  • 學習完基礎的圖像演算法,開始接觸OpenCV學習: 灰度圖中,一個像素點上的灰度級需要一個位元組(byte,2^8,8 bit)進行存儲,此時的灰度圖是二維的。而當我們需要轉換為彩色圖時,即三維,便會產生顏色通道(Channel),這個時候,一個像素點上的灰度級便會需要三個位元組來進行存儲。 可以藉助笛卡 ...
  • 本文只發佈於利用OpenCV實現尺度不變性與角度不變性的特征找圖演算法和知乎 一般來說,利用OpenCV實現找圖功能,用的比較多的是模板匹配(matchTemplate)。筆者比較喜歡裡面的NCC演算法。但是模板有個很明顯的短板,面對尺度改變,角度改變的目標就無能為力了。因此本文旨在做到模板匹配做不到的 ...
  • 目錄: Redis是什麼? Redis優缺點? Redis為什麼這麼快? 講講Redis的線程模型? Redis應用場景有哪些? Memcached和Redis的區別? 為什麼要用 Redis 而不用 map/guava 做緩存? Redis 數據類型有哪些? SortedSet和List異同點? ...
  • 14.1、概述 在實際工作中,一般使用配置類和註解代替web.xml和SpringMVC配置文件的功能; 在 Servlet3.0 環境中,容器會在類路徑中查找實現了 javax.servlet.ServletContainerInitializer 介面的類, 如果找到了的話,就會用它來配置 Se ...
  • int i=1; i=i++; int j=i++; int k=i + ++i * i++; System.out.println("i="+i); System.out.println("j="+j); System.out.println("k="+k); ...
  • 寫在前面 今天狀態很不好,我發現學這部分知識的時候,會出現溜號或者註意力無法集中的情況。 我能想到的是,大概率是這部分知識,應該是超出了我現在的水平了,也就是說我存在知識斷層了,整體感覺真的是一知半解。 那有同學會問了,那你能說明白嗎? 我理解的肯定能呀,來往下看! Flask的使用 1、消息閃現的 ...
  • rust中的枚舉有什麼用?枚舉可以嵌入類型的好處是什麼 你可以在同一個枚舉中既有單個值,也有元組或結構體。 枚舉的每個變體可以擁有不同數量和類型的關聯數據。 這增加了類型的靈活性和表達力,使你能夠更精確地建模你的數據。 我知道rust可以為枚舉創建方法,那在哪種場景下枚舉會比結構體會有優勢 表示多個 ...
一周排行
    -Advertisement-
    Play Games
  • 當使用Autofac處理一個介面有多個實現的情況時,通常會使用鍵(key)進行區分或者通過IIndex索引註入,也可以通過IEnumerable集合獲取所有實例,以下是一個具體的例子,演示如何在Autofac中註冊多個實現,並通過構造函數註入獲取指定實現。 首先,確保你已經安裝了Autofac Nu ...
  • 本篇將分享Prometheus+Grafana的監控平臺搭建,並監控之前文章所搭建的主機&服務,分享日常使用的一些使用經驗本篇將配置常用服務的監控與面板配置:包括 MySQL,MongoDB,CLickHouse,Redis,RabbitMQ,Linux,Windows,Nginx,站點訪問監控,已... ...
  • 使用Aspirate可以將Aspire程式部署到Kubernetes 集群 工具安裝 dotnet tool install -g aspirate --prerelease 註意:Aspirate 正在開發中,該軟體包將作為預覽版進行版本控制,--prelease 選項將獲得最新的預覽版。 容器註 ...
  • 前言 本文要說的這種開發模式,這種模式並不是只有blazor支持,js中有一樣的方案next.js nuxt.js;blazor還有很多其它內容,本文近關註漸進式開發模式。 是的,前後端是主流,不過以下情況也許前後端分離並不是最好的選擇: 小公司,人員不多,利潤不高,創業階段能省則省 個人開發者,接 ...
  • 在.NET中,Microsoft.Extensions.Logging是一個靈活的日誌庫,它允許你將日誌信息記錄到各種不同的目標,包括資料庫。在這個示例中,我將詳細介紹如何使用Microsoft.Extensions.Logging將日誌保存到MySQL資料庫。我們將使用Entity Framewo ...
  • chatgpt介面開發筆記3: 語音識別介面 1.文本轉語音 1、瞭解介面參數 介面地址: POST https://api.openai.com/v1/audio/speech 下麵是介面文檔描述內容: 參數: { "model": "tts-1", "input": "你好,我是饒坤,我是ter ...
  • 前面兩篇文章主要是介紹瞭如何解決高併發情況下資源爭奪的問題。但是現實的應用場景中除了要解決資源爭奪問題,高併發的情況還需要解決更多問題,比如快速處理業務數據等, 本篇文章簡要羅列一下與之相關的更多技術細節。 1、非同步編程:使用async和await關鍵字進行非同步編程,這可以避免阻塞線程,提高程式的響 ...
  • 大家好,我是棧長。 Nacos 2.3.0 前幾天正式發佈了,新增了不少實用性的新功能,真是史上最強版本。 Nacos 2.3.0 還真是一個比較重要的大版本,因為它涉及了太多重大更新,今天棧長給大家來解讀下。 Nacos 先掃個盲: Nacos 一個用於構建雲原生應用的動態服務發現、配置管理和服務 ...
  • IDEA的遠程開發功能,可以將本地的編譯、構建、調試、運行等工作都放在遠程伺服器上執行,而本地僅運行客戶端軟體進行常規的開發操作即可,舊版本IDEA目前不支持該功能.,本例使用的是IDEA2023.2.5版本 下麵介紹如何在IDEA中設置遠程連接伺服器開發環境並結合Cpolar內網穿透工具實現無公網 ...
  • 本文解釋為啥會有響應式編程,為什麼它在開發者中不太受歡迎,以及引入 Java 虛擬線程後它可能最終會消失。 命令式風格編程一直深受開發者喜愛,如 if-then-else、while 迴圈、函數和代碼塊等結構使代碼易理解、調試,異常易追蹤。然而,像所有好的東西一樣,通常也有問題。這種編程風格導致線程 ...