公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 在這篇文章中,我們將通過示例來學習 Java 函數式介面。 函數式介面的特點 只包含一個抽象方法的介面稱為函數式介面。 它可以有任意數量的預設靜態方法,但只能包含一個抽象方法。它還可以聲明對象類的方法。 函數介面也稱為單一 ...
概述
併發編程是利用多核心能力,提升程式性能,而多線程之間需要相互協作、共用資源、線程安全等。任何併發模型都要解決線程間通訊問題,毫不誇張的說線程通訊是併發編程的主要問題。go
使用著名的CSP(Communicating Sequential Process,通訊順序進程)併發模型,從設計之初 Go 語言就註重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程式員專註於分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。channel是線程簡通訊的具體實現之一,本質就是一個線程安全的 FIFO
阻塞隊列(先進先出),向隊列中寫入數據,在另一個線程從隊列讀取數據。很多語言都有類似實現,比如 Java 的線程池任務隊列。
基本使用
通道是引用類型,需要使用 make 創建,格式如下
通道實例 := make(chan 數據類型, 通道長度)
- 數據類型:通道內傳輸的元素類型,可以基本數據類型,也可以使自定義數據類型。
- 通道實例:通過make創建的通道句柄,與函數名稱一樣,指向通道的記憶體首地址。
- 通道長度:通道本質是隊列,創建時候可指定長度,預設為0
創建通道
ch1 := make(chan int) // 創建一個整型類型的通道
ch2 := make(chan interface{}) // 創建一個空介面類型的通道, 可以存放任意格式
ch3 := make(chan *Equip) // 創建Equip指針類型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10) // 創建Equip指針類型的通道, 並指定隊列長度
通道本質就是線程安全的隊列,創建時候可以指定隊列長度,預設為0。
向通道寫入數據,使用語法非常形象,寫入channel <-
,讀取<-channel
ch2 := make(chan interface{}, 10)
ch2<- 10 // 向隊列寫入
n := <-ch2 // 從隊列讀取
fmt.Println(n) // 10
箭頭語法雖然很形象,但是有些奇怪,也不利於擴展。使用函數方式感覺更好,也更主流,如func (p *chan) get() any
,func (p *chan) put(any) err
,擴展性也更強,通過參數可增加超時、同步、非同步等技能。
箭頭符號並沒有規定位置,與C
指針一樣,如下兩個語句等效
ch1 := make(chan int)
i := <-ch1
i := <- ch1
箭頭語法的讀寫有相對性,可讀性一般,有時候無法分辨是讀或寫,看起來很奇怪,如下偽代碼
func main() {
input := make(chan int, 2)
output := make(chan int, 2)
go func() {
input <- 10
}()
output<- <-input
fmt.Println(<-output)
}
管道是用於協程之間通訊,主流使用方式如下
ch2 := make(chan interface{}, 10)
go func() {
data := <-ch2 // 用戶協程讀取
fmt.Println(data)
}()
ch2 <- "hello" // 主協程寫入
time.Sleep(time.Second)
管道也支持遍歷,與箭頭符號一樣,無數據時候迴圈將被阻塞,迴圈永遠不會結束,除非關閉管道
chanInt := make(chan int, 10)
for chanInt, ok := range chanInts {
fmt.Println(chanInt)
}
管道也支持關閉,關閉後的管道不允許寫入,panic 異常
chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1 // panic: send on closed channel
讀取則不同,已有數據可繼續讀取,無數據時返回false
,不阻塞
if value, ok := <-chanInts; ok { // 從管道讀取數據不在阻塞
fmt.Println("從管讀取=", value)
} else {
fmt.Println("從管道讀取失敗", ok)
return
}
單向管道
管道也支持單向模式,僅允許讀取、或者寫入
var queue <-chan string = make(chan string)
函數形參也可以定義定向管道
func customer(channel <-chan string) { // 形參為只讀管道
for {
message := <-channel // 只允許讀取數據
fmt.Println(message)
}
}
channel := make(chan string)
go customer(channel)
管道阻塞
Go
管道的讀寫都是同步模式,當管道容量還有空間,則寫入成功,否則將阻塞直到寫入成功。從管道讀取也一樣,有數據直接讀取,否則將阻塞直到讀取成功。
var done = make(chan bool)
func aGoroutine() {
fmt.Println("hello")
done <- true // 寫管道
}
func main() {
go aGoroutine()
<-done // 讀阻塞
}
主協程從管道讀取數據時將被阻塞,直到用戶協程寫入數據。管道非常適合用於生產者消費者模式,需要平滑兩者的性能差異,可通過管道容量實現緩衝,所以除非特定場景,都建議管道容量大於零。
有些場景可以使用管道控制線程併發數
// 待補充
阻塞特性也帶來了些問題,程式無法控制超時(箭頭函數語法的後遺症),go 也提供瞭解決方案, 使用select
關鍵,與網路編程的select
函數類似,監測多個通道是否可讀狀態,都可讀隨機選擇一個,都不可讀進入Default
分支,否則阻塞
select {
case n := <-input:
fmt.Println(n)
case m := <-output:
fmt.Println(m)
default:
fmt.Println("default")
}
當然也可以使用select
向管道寫入數據,只要不關閉管道總是可寫入,此時加入default
分支永遠不會被執行到,如下隨機石頭剪刀布
ch := make(chan string)
go func() {
for {
select {
case ch <- "石頭":
case ch <- "剪刀":
case ch <- "布":
}
}
}()
for value := range ch {
log.Println(value)
time.Sleep(time.Second)
}
模擬線程池
由於go的管道非常輕量且簡潔,大部分直接使用,封裝線程池模式並不常見。案例僅作為功能演示,非常簡單幾十行代碼即可實現線程池的基本功能,體現了go併發模型的簡潔、高效。
type Runnable interface {
Start()
}
// 線程池對象
type ThreadPool struct {
queueSize int
workSize int
channel chan Runnable
wait sync.WaitGroup
}
// 工作線程, 執行非同步任務
func (pool *ThreadPool) doWorker(name string) {
log.Printf("%s 啟動工作協程", name)
for true {
if runnable, ok := <-pool.channel; ok {
log.Printf("%s 獲取任務, %v\n", name, runnable)
runnable.Start()
log.Printf("%s 任務執行成功, %v\n", name, runnable)
} else {
log.Printf("%s 線程池關閉, 退出工作協程\n", name)
pool.wait.Done()
return
}
}
}
// 啟動工作線程
func (pool *ThreadPool) worker() {
pool.wait.Add(pool.workSize)
for i := 0; i < pool.workSize; i++ {
go pool.doWorker(fmt.Sprintf("work-%d", i))
}
}
// Submit 提交任務
func (pool *ThreadPool) Submit(task Runnable) bool {
defer func() { recover() }()
pool.channel <- task
return true
}
// Close 關閉線程池
func (pool *ThreadPool) Close() {
defer func() { recover() }()
close(pool.channel)
}
// Wait 等待線程池任務完成
func (pool *ThreadPool) Wait() {
pool.Close()
pool.wait.Wait()
}
// NewThreadPool 工廠函數,創建線程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
pool.worker()
return pool
}
使用線程池
type person struct {
name string
}
func (p *person) Start() {
fmt.Println(p.name)
}
func main() {
threadPool := executor.NewThreadPool(10, 2) // 創建線程池, 隊列長度10, 工作線程2
for i := 0; i < 5; i++ {
threadPool.Submit(&person{name: "xx"}) // 提交十個任務
}
threadPool.Wait() // 阻塞等待所有任務完成
}
模擬管道
任何線程之間的通訊都依賴底層鎖機制,channel是對鎖機制封裝後的實現對象,與Java中線程池任務隊列機制幾乎一樣,但要簡潔很多。使用切片簡單模擬
介面聲明
type Queue interface {
// Put 向隊列添加任務, 添加成功返回true, 添加失敗返回false, 隊列滿了則阻塞直到添加成功
Put(task interface{}) bool
// Get 從隊列獲取任務, 一直阻塞直到獲取任務, 隊列關閉且沒有任務則返回false
Get() (interface{}, bool)
// Size 查看隊列中的任務數
Size() int
// Close 關閉隊列, 關閉後將無法添加任務, 已有的任務可以繼續獲取
Close()
}
基於切片實現
// SliceQueue 使用切片實現, 自動擴容屬性隊列永遠都不會滿, 擴容時候會觸發數據複製, 性能一般
type SliceQueue struct {
sync.Mutex
cond *sync.Cond
queue []interface{}
close atomic.Bool
}
func (q *SliceQueue) Get() (data interface{}, ok bool) {
q.Lock()
defer q.Unlock()
for true {
if len(q.queue) == 0 {
if q.close.Load() == true {
return nil, false
}
q.cond.Wait()
}
if data := q.doGet(); data != nil {
return data, true
}
}
return
}
func (q *SliceQueue) doGet() interface{} {
if len(q.queue) >= 1 {
data := q.queue[0]
q.queue = q.queue[1:]
return data
}
return nil
}
func (q *SliceQueue) Put(task interface{}) bool {
q.Lock()
defer func() {
q.cond.Signal()
q.Unlock()
}()
if q.close.Load() == true {
return false
}
q.queue = append(q.queue, task)
return true
}
func (q *SliceQueue) Size() int {
return len(q.queue)
}
func (q *SliceQueue) Close() {
if q.close.Load() == true {
return
}
q.Lock()
defer q.Unlock()
q.close.Store(true)
q.cond.Broadcast()
}
func NewSliceQueue() Queue {
sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
sliceQueue.cond = sync.NewCond(sliceQueue)
return sliceQueue
}
基於環行數組實現
type ArrayQueue struct {
sync.Mutex
readCond *sync.Cond
writeCond *sync.Cond
readIndex int
writeIndex int
queueMaxSize int
close atomic.Bool
queue []interface{}
}
func (q *ArrayQueue) Put(task interface{}) bool {
q.Lock()
defer q.Unlock()
for true {
if q.close.Load() == true {
return false
}
if q.IsFull() {
q.writeCond.Wait()
if q.IsFull() {
continue
}
}
q.queue[q.writeIndex] = task
q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
q.readCond.Signal()
return true
}
return true
}
func (q *ArrayQueue) Get() (interface{}, bool) {
q.Lock()
defer q.Unlock()
for true {
if q.IsEmpty() {
if q.close.Load() == true {
return nil, false
}
q.readCond.Wait()
if q.IsEmpty() {
continue
}
}
task := q.queue[q.readIndex]
q.readIndex = (q.readIndex + 1) % q.queueMaxSize
q.writeCond.Signal()
return task, true
}
return nil, true
}
func (q *ArrayQueue) Size() int {
return q.queueMaxSize
}
func (q *ArrayQueue) Close() {
if q.close.Load() == true {
return
}
q.Lock()
q.Unlock()
q.close.Store(true)
q.readCond.Broadcast()
}
func (q *ArrayQueue) IsFull() bool {
return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}
func (q *ArrayQueue) IsEmpty() bool {
return q.readIndex == q.writeIndex
}
func NewArrayQueue(size int) Queue {
queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
queue.readCond = sync.NewCond(queue)
queue.writeCond = sync.NewCond(queue)
return queue
}
測試用例
func TestWith(t *testing.T) {
q := NewSliceQueue()
go func() {
time.Sleep(time.Second * 2)
q.Put(true) // 向隊列寫入數據, 與 chan<- 功能相同
}()
q.Get() // 阻塞直到讀取數據, 與 <-chan 功能相同
}