使用Go搭建並行排序處理管道筆記

来源:https://www.cnblogs.com/jn-shao/archive/2022/05/09/16248109.html
-Advertisement-
Play Games

一、並行管道搭建: 總結下實現思路: 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的; 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群; 根據粒度,單機版:非併發節點可以 ...


一、並行管道搭建:

總結下實現思路:

  1. 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的;
  2. 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群;
  3. 根據粒度,單機版:非併發節點可以是排序方法,併發節點可以是一個線程/協程去處理(非同步排序),集群版節點是一個主機;
  4. 單機版,不管併發還是非併發,節點採用的是記憶體共用數據;集群版節點則需要網路連接請求應答來共用數據;
  5. go語言非同步數據傳輸通道通過channel實現的;
  6. 每個節點將處理的數據非同步發送到各自channel中,等待一個主節點獲取歸併,集群版多了網路的數據傳輸。

 

二、代碼實現:
  1. 本地節點 nodes.go:
    package pipeline
    
    import (
    	"encoding/binary"
    	"fmt"
    	"io"
    	"math/rand"
    	"sort"
    	"time"
    )
    
    var startTime time.Time
    
    func Init() {
    	startTime = time.Now()
    }
    
    //內部處理方法
    //這裡是排序:非同步處理容器元素排序
    func InMemSort(in <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		a := []int{}
    		for v := range in {
    			a = append(a, v)
    		}
    		fmt.Println("Read done:", time.Since(startTime))
    
    		sort.Ints(a)
    		fmt.Println("InMemSort done:", time.Since(startTime))
    
    		for _, v := range a {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    
    //兩路和並,每路通過內部方法非同步處理
    //這裡是排序:in1,in2元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列)
    func Merge(in1, in2 <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	// go func() {
    	// 	v1, ok1 := <-in1
    	// 	v2, ok2 := <-in2
    	// 	for {
    	// 		if ok1 || ok2 {
    	// 			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    	// 				out <- v1
    	// 				v1, ok1 = <-in1
    	// 			} else {
    	// 				out <- v2
    	// 				v2, ok2 = <-in2
    	// 			}
    	// 		} else {
    	// 			close(out)
    	// 			break
    	// 		}
    	// 	}
    	// }()
    	go func() {
    		v1, ok1 := <-in1
    		v2, ok2 := <-in2
    		for ok1 || ok2 {
    			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    				out <- v1
    				v1, ok1 = <-in1
    			} else {
    				out <- v2
    				v2, ok2 = <-in2
    			}
    		}
    		close(out)
    
    		fmt.Println("Merge done:", time.Since(startTime))
    	}()
    	return out
    }
    
    //讀取原數據
    //chunkSize=-1全讀
    func ReadSource(r io.Reader, chunkSize int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		buffer := make([]byte, 8) //int長度根據操作系統來的,64位為int64,64位8個位元組
    		bytesRead := 0
    		for { //持續讀取
    			n, err := r.Read(buffer) //讀取一個int 8byte
    			bytesRead += n
    			if n > 0 {
    				out <- int(binary.BigEndian.Uint64(buffer)) //位元組數組轉int
    			}
    			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀
    				break
    			}
    		}
    		close(out)
    	}()
    	return out
    }
    
    //寫處理後(排序)數據
    func WriteSink(w io.Writer, in <-chan int) {
    	for v := range in {
    		buffer := make([]byte, 8)
    		binary.BigEndian.PutUint64(buffer, uint64(v))
    		w.Write(buffer)
    	}
    }
    
    //隨機生成數據源
    func RandomSource(count int) <-chan int {
    	out := make(chan int)
    	go func() {
    		for i := 0; i < count; i++ {
    			out <- rand.Int()
    		}
    		close(out)
    	}()
    	return out
    }
    
    //多路兩兩歸併,每路通過內部方法非同步處理
    //這裡是排序:ins元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列)
    func MergeN(ins ...<-chan int) <-chan int {
    	if len(ins) == 1 {
    		return ins[0]
    	}
    	m := len(ins) / 2
    	return Merge(
    		MergeN(ins[:m]...),
    		MergeN(ins[m:]...)) //chennel非同步併發歸併
    }
    

      

  2. 網路節點:
    package pipeline
    
    import (
    	"bufio"
    	"net"
    )
    
    //節點服務端數據寫入到Network中
    //開啟服務後,用goroutine等連接,避免創建pipeline阻塞
    func NetworkSink(addr string, in <-chan int) {
    	//net必須是面向流的網路:"tcp"、"tcp4"、"tcp6"、"unix"或"unixpacket"
    	listener, err := net.Listen("tcp", addr) //addr ip:port
    	if err != nil {
    		panic(err)
    	}
    	go func() { //不能等待阻塞
    		for {
    			conn, err := listener.Accept()
    			if err != nil {
    				continue
    			}
    			w := bufio.NewWriter(conn)
    			WriteSink(w, in)
    			w.Flush()    //使用bufio Writer最後一定要Flush把緩存數據發出去  defer
    			conn.Close() //關閉
    		}
    		// defer listener.Close()
    		// conn, err := listener.Accept()
    		// if err != nil {
    		// 	panic(err)
    		// }
    		// defer conn.Close()
    		// w := bufio.NewWriter(conn)
    		// WriteSink(w, in)
    		// defer w.Flush()
    	}()
    }
    
    //Network向節點服務端讀取數據源
    func NetworkSource(addr string) <-chan int {
    	out := make(chan int)
    	go func() {
    		conn, err := net.Dial("tcp", addr)
    		if err != nil {
    			panic(err)
    		}
    		defer conn.Close()
    
    		r := ReadSource(bufio.NewReader(conn), -1)
    		for v := range r {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    

      

  3. 創建管道:
    package main
    
    import (
    	"bufio"
    	"fmt"
    	"goBase/pipelinedemo/pipeline"
    	"os"
    	"strconv"
    )
    
    const sourceFilename = "../large.in"
    const resultFilename = "../large.out"
    
    //單機版而言,併發使用channel效率肯定是下降的
    //好處,當文件過大,一臺機器排不了,多機排序
    func main() {
    	p, files := createNetworkPipeline(sourceFilename, 800000000, 4) //平均每個文件讀取int64數:800000000/8/4
    	defer func() {
    		for _, file := range files {
    			file.Close()
    		}
    	}()
    	writeToFile(p, resultFilename) //該方法運行,通道才真正打開
    	printFile(resultFilename)
    }
    
    
    //創建並行處理管道
    //fileSize 文件位元組數
    //chunkCount 節點數 讀取文件分塊數
    func createNetworkPipeline(filename string, fileSize, chunkCount int) (<-chan int, []*os.File) {
    
    	chunkSize := fileSize / chunkCount //每個節點讀取文件位元組數
    
    	//outs := make([]<-chan int, chunkCount)
    	outs := []<-chan int{}
    	sortAddr := []string{}
    
    	files := []*os.File{}
    
    	pipeline.Init() //開始計時
    
    	//#region 節點服務端工作
    
    	for count := 0; count < chunkCount; count++ {
    		file, err := os.Open(filename) //這裡file沒有close,需要返回*[]File,在外面close
    		if err != nil {
    			panic(err)
    		}
    		files = append(files, file)
    
    		//Seek設置下一次讀/寫的位置。offset為相對偏移量,
    		//whence決定相對位置:0為相對文件開頭,1為相對當前位置,2為相對文件結尾
    		file.Seek(int64(count*chunkSize), 0) //讀文件位元組範圍
    
    		source := pipeline.ReadSource(bufio.NewReader(file), chunkSize)
    
    		// outs = append(outs, pipeline.InMemSort(source))
    		//本機地址
    		addr := ":" + strconv.Itoa(7000+count)                 //將數字轉換成對應的字元串類型的數字
    		pipeline.NetworkSink(addr, pipeline.InMemSort(source)) //開啟節點服務監聽,收到請求發送數據將寫入到Network,非同步不能等待阻塞
    
    		sortAddr = append(sortAddr, addr)
    	}
    
    	//#endregion
    
    
    	//#region Network工作
    
    	for _, addr := range sortAddr {
    		outs = append(outs, pipeline.NetworkSource(addr))
    	}
    	//構建管道,goroutine還沒有運行,不能確定InMemSort是否全部排序完成,不能在該方法close file
    	return pipeline.MergeN(outs...), files
    
    	//#endregion
    }
    
    func writeToFile(in <-chan int, filename string) {
    	file, err := os.Create(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    
    	w := bufio.NewWriter(file)
    	defer w.Flush()
    
    	pipeline.WriteSink(w, in)
    }
    
    func printFile(filename string) {
    	file, err := os.Open(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    	count := 0
    	all := pipeline.ReadSource(bufio.NewReader(file), -1)
    	for s := range all {
    		fmt.Println(s)
    		count++
    		if count > 100 {
    			break
    		}
    	}
    }

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大家好,我是半夏👴,一個剛剛開始寫文的沙雕程式員.如果喜歡我的文章,可以關註➕ 點贊 👍 加我微信:frontendpicker,一起學習交流前端,成為更優秀的工程師~關註公眾號:搞前端的半夏,瞭解更多前端知識! 點我探索新世界! 原文鏈接 ==>http://sylblog.xin/archi ...
  • 詳解JavaScript中的事件迴圈機制!!! 一、簡單講解 這個大家應該或多或少都知道的 for (var i = 0; i < 10; i++) { setTimeout(() => { console.log(i); // 輸出10個10 }); } 解析:先執行for迴圈,迴圈疊加i,然後再 ...
  • 前端周刊:2022-6 期 前端開發 videojs-plugin-marker--videojs 進度條打點插件 基於 videojs-marker-plugin 項目二次開發,支持 marker 點更新和 marker 點擊事件;更換構建工具為 Vite。 @cutting-mat/direct ...
  • ####一、前言 #####因為最近練習Vue3項目的時候,發現Chrome瀏覽器的devtools插件不起作用了,這才想起當前安裝的devtools是5版本的,而Vue3項目需要6版本才支持。 ####二、安裝 #####1. 在Github上搜索devtools項目,直達車 #####2. 找到 ...
  • 認識jsJS是寄生語言,寄生在HTML中,與JAVA沒有關係JS可以寫在html文件中,以<script></script>標簽對出現在一些瀏覽器中,使用控制台可以監控到JS的工作<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8 ...
  • 1.概念 1.1.知道的越少越好 迪米特法則,結合其含義又稱之為“最少知道原則”,即一個類作為一個調用方,應當對自己依賴的類(被調用的類)其中所處理的邏輯細節,知道的越少越好。對於被依賴的類(被調用的類)不管在使用上多麼的複雜,它都應儘量將處理邏輯封裝在它的內部,對調用方提供簡潔明瞭的公共方法即可, ...
  • 大家好,我是DD,已經是封閉在家的第51天了! 最近一直在更新Java新特性和IDEA Tips兩個專欄,其他方向內容的動態關註少了。昨天天晚上刷推的時候,瞄到了這個神奇的東西,覺得挺cool的,拿出來分享下: 相信你看到圖,不用我說,你也猜到是啥了吧?html里可以跑python代碼了! 看到好多 ...
  • 講義: 動態sql可以定義代碼片斷,可以進行邏輯判斷,可以進行迴圈處理(批量處理),使條件判斷更為簡單。 一、動態sql核心標簽: 1、<sql>:當多種類型的查詢語句的查詢欄位或者查詢條件相同時,可以將其定義為常量,方便調用。 2、<include>:用來引用<sql>定義的代碼片斷。 <!--定 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...