使用Go搭建並行排序處理管道筆記

来源:https://www.cnblogs.com/jn-shao/archive/2022/05/09/16248109.html
-Advertisement-
Play Games

一、並行管道搭建: 總結下實現思路: 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的; 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群; 根據粒度,單機版:非併發節點可以 ...


一、並行管道搭建:

總結下實現思路:

  1. 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的;
  2. 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群;
  3. 根據粒度,單機版:非併發節點可以是排序方法,併發節點可以是一個線程/協程去處理(非同步排序),集群版節點是一個主機;
  4. 單機版,不管併發還是非併發,節點採用的是記憶體共用數據;集群版節點則需要網路連接請求應答來共用數據;
  5. go語言非同步數據傳輸通道通過channel實現的;
  6. 每個節點將處理的數據非同步發送到各自channel中,等待一個主節點獲取歸併,集群版多了網路的數據傳輸。

 

二、代碼實現:
  1. 本地節點 nodes.go:
    package pipeline
    
    import (
    	"encoding/binary"
    	"fmt"
    	"io"
    	"math/rand"
    	"sort"
    	"time"
    )
    
    var startTime time.Time
    
    func Init() {
    	startTime = time.Now()
    }
    
    //內部處理方法
    //這裡是排序:非同步處理容器元素排序
    func InMemSort(in <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		a := []int{}
    		for v := range in {
    			a = append(a, v)
    		}
    		fmt.Println("Read done:", time.Since(startTime))
    
    		sort.Ints(a)
    		fmt.Println("InMemSort done:", time.Since(startTime))
    
    		for _, v := range a {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    
    //兩路和並,每路通過內部方法非同步處理
    //這裡是排序:in1,in2元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列)
    func Merge(in1, in2 <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	// go func() {
    	// 	v1, ok1 := <-in1
    	// 	v2, ok2 := <-in2
    	// 	for {
    	// 		if ok1 || ok2 {
    	// 			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    	// 				out <- v1
    	// 				v1, ok1 = <-in1
    	// 			} else {
    	// 				out <- v2
    	// 				v2, ok2 = <-in2
    	// 			}
    	// 		} else {
    	// 			close(out)
    	// 			break
    	// 		}
    	// 	}
    	// }()
    	go func() {
    		v1, ok1 := <-in1
    		v2, ok2 := <-in2
    		for ok1 || ok2 {
    			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    				out <- v1
    				v1, ok1 = <-in1
    			} else {
    				out <- v2
    				v2, ok2 = <-in2
    			}
    		}
    		close(out)
    
    		fmt.Println("Merge done:", time.Since(startTime))
    	}()
    	return out
    }
    
    //讀取原數據
    //chunkSize=-1全讀
    func ReadSource(r io.Reader, chunkSize int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		buffer := make([]byte, 8) //int長度根據操作系統來的,64位為int64,64位8個位元組
    		bytesRead := 0
    		for { //持續讀取
    			n, err := r.Read(buffer) //讀取一個int 8byte
    			bytesRead += n
    			if n > 0 {
    				out <- int(binary.BigEndian.Uint64(buffer)) //位元組數組轉int
    			}
    			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀
    				break
    			}
    		}
    		close(out)
    	}()
    	return out
    }
    
    //寫處理後(排序)數據
    func WriteSink(w io.Writer, in <-chan int) {
    	for v := range in {
    		buffer := make([]byte, 8)
    		binary.BigEndian.PutUint64(buffer, uint64(v))
    		w.Write(buffer)
    	}
    }
    
    //隨機生成數據源
    func RandomSource(count int) <-chan int {
    	out := make(chan int)
    	go func() {
    		for i := 0; i < count; i++ {
    			out <- rand.Int()
    		}
    		close(out)
    	}()
    	return out
    }
    
    //多路兩兩歸併,每路通過內部方法非同步處理
    //這裡是排序:ins元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列)
    func MergeN(ins ...<-chan int) <-chan int {
    	if len(ins) == 1 {
    		return ins[0]
    	}
    	m := len(ins) / 2
    	return Merge(
    		MergeN(ins[:m]...),
    		MergeN(ins[m:]...)) //chennel非同步併發歸併
    }
    

      

  2. 網路節點:
    package pipeline
    
    import (
    	"bufio"
    	"net"
    )
    
    //節點服務端數據寫入到Network中
    //開啟服務後,用goroutine等連接,避免創建pipeline阻塞
    func NetworkSink(addr string, in <-chan int) {
    	//net必須是面向流的網路:"tcp"、"tcp4"、"tcp6"、"unix"或"unixpacket"
    	listener, err := net.Listen("tcp", addr) //addr ip:port
    	if err != nil {
    		panic(err)
    	}
    	go func() { //不能等待阻塞
    		for {
    			conn, err := listener.Accept()
    			if err != nil {
    				continue
    			}
    			w := bufio.NewWriter(conn)
    			WriteSink(w, in)
    			w.Flush()    //使用bufio Writer最後一定要Flush把緩存數據發出去  defer
    			conn.Close() //關閉
    		}
    		// defer listener.Close()
    		// conn, err := listener.Accept()
    		// if err != nil {
    		// 	panic(err)
    		// }
    		// defer conn.Close()
    		// w := bufio.NewWriter(conn)
    		// WriteSink(w, in)
    		// defer w.Flush()
    	}()
    }
    
    //Network向節點服務端讀取數據源
    func NetworkSource(addr string) <-chan int {
    	out := make(chan int)
    	go func() {
    		conn, err := net.Dial("tcp", addr)
    		if err != nil {
    			panic(err)
    		}
    		defer conn.Close()
    
    		r := ReadSource(bufio.NewReader(conn), -1)
    		for v := range r {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    

      

  3. 創建管道:
    package main
    
    import (
    	"bufio"
    	"fmt"
    	"goBase/pipelinedemo/pipeline"
    	"os"
    	"strconv"
    )
    
    const sourceFilename = "../large.in"
    const resultFilename = "../large.out"
    
    //單機版而言,併發使用channel效率肯定是下降的
    //好處,當文件過大,一臺機器排不了,多機排序
    func main() {
    	p, files := createNetworkPipeline(sourceFilename, 800000000, 4) //平均每個文件讀取int64數:800000000/8/4
    	defer func() {
    		for _, file := range files {
    			file.Close()
    		}
    	}()
    	writeToFile(p, resultFilename) //該方法運行,通道才真正打開
    	printFile(resultFilename)
    }
    
    
    //創建並行處理管道
    //fileSize 文件位元組數
    //chunkCount 節點數 讀取文件分塊數
    func createNetworkPipeline(filename string, fileSize, chunkCount int) (<-chan int, []*os.File) {
    
    	chunkSize := fileSize / chunkCount //每個節點讀取文件位元組數
    
    	//outs := make([]<-chan int, chunkCount)
    	outs := []<-chan int{}
    	sortAddr := []string{}
    
    	files := []*os.File{}
    
    	pipeline.Init() //開始計時
    
    	//#region 節點服務端工作
    
    	for count := 0; count < chunkCount; count++ {
    		file, err := os.Open(filename) //這裡file沒有close,需要返回*[]File,在外面close
    		if err != nil {
    			panic(err)
    		}
    		files = append(files, file)
    
    		//Seek設置下一次讀/寫的位置。offset為相對偏移量,
    		//whence決定相對位置:0為相對文件開頭,1為相對當前位置,2為相對文件結尾
    		file.Seek(int64(count*chunkSize), 0) //讀文件位元組範圍
    
    		source := pipeline.ReadSource(bufio.NewReader(file), chunkSize)
    
    		// outs = append(outs, pipeline.InMemSort(source))
    		//本機地址
    		addr := ":" + strconv.Itoa(7000+count)                 //將數字轉換成對應的字元串類型的數字
    		pipeline.NetworkSink(addr, pipeline.InMemSort(source)) //開啟節點服務監聽,收到請求發送數據將寫入到Network,非同步不能等待阻塞
    
    		sortAddr = append(sortAddr, addr)
    	}
    
    	//#endregion
    
    
    	//#region Network工作
    
    	for _, addr := range sortAddr {
    		outs = append(outs, pipeline.NetworkSource(addr))
    	}
    	//構建管道,goroutine還沒有運行,不能確定InMemSort是否全部排序完成,不能在該方法close file
    	return pipeline.MergeN(outs...), files
    
    	//#endregion
    }
    
    func writeToFile(in <-chan int, filename string) {
    	file, err := os.Create(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    
    	w := bufio.NewWriter(file)
    	defer w.Flush()
    
    	pipeline.WriteSink(w, in)
    }
    
    func printFile(filename string) {
    	file, err := os.Open(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    	count := 0
    	all := pipeline.ReadSource(bufio.NewReader(file), -1)
    	for s := range all {
    		fmt.Println(s)
    		count++
    		if count > 100 {
    			break
    		}
    	}
    }

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大家好,我是半夏👴,一個剛剛開始寫文的沙雕程式員.如果喜歡我的文章,可以關註➕ 點贊 👍 加我微信:frontendpicker,一起學習交流前端,成為更優秀的工程師~關註公眾號:搞前端的半夏,瞭解更多前端知識! 點我探索新世界! 原文鏈接 ==>http://sylblog.xin/archi ...
  • 詳解JavaScript中的事件迴圈機制!!! 一、簡單講解 這個大家應該或多或少都知道的 for (var i = 0; i < 10; i++) { setTimeout(() => { console.log(i); // 輸出10個10 }); } 解析:先執行for迴圈,迴圈疊加i,然後再 ...
  • 前端周刊:2022-6 期 前端開發 videojs-plugin-marker--videojs 進度條打點插件 基於 videojs-marker-plugin 項目二次開發,支持 marker 點更新和 marker 點擊事件;更換構建工具為 Vite。 @cutting-mat/direct ...
  • ####一、前言 #####因為最近練習Vue3項目的時候,發現Chrome瀏覽器的devtools插件不起作用了,這才想起當前安裝的devtools是5版本的,而Vue3項目需要6版本才支持。 ####二、安裝 #####1. 在Github上搜索devtools項目,直達車 #####2. 找到 ...
  • 認識jsJS是寄生語言,寄生在HTML中,與JAVA沒有關係JS可以寫在html文件中,以<script></script>標簽對出現在一些瀏覽器中,使用控制台可以監控到JS的工作<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8 ...
  • 1.概念 1.1.知道的越少越好 迪米特法則,結合其含義又稱之為“最少知道原則”,即一個類作為一個調用方,應當對自己依賴的類(被調用的類)其中所處理的邏輯細節,知道的越少越好。對於被依賴的類(被調用的類)不管在使用上多麼的複雜,它都應儘量將處理邏輯封裝在它的內部,對調用方提供簡潔明瞭的公共方法即可, ...
  • 大家好,我是DD,已經是封閉在家的第51天了! 最近一直在更新Java新特性和IDEA Tips兩個專欄,其他方向內容的動態關註少了。昨天天晚上刷推的時候,瞄到了這個神奇的東西,覺得挺cool的,拿出來分享下: 相信你看到圖,不用我說,你也猜到是啥了吧?html里可以跑python代碼了! 看到好多 ...
  • 講義: 動態sql可以定義代碼片斷,可以進行邏輯判斷,可以進行迴圈處理(批量處理),使條件判斷更為簡單。 一、動態sql核心標簽: 1、<sql>:當多種類型的查詢語句的查詢欄位或者查詢條件相同時,可以將其定義為常量,方便調用。 2、<include>:用來引用<sql>定義的代碼片斷。 <!--定 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...