當然,我寫的簡易版協程池還有很多可以優化的地方,比如可以實現動態擴容等功能。今天我們要簡單總結一下協程池的優勢,主要是為了降低資源開銷。協程池的好處在於可以重覆利用協程,避免頻繁創建和銷毀協程,從而減少系統開銷,提高系統性能。此外,協程池還可以提高響應速度,因為一旦接收到任務,可以立即執行,不需要等... ...
在Java編程中,為了降低開銷和優化程式的效率,我們常常使用線程池來管理線程的創建和銷毀,並儘量復用已創建的對象。這樣做不僅可以提高程式的運行效率,還能減少垃圾回收器對對象的回收次數。
在Golang中,我們知道協程(goroutine)由於其體積小且效率高,在高併發場景中扮演著重要的角色。然而,資源始終是有限的,即使你的記憶體很大,也會有一個限度。因此,協程池在某些情況下肯定也會有其獨特的適用場景。畢竟,世上並不存在所謂的銀彈,只有在特定情況下才能找到最優的解決方案。因此,在Golang中,我們仍然需要考慮使用協程池的情況,並根據具體場景來選擇最佳的解決方案。
今天,我們將從Java線程池的角度出發,手把手地帶你實現一個Golang協程池。如果你覺得有些困難,記住我們的宗旨是使用固定數量的協程來處理所有的任務。這樣做的好處是可以避免協程數量過多導致資源浪費,也能確保任務的有序執行。讓我們一起開始,一步步地構建這個Golang協程池吧!
協程池
首先,讓我們編寫一個簡單的多協程代碼,並逐步進行優化,最終實現一個簡化版的協程池。假如我們有10個任務需要處理。我們最簡單做法就是直接使用go
關鍵字直接去生成相應的協程即可,比如這樣:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 1; i <= 5; i++ {
go func(index int) {
fmt.Printf("Goroutine %d started\n", index)
time.Sleep(1 * time.Second)
fmt.Printf("Goroutine %d finished\n", index)
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("All goroutines finished")
}
我們當前的用法非常簡單,無需專門維護goroutine容量大小。每當有一個任務出現,我們就創建一個goroutine來處理。現在,讓我們進一步優化這種用法。
優化版協程池
現在我們需要首先創建一個pool對象和一個專門用於運行協程的worker對象。如果你熟悉Java中線程池的源碼,對於worker這個名稱應該不會陌生。worker是一個專門用於線程復用的對象,而我們的worker同樣負責運行任務。
實體Job沒啥好說的,就是我們具體的任務。我們先來定義一個簡單的任務實體Job:
type Job struct {
id int
num int
}
我們來看下主角Worker定義:
type Worker struct {
id int
jobChannel chan Job
wg *sync.WaitGroup
}
func NewWorker(id int, wga *sync.WaitGroup) *Worker {
return &Worker{
id: id,
jobChannel: make(chan Job, 1),
wg: wga,
}
}
func (w *Worker) Start() {
go func() {
for job := range w.jobChannel {
fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
// 模擬任務處理
fmt.Println("jobs 模擬任務處理")
time.Sleep(2 * time.Second) // 休眠2秒鐘
result := job.num * 2
fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
w.wg.Done()
}
}()
}
你可以看到,我的實體擁有一個channel。這個channel類似於我們想要獲取的任務隊列。與Java中使用鏈表形式並通過獨占鎖獲取共用鏈表這種臨界資源的方式不同,我選擇使用了Golang中的channel來進行通信。因為Golang更傾向於使用channel進行通信,而不是共用資源。所以,我選擇了使用channel。為了避免在添加任務時直接阻塞,我特意創建了一個帶有任務緩衝的channel。
在這裡,Start()
方法是一個真正的協程,我會從channel中持續獲取任務,以保持這個協程不被銷毀,直到沒有任務可獲取為止。這個邏輯類似於Java中的線程池。
讓我們進一步深入地探討一下Pool池的定義:
type Pool struct {
workers []*Worker
jobChan chan Job
wg sync.WaitGroup
}
func NewPool(numWorkers, jobQueueSize int) *Pool {
pl := &Pool{
workers: make([]*Worker, numWorkers),
jobChan: make(chan Job, jobQueueSize),
}
for i := 0; i < numWorkers; i++ {
pl.workers[i] = NewWorker(i+1, &pl.wg)
}
return pl
}
func (p *Pool) Start() {
for _, worker := range p.workers {
worker.Start()
}
go p.dispatchJobs()
}
func (p *Pool) dispatchJobs() {
for job := range p.jobChan {
worker := p.getLeastBusyWorker()
worker.jobChannel <- job
}
}
func (p *Pool) getLeastBusyWorker() *Worker {
leastBusy := p.workers[0]
for _, worker := range p.workers {
if len(worker.jobChannel) < len(leastBusy.jobChannel) {
leastBusy = worker
}
}
return leastBusy
}
func (p *Pool) AddJob(job Job) {
fmt.Println("jobs add")
p.wg.Add(1)
p.jobChan <- job
}
他的定義可能有點複雜,但是我可以用簡單的語言向你解釋,就是那些東西,只是用Golang的寫法來實現,與Java的實現方式類似。
首先,我定義了一個名為worker的數組,用於存儲當前存在的worker數量。這裡並沒有實現核心和非核心worker的區分。另外,我還創建了一個獨立的channel,用於保存可緩衝任務的大小。這些參數在初始化時是必須提供的。
Start
方法的主要目的是啟動worker開始執行任務。首先,它會啟動一個核心協程,等待任務被派發。然後,dispatchJobs方法會開始監聽channel是否有任務,如果有任務,則會將任務分派給worker進行處理。在整個過程中,通過channel進行通信,沒有使用鏈表或其他共用資源實體。需要註意的是,dispatchJobs方法在另一個協程中被調用,這是為了避免阻塞主線程。
getLeastBusyWorker
方法是用來獲取阻塞任務最少的worker的。這個方法的主要目標是保持任務平均分配。而AddJob
方法則是用來直接向channel中添加job任務的,這個方法比較簡單,不需要過多解釋。
協程池最終實現
經過一系列的反覆修改和優化,我們終於成功實現了一個功能完善且高效的Golang協程池。下麵是最終版本的代碼:
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
id int
num int
}
type Worker struct {
id int
jobChannel chan Job
wg *sync.WaitGroup
}
func NewWorker(id int, wga *sync.WaitGroup) *Worker {
return &Worker{
id: id,
jobChannel: make(chan Job, 1),
wg: wga,
}
}
func (w *Worker) Start() {
go func() {
for job := range w.jobChannel {
fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
// 模擬任務處理
fmt.Println("jobs 模擬任務處理")
time.Sleep(2 * time.Second) // 休眠2秒鐘
result := job.num * 2
fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
w.wg.Done()
}
}()
}
type Pool struct {
workers []*Worker
jobChan chan Job
wg sync.WaitGroup
}
func NewPool(numWorkers, jobQueueSize int) *Pool {
pl := &Pool{
workers: make([]*Worker, numWorkers),
jobChan: make(chan Job, jobQueueSize),
}
for i := 0; i < numWorkers; i++ {
pl.workers[i] = NewWorker(i+1, &pl.wg)
}
return pl
}
func (p *Pool) Start() {
for _, worker := range p.workers {
worker.Start()
}
go p.dispatchJobs()
}
func (p *Pool) dispatchJobs() {
for job := range p.jobChan {
worker := p.getLeastBusyWorker()
worker.jobChannel <- job
}
}
func (p *Pool) getLeastBusyWorker() *Worker {
leastBusy := p.workers[0]
for _, worker := range p.workers {
if len(worker.jobChannel) < len(leastBusy.jobChannel) {
leastBusy = worker
}
}
return leastBusy
}
func (p *Pool) AddJob(job Job) {
fmt.Println("jobs add")
p.wg.Add(1)
p.jobChan <- job
}
func main() {
pool := NewPool(3, 10)
pool.Start()
// 添加任務到協程池
for i := 1; i <= 15; i++ {
pool.AddJob(Job{
id: i,
num: i,
})
}
// 等待所有任務完成
pool.wg.Wait()
close(pool.jobChan)
fmt.Println("All jobs finished")
}
三方協程池
在這種場景下,既然已經有一個存在的場景,那麼顯然輪子是肯定有的。不論使用哪種編程語言,我們都可以探索一下,以下是Golang語言中關於三方協程池的實現工具。
ants
: ants是一個高性能的協程池實現,支持動態調整協程池的大小,可以通過簡單的API調用來將任務提交給協程池進行執行。官方地址:https://github.com/panjf2000/ants
gopool
:gopool 是一個高性能的 goroutine 池,旨在重用 goroutine 並限制 goroutine 的數量。官方地址:https://github.com/bytedance/gopkg/tree/develop/util/gopool
這些庫都提供了簡單易用的API,可以方便地創建和管理協程池,同時也支持動態調整協程池的大小,以滿足不同場景下的需求。
總結
當然,我寫的簡易版協程池還有很多可以優化的地方,比如可以實現動態擴容等功能。今天我們要簡單總結一下協程池的優勢,主要是為了降低資源開銷。協程池的好處在於可以重覆利用協程,避免頻繁創建和銷毀協程,從而減少系統開銷,提高系統性能。此外,協程池還可以提高響應速度,因為一旦接收到任務,可以立即執行,不需要等待協程創建的時間。另外,協程池還具有增強可管理性的優點,可以對協程進行集中調度和統一管理,方便進行性能調優。