Mit6.824 Lab1-MapReduce

来源:https://www.cnblogs.com/bnyf/archive/2018/06/02/9127307.html
-Advertisement-
Play Games

Mit6.824 是我在學習一些分散式系統方面的知識的時候偶然看到的,然後就開始嘗試跟課。不得不說,國外的課程難度是真的大,一周的時間居然要學一門 Go 語言,然後還要讀論文,進而做MapReduce 實驗。 由於 MR(MapReduce) 框架需要建立在 DFS(Distributed... ...


前言

Mit6.824 是我在學習一些分散式系統方面的知識的時候偶然看到的,然後就開始嘗試跟課。不得不說,國外的課程難度是真的大,一周的時間居然要學一門 Go 語言,然後還要讀論文,進而做MapReduce 實驗。
由於 MR(MapReduce) 框架需要建立在 DFS(Distributed File System)的基礎上實現,所以本實驗是通過使用多線程來模擬分散式環境。雖然難度上大大降低,但是通過該實驗,還是會讓我們對 MR 的核心原理有一個較為深刻的認識。
做實驗之前我們需要先把經典的 MapReduce 論文給看了,窩比較建議直接看英文原文,但如果時間不充裕的話,可以直接在網上找中文的翻譯版。
剛開始做這個實驗的時候真的是一頭霧水,完全不知道如何下手。後來發現這個工程有一個自動化測試文件(test_test.go),每部分實驗都會使用這個測試文件里的函數對代碼進行測試。我們只要順著這個測試函數逐步倒推,然後補全代碼即可。

Part I: Map/Reduce input and output

第一部分是先實現一個順序版(sequential)的MR,讓我們對 MR 的流程有一個大體的認識,並且實現doMap()doReduce() 兩個函數。
其包含兩個測試函數TestSequentialSingle()TestSequentialMany()

TestSequentialSingle()

每個map worker處理一個文件,所以map worker的數量就等於文件的數量。
測試單個map worker 和 reduce worker。

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

TestSequentialMany()

此測試函數測試多個 map worker 和多個 reduce worker。
其運行邏輯和TestSequentialSingle類似。

func TestSequentialMany(t *testing.T) {
    mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

Sequential()

測試函數將工作名稱,測試文件,reduce 的數量,用戶定義的 map 函數,reduce 函數五個實參傳遞給Sequential()

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i++ {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files) + nreduce}
    })
    return
}

Sequential()首先獲取一個Master 對象的指針,然後利用函數閉包運行Master.run()

Master.run()

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

    mr.doneChannel <- true
}

doMap()

doMap()doReduce()是需要我們去實現的函數。
doMap()的實現主要是將用戶定義的MapFunc()切割的文本,通過 hash 分到 'nReduce'個切片中去。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    // read contents from 'infile'
    dat,err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: readFile ", err)
    }

    //transfer data into ‘kvSlice’ according to the mapF()
    kvSlice := mapF(inFile, string(dat))

    //divide the ‘kvSlice’ into 'reduceKv' according to the ihash()
    var reduceKv [][]KeyValue // temporary variable which will be written into reduce files
    for i:=0;i<nReduce;i++ {
        s1 := make([]KeyValue,0)
        reduceKv = append(reduceKv, s1)
    }
    for _,kv := range kvSlice{
        hash := ihash(kv.Key) % nReduce
        reduceKv[hash] = append(reduceKv[hash],kv)
    }

    //write 'reduceKv' into ‘nReduce’ JSON files
    for i := 0;i<nReduce;i++ {
        file,err := os.Create(reduceName(jobName,mapTaskNumber,i))
        if err != nil {
            log.Fatal("doMap: create ", err)
        }

        enc := json.NewEncoder(file)
        for _, kv := range reduceKv[i]{
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("doMap: json encodem ", err)
            }
        }

        file.Close()

    }
}

doReduce()

doReduce()主要是將 key 值相同的 value 打包發送給用戶定義的 ReduceFunc(),獲得一個新的 kv對,key 值不變,而value值則是ReduceFunc()的返回值,排序,最後將新的 kv對 切片寫入文件。

type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i],a[j] = a[j],a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //read kv slice from the json file
    var kvSlice []KeyValue
    for i := 0;i<nMap;i++{
        //file, _ := os.OpenFile(reduceName(jobName,i,reduceTaskNumber), os.O_RDONLY, 0666)
        file,err := os.Open(reduceName(jobName,i,reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce: open ", err)
        }
        var kv KeyValue
        dec := json.NewDecoder(file)
        for{
            err := dec.Decode(&kv)
            kvSlice = append(kvSlice,kv)
            if err == io.EOF {
                break
            }
        }
        file.Close()
        /********/
        //此處如果用 defer,可能會造成文件開啟過多,造成程式崩潰
        /********/
    }

    //sort the intermediate kv slices by key
    sort.Sort(ByKey(kvSlice))

    //process kv slices in the reduceF()
    var reduceFValue []string
    var outputKv []KeyValue
    var preKey string = kvSlice[0].Key
    for i,kv := range kvSlice{
        if i == (len(kvSlice) - 1) {
            reduceFValue = append(reduceFValue, kv.Value)
            outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
        } else {
                if kv.Key != preKey {
                    outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
                    reduceFValue = make([]string, 0)
                }
                reduceFValue = append(reduceFValue, kv.Value)
        }

        preKey = kv.Key
    }

    //write the reduce output as JSON encoded kv objects to the file named outFile
    file,err := os.Create(outFile)
    if err != nil {
        log.Fatal("doRuduce: create ", err)
    }
    defer file.Close()

    enc := json.NewEncoder(file)
    for _, kv := range outputKv{
        err := enc.Encode(&kv)
        if err != nil {
            log.Fatal("doRuduce: json encode ", err)
        }
    }
}

Part II: Single-worker word count

第二部分是實現mapF()reduceF()函數,來實現通過順序 MR統計詞頻的功能。
比較簡單,就直接放代碼了。

func mapF(filename string, contents string) []mapreduce.KeyValue {
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    var strSlice []string = strings.FieldsFunc(contents,f)
    var kvSlice []mapreduce.KeyValue
    for _,str := range strSlice {
        kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
    }

    return kvSlice
}

func reduceF(key string, values []string) string {
    var cnt int64
    for _,str := range values{
        temp,err := strconv.ParseInt(str,10,64)
        if(err != nil){
            fmt.Println("wc :parseint ",err)
        }
        cnt += temp
    }
    return strconv.FormatInt(cnt,10)
}

Part III: Distributing MapReduce tasks && Part IV: Handling worker failures

第三部分和第四部分可以一起來做,主要是完成schedule(),實現一個通過線程併發執行 map worker 和 reduce worker 的 MR 框架。框架通過 RPC 來模擬分散式計算,並要帶有 worker 的容災功能。

TestBasic()

測試函數啟動兩個線程運行RUnWoker()

func TestBasic(t *testing.T) {
    mr := setup()
    for i := 0; i < 2; i++ {
        go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
            MapFunc, ReduceFunc, -1)
    }
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

setup() && Distributed()

func setup() *Master {
    files := makeInputs(nMap)
    master := port("master")
    mr := Distributed("test", files, nReduce, master)
    return mr
}

通過mr.startRPCServer() 啟動 master 的 RPC 伺服器,然後通過 mr.run()進行 worker 的調度。

// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
    mr = newMaster(master)
    mr.startRPCServer()
    go mr.run(jobName, files, nreduce,
        func(phase jobPhase) {
            ch := make(chan string)
            go mr.forwardRegistrations(ch)
            schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
        },
        func() {
            mr.stats = mr.killWorkers()
            mr.stopRPCServer()
        })
    return
}

Master.forwardRegistrations()

該函數通過worker 的數量來判斷是否有新 worker 啟動,一旦發現有新的 worker 啟動,則使用管道(ch)通知schedule()
理解該函數對實現後面的schedule()至關重要。

// helper function that sends information about all existing
// and newly registered workers to channel ch. schedule()
// reads ch to learn about workers.
func (mr *Master) forwardRegistrations(ch chan string) {
    i := 0
    for {
        mr.Lock()
        if len(mr.workers) > i {
            // there's a worker that we haven't told schedule() about.
            w := mr.workers[i]
            go func() { ch <- w }() // send without holding the lock.
            i = i + 1
        } else {
            // wait for Register() to add an entry to workers[]
            // in response to an RPC from a new worker.
            mr.newCond.Wait()
        }
        mr.Unlock()
    }
}

schedule()

shedule()雖然不長,但實現起來還是有點難度的。
waitGroup用來判斷任務是否完成。
registerChan來監聽是否有新的 worker 啟動,如果有的話,就啟動一個線程來運行該 worker。通過新開線程來運行新 worker的邏輯比較符合分散式 MR 的特點。
對於 宕掉的worker執行call()操作時,會返回false
每開始執行一個任務,就讓waitGroup減一,而執行失敗(call()返回 false)則將waitGroup加一,代表會將該任務安排給其他 worker。

waitGroup.Wait()則會等到任務完全執行完返回。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers, and only once all of
    // them have been completed successfully should the function return.
    // Remember that workers may fail, and that any given worker may finish
    // multiple tasks.

    waitGroup := sync.WaitGroup{}
    waitGroup.Add(ntasks)

    taskChan := make(chan int, ntasks)
    for i:=0;i<ntasks;i++  {
        taskChan <- i
    }

    go func() {
        for {
            ch := <- registerChan
            go func(c string) {
                for {
                    i := <- taskChan
                    if call(c,"Worker.DoTask", &DoTaskArgs{jobName,
                        mapFiles[i],phase,i,n_other},new(struct{})){
                        waitGroup.Done()
                    } else{
                        taskChan <- i
                    }
                }
            }(ch)
        }
    }()

    waitGroup.Wait()

    fmt.Printf("Schedule: %v phase done\n", phase)
}

RunWorker()

通過RunWorker() 來增加 worker。
nRPC來控制 worker 的壽命,每接收一次 rpc 請求就 -1s。如果初始值為 -1,則代表改 worker 是永生的。

// RunWorker sets up a connection with the master, registers its address, and
// waits for tasks to be scheduled.
func RunWorker(MasterAddress string, me string,
    MapFunc func(string, string) []KeyValue,
    ReduceFunc func(string, []string) string,
    nRPC int,
) {
    debug("RunWorker %s\n", me)
    wk := new(Worker)
    wk.name = me
    wk.Map = MapFunc
    wk.Reduce = ReduceFunc
    wk.nRPC = nRPC
    rpcs := rpc.NewServer()
    rpcs.Register(wk)
    os.Remove(me) // only needed for "unix"
    l, e := net.Listen("unix", me)
    if e != nil {
        log.Fatal("RunWorker: worker ", me, " error: ", e)
    }
    wk.l = l
    wk.register(MasterAddress)

    // DON'T MODIFY CODE BELOW
    for {
        wk.Lock()
        if wk.nRPC == 0 {
            wk.Unlock()
            break
        }
        wk.Unlock()
        conn, err := wk.l.Accept()
        if err == nil {
            wk.Lock()
            wk.nRPC--
            wk.Unlock()
            go rpcs.ServeConn(conn)
        } else {
            break
        }
    }
    wk.l.Close()
    debug("RunWorker %s exit\n", me)
}

Part V: Inverted index generation

第五部分是實現倒排索引。此處要求的倒排索引,就是在輸出結果時,需要將出現過 key 值文件的文件名在 key 值後面輸出。
功能是通過完成 mapF()reduceF() 來實現的。

mapF()

將key 值所在文件的文件名賦給 kv對 的value。

func mapF(document string, value string) (res []mapreduce.KeyValue) {
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    var strSlice []string = strings.FieldsFunc(value,f)
    var kvSlice []mapreduce.KeyValue
    for _,str := range strSlice {
        kvSlice = append(kvSlice, mapreduce.KeyValue{str, document})
    }

    return kvSlice
}

reduceF()

將相同 key 值的所有 value 打包並統計數量返回。

func reduceF(key string, values []string) string {
    var cnt int64
    var documents string
    set := make(map[string]bool)
    for _,str := range values{
        set[str] = true
    }
    var keys []string
    for key := range set{
        if set[key] == false{
            continue
        }
        keys = append(keys,key)
    }
    sort.Strings(keys)
    for _,key := range keys{
        cnt++
        if cnt >= 2{
            documents += ","
        }
        documents += key
    }
    //return strconv.FormatInt(cnt,10)
    return strconv.FormatInt(cnt,10) + " " + documents
}
 

後記

從剛開始的無從下手,到現在通過Lab1全部測試,MR 實驗算是完全做完了,還是很有成就感的。
除了對 MR 有一個更深的理解之外,也深深感受到了優秀系統的魅力——功能強大,結構簡潔。
同時又瞭解了一門新語言——GoLang,一門專門為高併發系統而設計的語言,用起來還是很舒服的。
但這畢竟是分散式系統的第一個實驗,欠缺的知識還很多,繼續努力。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 零、創建一個.Net Core 2.0 的ConsoleApp 應用,建完就是這個樣子了。 添加Log4Net 的引用,(不想看可以不看,個人習慣)Install-Package log4net添加Config文件夾往文件夾裡面添加Log4net.xml(別忘記了設置Copy always)添加Lo ...
  • ②command對象用來操作資料庫。(三個重要的方法:ExecuteNonQuery(),ExecuteReader(),ExecuteScalar()) ⑴以update(改數據)為例,用到ExecuteNonQuery()方法(執行SQL語句,返回受影響行) 點擊事件(button2) 執行前數 ...
  • 本來想用正則Split一下sql語句中簡單場景的的GO,於是用^GO$(配合忽略大小寫和多行模式),可居然連這種情況都搞不掂: 如果刪掉$就能匹配了,但這顯然不是辦法,遂又在VS的C#交互視窗、RegexTester(.net寫的)、chrome控制台等地方試,發現只有chrome能匹配,而只要是基 ...
  • 最近有一個疑問:IList已經繼承了ICollection<T>,而ICollection<T>繼承了 IEnumerable<T>, IEnumerable,那為什麼IList還要繼承 IEnumerable<T>, IEnumerable? 於是我自己寫了介面測試:用dnSpy反編譯看到,Tes ...
  • ASP.NET Core MVC的Model Binding會將HTTP Request數據,以映射的方式對應到參數中。基本上跟ASP.NET MVC差不多,但能Binding的來源更多了一些。本篇將介紹ASP.NET Core的Model Binding。 Model Binding 要接收Cli ...
  • 巨硬build後發了15.7.1滿載期待的升級了。。結果NM淚奔................... 為啥 淚奔? 使用Chrome 調試閃退,MMP ,一想肯定是VS的鍋咯,各種抓頭髮。。 試試看看VS配置發現 ,多了點東西。。 都勾上後,瞬間跑起來了,但是問題來了,每次會新運行一個Chrome ...
  • 原文地址:http://www.cnblogs.com/qingyuan/archive/2010/05/11/1732415.html 1.什麼是委托,為什麼要使用委托 我正在埋頭苦寫程式,突然想喝水,但是又不想自己去掉杯水而打斷自己的思路,於是我就想讓女朋友去給我倒水。她去給我倒水,首先我得讓她 ...
  • Dapper.net的速度很快,最近看源碼,原來他orm的實現是通過編寫大量IL代碼實現的。 使用DynamicMethod,自己編織一個給實體賦值的方法。這種寫法效率很高,接近直接對屬性賦值。比使用反射賦值效率高10倍左右。 下麵分別使用Emit,反射,直接賦值100000次來進行對比測試。 下麵 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...