MIT-6.824 lab1-MapReduce

来源:https://www.cnblogs.com/gatsby123/archive/2019/03/20/10567465.html
-Advertisement-
Play Games

概述 本lab將用go完成一個MapReduce框架,完成後將大大加深對MapReduce的理解。 Part I: Map/Reduce input and output 這部分需要我們實現common_map.go中的doMap()和common_reduce.go中的doReduce()兩個函數 ...


概述

本lab將用go完成一個MapReduce框架,完成後將大大加深對MapReduce的理解。

Part I: Map/Reduce input and output

這部分需要我們實現common_map.go中的doMap()和common_reduce.go中的doReduce()兩個函數。
可以先從測試用例下手:

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

從Sequential()開始調用鏈如下:
調用鏈
現在要做的是完成doMap()和doReduce()。

doMap():

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //打開inFile文件,讀取全部內容
    //調用mapF,將內容轉換為鍵值對
    //根據reduceName()返回的文件名,打開nReduce個中間文件,然後將鍵值對以json的格式保存到中間文件

    inputContent, err := ioutil.ReadFile(inFile)
    if err != nil {
        panic(err)
    }

    keyValues := mapF(inFile, string(inputContent))

    var intermediateFileEncoders []*json.Encoder
    for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber++ {
        intermediateFile, err := os.Create(reduceName(jobName, mapTask, reduceTaskNumber))
        if err != nil {
            panic(err)
        }
        defer intermediateFile.Close()
        enc := json.NewEncoder(intermediateFile)
        intermediateFileEncoders = append(intermediateFileEncoders, enc)
    }
    for _, kv := range keyValues {
        err := intermediateFileEncoders[ihash(kv.Key) % nReduce].Encode(kv)
        if err != nil {
            panic(err)
        }
    }
}

總結來說就是:

  1. 讀取輸入文件內容
  2. 將內容交個用戶定義的Map函數執行,生成鍵值對
  3. 保存鍵值對

doReduce:

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //讀取當前reduceTaskNumber對應的中間文件中的鍵值對,將相同的key的value進行併合
    //調用reduceF
    //將reduceF的結果以json形式保存到mergeName()返回的文件中

    kvs := make(map[string][]string)
    for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber++ {
        midDatafileName := reduceName(jobName, mapTaskNumber, reduceTask)
        file, err := os.Open(midDatafileName)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        dec := json.NewDecoder(file)
        for {
            var kv KeyValue
            err = dec.Decode(&kv)
            if err != nil {
                break
            }
            values, ok := kvs[kv.Key]
            if ok {
                kvs[kv.Key] = append(values, kv.Value)
            } else {
                kvs[kv.Key] = []string{kv.Value}
            }
        }
    }

    outputFile, err := os.Create(outFile)
    if err != nil {
        panic(err)
    }
    defer outputFile.Close()
    enc := json.NewEncoder(outputFile)
    for key, values := range kvs {
        enc.Encode(KeyValue{key, reduceF(key, values)})
    }
}

總結:

  1. 讀取中間數據
  2. 執行reduceF
  3. 保存結果

文件轉換的過程大致如下:
文件轉換

Part II: Single-worker word count

這部分將用一個簡單的實例展示如何使用MR框架。需要我們實現main/wc.go中的mapF()和reduceF()來統計單詞的詞頻。

mapF:

func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    var kvs []mapreduce.KeyValue
    for _, word := range words {
        kvs = append(kvs, mapreduce.KeyValue{word, "1"})
    }
    return kvs
}

將文本內容分割成單詞,每個單詞對應一個<word, "1">鍵值對。

reduceF:

func reduceF(key string, values []string) string {
    // Your code here (Part II).
    return strconv.Itoa(len(values))
}

value中有多少個"1",就說明這個word出現了幾次。

Part III: Distributing MapReduce tasks

目前實現的版本都是執行完一個map然後在執行下一個map,也就是說沒有並行,這恰恰是MapReduce最大的買點。這部分需要實現schedule(),該函數將任務分配給Worker去執行。當然這裡並沒有真正的多機部署,而是使用多線程進行模擬。
master和worker的關係大致如下:
master&worker
在創建worker對象的時候會調用Register() RPC,master收到RPC後,將該worker的id保存在數組中,執行shedule()是可以根據該id,通過DoTask() RPC調用該worker的DoTask()執行map或reduce任務。

schedule.go

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    //總共有ntasks個任務,registerChan中保存著空閑的workers
    taskChan := make(chan int)
    var wg sync.WaitGroup
    go func() {
        for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
            taskChan <- taskNumber
            fmt.Printf("taskChan <- %d in %s\n", taskNumber, phase)
            wg.Add(1)

        }

        wg.Wait()                           //ntasks個任務執行完畢後才能通過
        close(taskChan)
    }()


    for task := range taskChan {            //所有任務都處理完後跳出迴圈
        worker := <- registerChan         //消費worker
        fmt.Printf("given task %d to %s in %s\n", task, worker, phase)

        var arg DoTaskArgs
        arg.JobName = jobName
        arg.Phase = phase
        arg.TaskNumber = task
        arg.NumOtherPhase = n_other

        if phase == mapPhase {
            arg.File = mapFiles[task]
        }

        go func(worker string, arg DoTaskArgs) {
            if call(worker, "Worker.DoTask", arg, nil) {
                //執行成功後,worker需要執行其它任務
                //註意:需要先掉wg.Done(),然後調register<-worker,否則會出現死鎖
                //fmt.Printf("worker %s run task %d success in phase %s\n", worker, task, phase)
                wg.Done()
                registerChan <- worker  //回收worker
            } else {
                //如果失敗了,該任務需要被重新執行
                //註意:這裡不能用taskChan <- task,因為task這個變數在別的地方可能會被修改。比如task 0執行失敗了,我們這裡希望
                //將task 0重新加入到taskChan中,但是因為執行for迴圈的那個goroutine,可能已經修改task這個變數為1了,我們錯誤地
                //把task 1重新執行了一遍,並且task 0沒有得到執行。
                taskChan <- arg.TaskNumber
            }
        }(worker, arg)

    }
    fmt.Printf("Schedule: %v done\n", phase)

}

這裡用到了兩個channel,分別是registerChan和taskChan。
registerChan中保存了可用的worker id。
生產:

  1. worker調用Register()進行註冊,往裡添加
  2. worker成功執行DoTask()後,該worker需要重新加入registerChan

消費:

  1. schedule()拿到一個任務後,消費registerChan

taskChan中保存了任務號。任務執行失敗需要重新加入taskChan。

Part IV: Handling worker failures

之前的代碼已經體現了,對於失敗的任務重新執行。

Part V: Inverted index generation

這是MapReduce的一個應用,生成倒排索引,比如想查某個單詞出現在哪些文本中,就可以建立倒排索引來解決。

func mapF(document string, value string) (res []mapreduce.KeyValue) {
    // Your code here (Part V).
    words := strings.FieldsFunc(value, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    var kvs []mapreduce.KeyValue
    for _, word := range words {
        kvs = append(kvs, mapreduce.KeyValue{word, document})
    }
    return kvs
}

func reduceF(key string, values []string) string {
    // Your code here (Part V).
    values = removeDuplicationAndSort(values)
    return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}

func removeDuplicationAndSort(values []string) []string {
    kvs := make(map[string]struct{})
    for _, value := range values {
        _, ok := kvs[value]
        if !ok {
            kvs[value] = struct{}{}
        }
    }
    var ret []string
    for k := range kvs {
        ret = append(ret, k)
    }
    sort.Strings(ret)
    return ret
}

mapF()生成<word, document>的鍵值對,reduceF()處理word對應的所有document,去重並且排序,然後拼接到一起。

具體代碼在:https://github.com/gatsbyd/mit_6.824_2018
如有錯誤,歡迎指正:
15313676365


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、screen 命令不間斷會話 1、安裝screen(從系統鏡像作為yum倉庫安裝) 1.1、載入系統鏡像 1.2、mount /dev/cdrom /media/cdrom/ (掛在系統鏡像) vim /etc/fstab (添加開機啟動項) 1.3、yum倉庫配置 1.3.1、掛載系統鏡像 1 ...
  • 最近一直有安裝虛擬機的想法,今天剛剛知道win10有自帶的Linux子系統,就準備試一下: 首先要保證自己的電腦處於開發者選項: 然後就要在控制面板的程式和功能頁面點擊“啟用或者關閉WIndows功能‘ 然後愉快的等待重啟,對要很愉快! 重啟之後呢,按win+R,輸入cmd進入命令界面,輸入bash ...
  • 本文講述如何通過樹莓派的硬體PWM控制好盈電調來驅動RC車子的前進後退,以及如何驅動伺服電機來控制車子轉向。 1. 好盈電調簡介 車子上的電調型號為:WP-10BLS-A-RTR,在好盈官網並沒有搜到對應手冊,但找到一份通用RC競速車的電調使用說明,不過說明書中並沒有提及信號調製方式,繼續尋找,看到 ...
  • linux中刪除文件內空白行的幾種方法 有時你可能需要在 Linux 中刪除某個文件中的空行。如果是的,你可以使用下麵方法中的其中一個。有很多方法可以做到,但我在這裡只是列舉一些簡單的方法。 你可能已經知道 grep、awk 和 sed 命令是專門用來處理文本數據的工具。 下列 5 種方法可以做到。 ...
  • 1.linux 查看安裝了什麼擴展 2.查看文件位置 3. 查看php.ini位置 ...
  • 轉載自:http://www.cnblogs.com/wainiwann/p/3942203.html 在開發的一個基於rtmp聊天的程式時發現了一個很奇怪的現象。 在windows下當我們執行 closesocket的操作之後,阻塞的 recv會立即返回 1 。 而在linux下當我們執行clos ...
  • 作者:不悔 原文鏈接: "https://www.opsbj.com/2019/03/20/mysql install/" 常見的 MySQL 安裝方式有如下三種: 1. RPM 包方式:這種方式安裝適合對資料庫要求不太高的場合,安裝速度快; 2. 通用二進位包方式:安裝速度相較於源碼方式快,可以自 ...
  • 大數據是什麼? 首先提一個問題:“大數據"是一項專門的技術嗎?有的人可能會以為大數據是一項專門的技術,其實不是。“大數據"這三個字只是一門市場語言(Marketing Language),其背後是硬體、資料庫、操作系統、I-ladoop等一系列技術的綜合應用。 大數據導論 <!--[if gte v ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...