Java開發者的Golang進修指南:從0->1帶你實現協程池

来源:https://www.cnblogs.com/guoxiaoyu/p/17953496
-Advertisement-
Play Games

當然,我寫的簡易版協程池還有很多可以優化的地方,比如可以實現動態擴容等功能。今天我們要簡單總結一下協程池的優勢,主要是為了降低資源開銷。協程池的好處在於可以重覆利用協程,避免頻繁創建和銷毀協程,從而減少系統開銷,提高系統性能。此外,協程池還可以提高響應速度,因為一旦接收到任務,可以立即執行,不需要等... ...


在Java編程中,為了降低開銷和優化程式的效率,我們常常使用線程池來管理線程的創建和銷毀,並儘量復用已創建的對象。這樣做不僅可以提高程式的運行效率,還能減少垃圾回收器對對象的回收次數。

在Golang中,我們知道協程(goroutine)由於其體積小且效率高,在高併發場景中扮演著重要的角色。然而,資源始終是有限的,即使你的記憶體很大,也會有一個限度。因此,協程池在某些情況下肯定也會有其獨特的適用場景。畢竟,世上並不存在所謂的銀彈,只有在特定情況下才能找到最優的解決方案。因此,在Golang中,我們仍然需要考慮使用協程池的情況,並根據具體場景來選擇最佳的解決方案。

今天,我們將從Java線程池的角度出發,手把手地帶你實現一個Golang協程池。如果你覺得有些困難,記住我們的宗旨是使用固定數量的協程來處理所有的任務。這樣做的好處是可以避免協程數量過多導致資源浪費,也能確保任務的有序執行。讓我們一起開始,一步步地構建這個Golang協程池吧!

協程池

首先,讓我們編寫一個簡單的多協程代碼,並逐步進行優化,最終實現一個簡化版的協程池。假如我們有10個任務需要處理。我們最簡單做法就是直接使用go關鍵字直接去生成相應的協程即可,比如這樣:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg  sync.WaitGroup
    wg.Add(5)
    for i := 1; i <= 5; i++ {
        go func(index int) {
            fmt.Printf("Goroutine %d started\n", index)
            time.Sleep(1 * time.Second)
            fmt.Printf("Goroutine %d finished\n", index)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println("All goroutines finished")
}

我們當前的用法非常簡單,無需專門維護goroutine容量大小。每當有一個任務出現,我們就創建一個goroutine來處理。現在,讓我們進一步優化這種用法。

優化版協程池

現在我們需要首先創建一個pool對象和一個專門用於運行協程的worker對象。如果你熟悉Java中線程池的源碼,對於worker這個名稱應該不會陌生。worker是一個專門用於線程復用的對象,而我們的worker同樣負責運行任務。

實體Job沒啥好說的,就是我們具體的任務。我們先來定義一個簡單的任務實體Job:

type Job struct {
    id  int
    num int
}

我們來看下主角Worker定義:

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模擬任務處理
            fmt.Println("jobs 模擬任務處理")
            time.Sleep(2 * time.Second) // 休眠2秒鐘
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

你可以看到,我的實體擁有一個channel。這個channel類似於我們想要獲取的任務隊列。與Java中使用鏈表形式並通過獨占鎖獲取共用鏈表這種臨界資源的方式不同,我選擇使用了Golang中的channel來進行通信。因為Golang更傾向於使用channel進行通信,而不是共用資源。所以,我選擇了使用channel。為了避免在添加任務時直接阻塞,我特意創建了一個帶有任務緩衝的channel。

在這裡,Start()方法是一個真正的協程,我會從channel中持續獲取任務,以保持這個協程不被銷毀,直到沒有任務可獲取為止。這個邏輯類似於Java中的線程池。

讓我們進一步深入地探討一下Pool池的定義:

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

他的定義可能有點複雜,但是我可以用簡單的語言向你解釋,就是那些東西,只是用Golang的寫法來實現,與Java的實現方式類似。

首先,我定義了一個名為worker的數組,用於存儲當前存在的worker數量。這裡並沒有實現核心和非核心worker的區分。另外,我還創建了一個獨立的channel,用於保存可緩衝任務的大小。這些參數在初始化時是必須提供的。

Start方法的主要目的是啟動worker開始執行任務。首先,它會啟動一個核心協程,等待任務被派發。然後,dispatchJobs方法會開始監聽channel是否有任務,如果有任務,則會將任務分派給worker進行處理。在整個過程中,通過channel進行通信,沒有使用鏈表或其他共用資源實體。需要註意的是,dispatchJobs方法在另一個協程中被調用,這是為了避免阻塞主線程。

getLeastBusyWorker方法是用來獲取阻塞任務最少的worker的。這個方法的主要目標是保持任務平均分配。而AddJob方法則是用來直接向channel中添加job任務的,這個方法比較簡單,不需要過多解釋。

協程池最終實現

image

經過一系列的反覆修改和優化,我們終於成功實現了一個功能完善且高效的Golang協程池。下麵是最終版本的代碼:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    id  int
    num int
}

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模擬任務處理
            fmt.Println("jobs 模擬任務處理")
            time.Sleep(2 * time.Second) // 休眠2秒鐘
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

func main() {
    pool := NewPool(3, 10)
    pool.Start()
    // 添加任務到協程池
    for i := 1; i <= 15; i++ {
        pool.AddJob(Job{
            id:  i,
            num: i,
        })
    }

    // 等待所有任務完成
    pool.wg.Wait()
    close(pool.jobChan)
    fmt.Println("All jobs finished")
}

三方協程池

在這種場景下,既然已經有一個存在的場景,那麼顯然輪子是肯定有的。不論使用哪種編程語言,我們都可以探索一下,以下是Golang語言中關於三方協程池的實現工具。

ants: ants是一個高性能的協程池實現,支持動態調整協程池的大小,可以通過簡單的API調用來將任務提交給協程池進行執行。官方地址:https://github.com/panjf2000/ants

gopool:gopool 是一個高性能的 goroutine 池,旨在重用 goroutine 並限制 goroutine 的數量。官方地址:https://github.com/bytedance/gopkg/tree/develop/util/gopool

這些庫都提供了簡單易用的API,可以方便地創建和管理協程池,同時也支持動態調整協程池的大小,以滿足不同場景下的需求。

總結

當然,我寫的簡易版協程池還有很多可以優化的地方,比如可以實現動態擴容等功能。今天我們要簡單總結一下協程池的優勢,主要是為了降低資源開銷。協程池的好處在於可以重覆利用協程,避免頻繁創建和銷毀協程,從而減少系統開銷,提高系統性能。此外,協程池還可以提高響應速度,因為一旦接收到任務,可以立即執行,不需要等待協程創建的時間。另外,協程池還具有增強可管理性的優點,可以對協程進行集中調度和統一管理,方便進行性能調優。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 下麵介紹三種用於進行排序的專用視窗函數: 1、RANK() 在計算排序時,若存在相同位次,會跳過之後的位次。 例如,有3條排在第1位時,排序為:1,1,1,4······ 2、DENSE_RANK() 這就是題目中所用到的函數,在計算排序時,若存在相同位次,不會跳過之後的位次。 例如,有3條排在第1 ...
  • 在近日舉辦的鴻蒙生態千帆啟航儀式上,華為常務董事、終端BG CEO餘承東表示,鴻蒙生態設備已經增至8億 ,將打開萬億產業新藍海。 在本次論壇上,華為宣佈HarmonyOS NEXT鴻蒙星河版(開發者預覽版)已面向開發者開放申請。該版本,能夠實現原生精緻、原生易用、原生流暢、原生安全、原生智能、原生互 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 依賴管理解決了在軟體開發過程中管理和協調各種依賴項的問題,簡化了開發流程,提高了項目的可靠性、可維護性和可重覆性。它們幫助開發人員更高效地管理項目的依賴關係,減少了潛在的錯誤和衝突,並提供了更好的開發體驗。 常用的依賴管理 在 JavaS ...
  • 最近,有群里在群里發了這麼一個非常有意思的卡片 Hover 動效,來源於此網站 -- key-drop,效果如下: 非常有意思酷炫的效果。而本文,我們不會完全還原此效果,而是基於此效果,嘗試去製作這麼一個類似的卡片交互效果: 該效果的幾個核心點: 卡片的 3D 旋轉跟隨滑鼠移動效果 如何讓卡片在 H ...
  • 一、定義 定義一個操作中演算法的框架,而將一些步驟延遲到子類中。模板方法模式使得子類不改變一個演算法的結構即可重定義該演算法的特定步驟。模板方法是一種類行為型模式 二、描述 模板方法模式結構比較簡單,其核心是抽象類和其中的模板方法的設計,包含以下兩個角色: 1、AbstractClass(抽象類):在抽象 ...
  • 本文在原文基礎上有刪減,原文參考泛型、Trait 和生命周期。 目錄泛型數據類型在函數定義中使用泛型結構體定義中的泛型枚舉定義中的泛型方法定義中的泛型泛型代碼的性能Trait:定義共同行為定義 trait為類型實現 trait預設實現trait 作為參數Trait Bound 語法通過 + 指定多個 ...
  • 一、pom.xml需要引入的依賴二、項目開啟熔斷器開關 2.1 註解方式 2.2 xml方式三、依賴類缺失問題四、版本匹配安全檢查問題五、測試驗證六、結論 一、pom.xml需要引入的依賴 pom.xml <!-- springboot升級到2.6.7,同樣適用於2.7.0,2.7.18等 --> ...
  • 看到標題大家可能會有點疑惑吧:OpenFeign 不是挺好用的嗎?尤其是微服務之間的遠程調用,平時用的也挺習慣的,為啥要替換呢? ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...