一、並行管道搭建: 總結下實現思路: 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的; 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群; 根據粒度,單機版:非併發節點可以 ...
一、並行管道搭建:
總結下實現思路:
- 歸併排序:進行集合元素排序(節點),並兩兩節點歸併排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的;
- 節點:任務處理單元,歸併排序節點是處理輸出有序集合任務的單元;文件過大單台機排不了需要多台機集群;
- 根據粒度,單機版:非併發節點可以是排序方法,併發節點可以是一個線程/協程去處理(非同步排序),集群版節點是一個主機;
- 單機版,不管併發還是非併發,節點採用的是記憶體共用數據;集群版節點則需要網路連接請求應答來共用數據;
- go語言非同步數據傳輸通道通過channel實現的;
- 每個節點將處理的數據非同步發送到各自channel中,等待一個主節點獲取歸併,集群版多了網路的數據傳輸。
二、代碼實現:
- 本地節點 nodes.go:
package pipeline import ( "encoding/binary" "fmt" "io" "math/rand" "sort" "time" ) var startTime time.Time func Init() { startTime = time.Now() } //內部處理方法 //這裡是排序:非同步處理容器元素排序 func InMemSort(in <-chan int) <-chan int { out := make(chan int, 1024) go func() { a := []int{} for v := range in { a = append(a, v) } fmt.Println("Read done:", time.Since(startTime)) sort.Ints(a) fmt.Println("InMemSort done:", time.Since(startTime)) for _, v := range a { out <- v } close(out) }() return out } //兩路和並,每路通過內部方法非同步處理 //這裡是排序:in1,in2元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列) func Merge(in1, in2 <-chan int) <-chan int { out := make(chan int, 1024) // go func() { // v1, ok1 := <-in1 // v2, ok2 := <-in2 // for { // if ok1 || ok2 { // if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 // out <- v1 // v1, ok1 = <-in1 // } else { // out <- v2 // v2, ok2 = <-in2 // } // } else { // close(out) // break // } // } // }() go func() { v1, ok1 := <-in1 v2, ok2 := <-in2 for ok1 || ok2 { if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 out <- v1 v1, ok1 = <-in1 } else { out <- v2 v2, ok2 = <-in2 } } close(out) fmt.Println("Merge done:", time.Since(startTime)) }() return out } //讀取原數據 //chunkSize=-1全讀 func ReadSource(r io.Reader, chunkSize int) <-chan int { out := make(chan int, 1024) go func() { buffer := make([]byte, 8) //int長度根據操作系統來的,64位為int64,64位8個位元組 bytesRead := 0 for { //持續讀取 n, err := r.Read(buffer) //讀取一個int 8byte bytesRead += n if n > 0 { out <- int(binary.BigEndian.Uint64(buffer)) //位元組數組轉int } if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀 break } } close(out) }() return out } //寫處理後(排序)數據 func WriteSink(w io.Writer, in <-chan int) { for v := range in { buffer := make([]byte, 8) binary.BigEndian.PutUint64(buffer, uint64(v)) w.Write(buffer) } } //隨機生成數據源 func RandomSource(count int) <-chan int { out := make(chan int) go func() { for i := 0; i < count; i++ { out <- rand.Int() } close(out) }() return out } //多路兩兩歸併,每路通過內部方法非同步處理 //這裡是排序:ins元素需要排好序(經過內部方法InMemSort非同步處理)的容器單元(channel 非同步容器/隊列) func MergeN(ins ...<-chan int) <-chan int { if len(ins) == 1 { return ins[0] } m := len(ins) / 2 return Merge( MergeN(ins[:m]...), MergeN(ins[m:]...)) //chennel非同步併發歸併 }
- 網路節點:
package pipeline import ( "bufio" "net" ) //節點服務端數據寫入到Network中 //開啟服務後,用goroutine等連接,避免創建pipeline阻塞 func NetworkSink(addr string, in <-chan int) { //net必須是面向流的網路:"tcp"、"tcp4"、"tcp6"、"unix"或"unixpacket" listener, err := net.Listen("tcp", addr) //addr ip:port if err != nil { panic(err) } go func() { //不能等待阻塞 for { conn, err := listener.Accept() if err != nil { continue } w := bufio.NewWriter(conn) WriteSink(w, in) w.Flush() //使用bufio Writer最後一定要Flush把緩存數據發出去 defer conn.Close() //關閉 } // defer listener.Close() // conn, err := listener.Accept() // if err != nil { // panic(err) // } // defer conn.Close() // w := bufio.NewWriter(conn) // WriteSink(w, in) // defer w.Flush() }() } //Network向節點服務端讀取數據源 func NetworkSource(addr string) <-chan int { out := make(chan int) go func() { conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } defer conn.Close() r := ReadSource(bufio.NewReader(conn), -1) for v := range r { out <- v } close(out) }() return out }
- 創建管道:
package main import ( "bufio" "fmt" "goBase/pipelinedemo/pipeline" "os" "strconv" ) const sourceFilename = "../large.in" const resultFilename = "../large.out" //單機版而言,併發使用channel效率肯定是下降的 //好處,當文件過大,一臺機器排不了,多機排序 func main() { p, files := createNetworkPipeline(sourceFilename, 800000000, 4) //平均每個文件讀取int64數:800000000/8/4 defer func() { for _, file := range files { file.Close() } }() writeToFile(p, resultFilename) //該方法運行,通道才真正打開 printFile(resultFilename) } //創建並行處理管道 //fileSize 文件位元組數 //chunkCount 節點數 讀取文件分塊數 func createNetworkPipeline(filename string, fileSize, chunkCount int) (<-chan int, []*os.File) { chunkSize := fileSize / chunkCount //每個節點讀取文件位元組數 //outs := make([]<-chan int, chunkCount) outs := []<-chan int{} sortAddr := []string{} files := []*os.File{} pipeline.Init() //開始計時 //#region 節點服務端工作 for count := 0; count < chunkCount; count++ { file, err := os.Open(filename) //這裡file沒有close,需要返回*[]File,在外面close if err != nil { panic(err) } files = append(files, file) //Seek設置下一次讀/寫的位置。offset為相對偏移量, //whence決定相對位置:0為相對文件開頭,1為相對當前位置,2為相對文件結尾 file.Seek(int64(count*chunkSize), 0) //讀文件位元組範圍 source := pipeline.ReadSource(bufio.NewReader(file), chunkSize) // outs = append(outs, pipeline.InMemSort(source)) //本機地址 addr := ":" + strconv.Itoa(7000+count) //將數字轉換成對應的字元串類型的數字 pipeline.NetworkSink(addr, pipeline.InMemSort(source)) //開啟節點服務監聽,收到請求發送數據將寫入到Network,非同步不能等待阻塞 sortAddr = append(sortAddr, addr) } //#endregion //#region Network工作 for _, addr := range sortAddr { outs = append(outs, pipeline.NetworkSource(addr)) } //構建管道,goroutine還沒有運行,不能確定InMemSort是否全部排序完成,不能在該方法close file return pipeline.MergeN(outs...), files //#endregion } func writeToFile(in <-chan int, filename string) { file, err := os.Create(filename) if err != nil { panic(err) } defer file.Close() w := bufio.NewWriter(file) defer w.Flush() pipeline.WriteSink(w, in) } func printFile(filename string) { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close() count := 0 all := pipeline.ReadSource(bufio.NewReader(file), -1) for s := range all { fmt.Println(s) count++ if count > 100 { break } } }