協程併發下數據彙總:除了互斥鎖,還有其他方式嗎?

来源:https://www.cnblogs.com/chenjiazhan/archive/2023/05/21/17418276.html
-Advertisement-
Play Games

深入探討Python中的併發編程,特別關註多線程和多進程的應用。我們將先從基本概念開始,然後通過詳細舉例探討每一種機制,最後分享一些實戰經驗以及一種優雅的編程技巧。 ...


1. 簡介

本文介紹了在併發編程中數據彙總的問題,並探討了在併發環境下使用互斥鎖和通道兩種方式來保證數據安全性的方法。

首先,通過一個實例,描述了一個併發拉取數據並彙總的案例,並使用互斥鎖來確保線程安全。然後,討論了互斥鎖的一些缺點,引出了通道作為一種替代方案,並介紹了通道的基本使用和特性。接下來,通過實例演示瞭如何使用通道來實現併發下的數據彙總。

最後,引用了etcd中使用通道實現協程併發下數據彙總的例子,展示了通道在實際項目中的應用。

2. 問題引入

在請求處理過程中,經常需要通過RPC介面拉取數據。有時候,由於數據量較大,單個數據拉取操作可能會導致整個請求的處理時間較長。為了加快處理速度,我們通常考慮同時開啟多個協程併發地拉取數據。一旦多個協程併發拉取數據後,主協程需要彙總這些協程拉取到的數據,然後再返回結果。在這個過程中,往往涉及對共用資源的併發訪問,為了保證線程安全性,通常會使用互斥鎖。下麵通過一個簡單的代碼來展示該過程:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

var (
        // 彙總結果
        dataList []Data
        // 互斥鎖
        mutex    sync.Mutex
)

func fetchData(page int, wg *sync.WaitGroup) {
        // 模擬RPC介面拉取數據的耗時操作
        time.Sleep(time.Second)

        // 假設從RPC介面獲取到了一批數據
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page),
        }

        // 使用互斥鎖保護共用數據的併發訪問
        mutex.Lock()
        defer mutext.Unlock()
        dataList = append(dataList, data)

        wg.Done()
}

func main() {
        var wg sync.WaitGroup

        // 定義需要拉取的數據頁數
        numPages := 10

        // 啟動多個協程併發地拉取數據
        for i := 1; i <= numPages; i++ {
            wg.Add(1)
            go fetchData(i, &wg)
        }

        // 等待所有協程完成
        wg.Wait()

        // 列印拉取到的數據
        fmt.Println("Fetched data:")
        for _, data := range dataList {
            fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
        }
}

在上述示例中,我們定義了一個共用的dataList切片用於保存拉取到的數據。每個goroutine通過調用fetchData函數來模擬拉取數據的過程,並使用互斥鎖mutex保護dataList的併發訪問。主協程使用sync.WaitGroup等待所有協程完成數據拉取任務,然後列印出拉取到的數據。通過併發地拉取數據,並使用互斥鎖保證線程安全,我們可以顯著提高數據拉取的速度,並且確保數據的正確性和一致性。

回看上述實現,其實是涉及到了多個協程操作同一份數據,有可能導致線程安全的問題,然後這裡是通過互斥鎖來保證線程安全的。確實,使用互斥鎖是可以保證線程安全的,但是也是存在一些缺點的,比如競爭和阻塞,兩個協程同時競爭互斥鎖時,只有一個協程能夠獲得鎖,而其他協程則會被阻塞,這個就可能導致性能瓶頸,當然在這個場景下問題不大。其次就是代碼的複雜性提高了,使用互斥鎖需要仔細設計和管理,確保鎖的正確獲取和釋放。這增加了代碼的複雜性和維護成本,如果在代碼中處理鎖的方式不正確,可能會死鎖,導致程式無法繼續執行。

那我們其實就有疑問,在協程併發下數據彙總的場景,是否存在其他方式,不需要通過使用互斥鎖,也能夠保證線程安全呢? 其實還真有,Go語言中的channel非常適用於這種情況。通過使用通道,我們可以實現線程安全的數據共用和同步,而無需顯式地使用互斥鎖。下麵我們來瞭解一下channel

3. channel的使用

3.1 channel的基本介紹

3.1.1 基本說明

channel在Go語言中是一種特殊的數據結構,用於協程之間的通信和同步。它類似於一個先進先出(FIFO)的隊列,用於數據的傳輸和共用。在併發環境中,可以將數據發送到通道,也可以從通道中接收數據,而這兩個操作都是線程安全的。

使用channel的優勢在於它提供了內置的同步機制,無需顯式地使用互斥鎖來處理併發訪問。

當一個協程向通道發送數據時,如果通道已滿,發送操作會被阻塞,直到有其他協程從通道中接收數據釋放空間。同樣地,當一個協程從通道接收數據時,如果通道為空,接收操作也會被阻塞,直到有其他協程向通道發送數據。

同時,當多個協程同時訪問通道時,Go運行時系統會自動處理協程之間的同步和併發訪問的細節,保證數據的正確性和一致性。從而可以放心地在多個協程中使用通道進行數據的發送和接收操作,而不需要額外的鎖或同步機制來保證線程安全。

因此,使用channel其實是可以避免常見的併發問題,如競態條件和死鎖,簡化了併發編程的複雜性。

3.1.2 基本使用

通過上面對channel的基本介紹,我們已經對channel有了基本的瞭解,其實可以粗略理解其為一個併發安全的隊列。下麵來瞭解下channel的基本語法,從而能夠開始使用channel

channel基本操作分為創建channel,發送數據到channel,接收channel中的數據,以及關閉channel。下麵對其進行簡單展示:

創建channel,使用make函數創建通道,通道的類型可以根據需要選擇,例如intstring等:

ch := make(chan int)

發送數據到channel:使用<-操作符將數據發送到通道中

ch <- data

接收channel中的數據: 使用<-操作符從通道中接收數據

result := <-ch

關閉channel, 使用close函數關閉通道。關閉通道後,仍然可以從通道接收數據,但無法再向通道發送數據

close(ch)

通過上面channel的四個基本操作,便能夠實現在不同協程間線程安全得傳遞數據。最後通過一個例子,完整得展示channel的基本使用。

package main

import "fmt"

func main() {
        ch := make(chan string) // 創建字元串通道
        defer close(ch)
        go func() {
                ch <- "hello, channel!" // 發送數據到通道
        }()

        result := <-ch // 從通道接收數據
        fmt.Println(result)
}

在這個示例中,我們創建了一個字元串通道ch。然後,在一個單獨的協程中,我們向通道發送了字元串"hello, channel!"。最後,主協程從通道中接收數據,並將其列印出來。

通過使用通道,我們可以實現協程之間的數據傳輸和同步,確保數據的安全共用和線程安全性。通道的使用能夠簡化併發編程的複雜性,提供一種高效、可靠的方式來處理併發場景下的數據傳遞。

3.2 使用channel實現彙總數據

下麵,我們使用channel來實現併發數據彙總,替換掉之前使用互斥鎖來保證線程安全的實現:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {
        // 模擬 RPC 介面拉取數據的耗時操作
        time.Sleep(time.Second)

        // 假設從 RPC 介面獲取到了一批數據
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page),
        }

        ch <- data // 將數據發送到通道

        wg.Done()
}

func main() {
        var wg sync.WaitGroup

        // 定義需要拉取的數據頁數
        numPages := 10

        dataCh := make(chan Data, 10) // 創建用於接收數據的通道

        // 啟動多個協程併發地拉取數據
        for i := 1; i <= numPages; i++ {
                wg.Add(1)
                go fetchData(i, dataCh, &wg)
        }

        go func() {
                wg.Wait()
                close(dataCh) // 關閉通道,表示數據已經全部發送完成
        }()

        // 從通道接收數據並彙總
        var dataList []Data
        for data := range dataCh {
            dataList = append(dataList, data)
        }

        // 列印拉取到的數據
        fmt.Println("Fetched data:")
        for _, data := range dataList {
                fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
        }
}

在修改後的代碼中,我們創建了一個用於接收數據的 dataCh。每個協程通過將數據發送到該channel 來完成數據的彙總。主協程通過從channel接收數據,並將其添加到 dataList 中實現數據的彙總過程。這種方式不需要顯式地加鎖和解鎖,並且避免了互斥鎖帶來的複雜性和性能問題。

通過使用channel,我們能夠以一種更直觀、更安全的方式實現協程之間的數據傳遞和同步。channel在併發編程中起到了關鍵的作用,簡化了併發操作的管理和實現。同時,它提供了內置的同步機制,保證了數據的正確性和一致性,避免了死鎖和競態條件的問題。

3.3 總結

協程間的併發下彙總數據可以歸類為協程間的數據傳遞這個場景。在這個場景中,多個協程併發地拉取數據,然後將數據彙總到一個共用的數據結構中。為了保證數據的正確性和一致性,需要使用某種機制來確保多個協程對共用數據的併發訪問是安全的。

在原始的實現中,使用了互斥鎖來保護共用數據的併發訪問。互斥鎖提供了互斥訪問的機制,確保同一時間只有一個協程可以訪問共用數據,從而避免了數據競爭和不一致性。這種方式在保證線程安全的同時,引入了鎖的開銷和複雜性。

而使用channel來實現協程間的安全數據傳遞可以更簡潔和高效。每個協程可以將拉取到的數據通過channel發送到主協程,主協程通過接收channel中的數據來進行彙總。channel提供了併發安全的數據傳遞機制,協程之間的數據傳輸是同步和有序的。由於channel本身就提供了同步機制,不需要額外的鎖和同步操作,能夠更簡潔地實現協程間的安全數據傳遞。

因此,如果需要在多個協程間實現數據傳遞,而且由此可能帶來線程安全的問題,此時使用channel來實現是相對比較合適的。

4. 開源項目中的使用

假設我們需要對etcd進行性能測試,此時需要模擬大量併發請求,對etcd進行負載測試,並收集每個請求的執行時間、成功/失敗狀態等結果數據。然後主協程需要收集每一個請求的結果數據,併進行統計計算,生成相應的性能報告。基於此,能夠計算出總請求數、請求成功率、平均執行時間、最慢/最快請求等統計信息,以及錯誤分佈情況和慢速請求的詳細信息。

從上面的講述來看,其實我們可以大概想象出這個模型,多個協程併發執行,然後獲取每個請求的結果數據。然後主協程需要收集彙總這些數據,基於此來生成性能報告。這個模型其實也就是我們上面所說的協程併發下的數據彙總,因此通過channel來實現協程間的數據傳輸,是非常合適的。

下麵我們來看看etcd中對應的實現。etcd中存在一個report對象的實現,能夠接受一系列的請求數據的結果,然後生成性能報告返回回去。結構體定義如下:

type report struct {
   results   chan Result
   stats Stats
}
func (r *report) Results() chan<- Result { return r.results }

// Result describes the timings for an operation.
type Result struct {
   Start  time.Time
   End    time.Time
   Err    error
}

func newReport(precision string) *report {
   r := &report{
      results:   make(chan Result, 16),
   }
   return r
}

Result結構體為單個測試的結果,而 report 結構體則用於整個測試過程的報告和統計信息。通過使用 results 通道,可以將每個測試的結果發送到 report 結構體中,以便進行統計和生成報告。

當進行性能壓測時,首先通過newReport生成一個report對象,然後啟動多個協程同時進行壓測請求,每一個請求處理完成之後,便會生成一個處理結果,存儲到Result對象當中。然後基於report對象的Results方法獲取到對應的channel,將處理結果傳輸給主協程。

主協程便通過遍歷report對象中的results變數對應的channel,彙總計算所有處理結果,基於此便能夠生成壓測結果和報告。下麵來看其具體流程。

首先是創建一個report對象,然後啟動多個協程來處理請求,將結果發送到report對象中的results對應的channel中。

// 這裡NewReportSample方法,其實是對上面newReport方法的一個封裝
r := NewReportSample("%f")
// 這裡假設只有一個協程,模擬執行一系列的測試,並將測試結果發送到 Report 對象的 results 通道中。
go func() {
   start := time.Now()
   for i := 0; i < 5; i++ {
      // 不真實進行請求,只是簡單獲取執行結果,將測試結果進行傳輸
      end := start.Add(time.Second)
      r.Results() <- Result{Start: start, End: end}
      start = end
   }
   r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
   // 假設所有壓測請求都執行完成了
   close(r.Results())
}()
// 主協程 彙總所有的處理結果,然後生成壓測報告
stats := <-r.Stats()

以上代碼中,r 是通過 NewReportSample("%f") 創建的一個 Report 對象。然後,在一個單獨的協程中,執行了一系列的測試,並將測試結果發送到 r.Results() 通道中。

這段代碼的作用是模擬執行一系列的測試,並將測試結果發送到 Report 對象的 results 通道中。通過使用 r.Results() 方法返回的通道,可以將測試結果發送到報告對象中進行統計和處理。

接下來,主協程應該不斷從 r.Results()方法返回的通道中讀取數據,彙總所有的處理結果,從而生成壓測報告。這個方法其實是被封裝在r.Stas()方法中,具體如下:

func (r *report) Stats() <-chan Stats {
    // 創建一個channel
   donec := make(chan Stats, 1)
   // 啟動一個協程來執行
   go func() {
      defer close(donec)
      r.processResults()
      s := r.stats.copy()
      if r.sps != nil {
         s.TimeSeries = r.sps.getTimeSeries()
      }
      // 執行完成的話,將結果返回
      donec <- s
   }()
   // 返回channel
   return donec
}

// Stats方法啟動的協程中,實際運行的任務
func (r *report) processResults() {
   st := time.Now()
   // 遍歷r.results方法中channel中的數據,然後執行處理流程
   for res := range r.results {
      r.processResult(&res)
   }
   // 後續執行一些具體的計算邏輯
}

上述代碼是 report 結構體中的兩個方法,其中 Stats() 方法返回一個只讀的 Stats 通道。這個方法會在一個單獨的協程中執行,並處理 results 通道中的測試結果。事實上就是彙總channel中的數據,然後進行一定的處理,然後返回。

5. 總結

本文通過介紹併發編程中的數據彙總問題,提出了使用互斥鎖和通道來保證線程安全的方法。互斥鎖適用於臨界區保護和共用資源的互斥訪問,但可能存在死鎖和性能瓶頸的問題。相比之下,通道提供了更直觀和安全的協程間通信方式,避免了鎖的問題,並提供了更靈活的併發模式。

基於以上內容的介紹,大概能夠明確下,在數據傳遞和彙總的場景下,使用channel來實現可能是更為合適的,能夠提高代碼的可讀性和併發安全性。希望以上內容對你有所幫助。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • # 使用 Async Rust 構建簡單的 P2P 節點 ### P2P 簡介 - P2P:peer-to-peer - P2P 是一種網路技術,可以在不同的電腦之間共用各種計算資源,如 CPU、網路帶寬和存儲。 - P2P 是當今用戶線上共用文件(如音樂、圖像和其他數字媒體)的一種非常常用的方法 ...
  • ## 1.1 為什麼要學 Qt Qt是一個跨平臺的 C++ 圖形用戶界面應用程式框架 Qt 為應用程式開發者提供建立藝術級圖形界面所需的所有功能 Qt 是完全面向對象的,很容易擴展,並且允許真正的組件編程 (1)Qt 發展史 在講解學習 Qt 的必要性之前, 先來瞭解下 Qt 的發展歷史: 1991 ...
  • 用go設計開發一個自己的輕量級登錄庫/框架吧(拓展篇),給自己的庫/框架拓展一下吧,主庫:https://github.com/weloe/token-go ...
  • ### 1.0 匿名對象的基本知識 * 匿名對象 顧名思義,匿名對象指的就是沒有名字的對象,在使用中理解為實例化一個類對象,但是並不把它賦給一個對應的類變數,而是直接使用。在理解匿名對象前,我們先創建一個類便於後面的使用。 * 匿名對象具有以下特征: 語法上:只創建對象,但不用變數來接收,例如:假設 ...
  • Groovy是一種基於Java平臺的動態編程語言,它結合了Python、Ruby和Smalltalk等語言的特性,同時與Java無縫集成。 ...
  • 從 JDK7 開始,引入了表示非同步通道的 `AsynchronousSockerChannel` 類和 `AsynchronousServerSocketChannel` 類,這兩個類的作用與 `SocketChannel` 類和 `ServerSockelChannel` 相似,區別在於非同步通道的 ...
  • ## 文章首發 [【重學C++】02 脫離指針陷阱:深入淺出 C++ 智能指針](https://mp.weixin.qq.com/s/McD-kfsiQ7hW1UnsAriC1g) ## 前言 大家好,今天是【重學C++】系列的第二講,我們來聊聊C++的智能指針。 ## 為什麼需要智能指針 在上一 ...
  • CAS 是樂觀鎖設計思想的實現。CAS 的思想是:在“讀取 - 修改 - 寫回”操作序列中,先讀取並修改數據,寫回數據前先判斷讀取數據後的這段時間內數據是否發生變化(共用變數的當前值是否是我們的期望值)。通過 CAS 我們可以以無鎖的方式,保證對共用數據進行 “讀取 - 修改 - 寫回” 操作序列的... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...