概述 本lab將用go完成一個MapReduce框架,完成後將大大加深對MapReduce的理解。 Part I: Map/Reduce input and output 這部分需要我們實現common_map.go中的doMap()和common_reduce.go中的doReduce()兩個函數 ...
概述
本lab將用go完成一個MapReduce框架,完成後將大大加深對MapReduce的理解。
Part I: Map/Reduce input and output
這部分需要我們實現common_map.go中的doMap()和common_reduce.go中的doReduce()兩個函數。
可以先從測試用例下手:
func TestSequentialSingle(t *testing.T) {
mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
從Sequential()開始調用鏈如下:
現在要做的是完成doMap()和doReduce()。
doMap():
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
//打開inFile文件,讀取全部內容
//調用mapF,將內容轉換為鍵值對
//根據reduceName()返回的文件名,打開nReduce個中間文件,然後將鍵值對以json的格式保存到中間文件
inputContent, err := ioutil.ReadFile(inFile)
if err != nil {
panic(err)
}
keyValues := mapF(inFile, string(inputContent))
var intermediateFileEncoders []*json.Encoder
for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber++ {
intermediateFile, err := os.Create(reduceName(jobName, mapTask, reduceTaskNumber))
if err != nil {
panic(err)
}
defer intermediateFile.Close()
enc := json.NewEncoder(intermediateFile)
intermediateFileEncoders = append(intermediateFileEncoders, enc)
}
for _, kv := range keyValues {
err := intermediateFileEncoders[ihash(kv.Key) % nReduce].Encode(kv)
if err != nil {
panic(err)
}
}
}
總結來說就是:
- 讀取輸入文件內容
- 將內容交個用戶定義的Map函數執行,生成鍵值對
- 保存鍵值對
doReduce:
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//讀取當前reduceTaskNumber對應的中間文件中的鍵值對,將相同的key的value進行併合
//調用reduceF
//將reduceF的結果以json形式保存到mergeName()返回的文件中
kvs := make(map[string][]string)
for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber++ {
midDatafileName := reduceName(jobName, mapTaskNumber, reduceTask)
file, err := os.Open(midDatafileName)
if err != nil {
panic(err)
}
defer file.Close()
dec := json.NewDecoder(file)
for {
var kv KeyValue
err = dec.Decode(&kv)
if err != nil {
break
}
values, ok := kvs[kv.Key]
if ok {
kvs[kv.Key] = append(values, kv.Value)
} else {
kvs[kv.Key] = []string{kv.Value}
}
}
}
outputFile, err := os.Create(outFile)
if err != nil {
panic(err)
}
defer outputFile.Close()
enc := json.NewEncoder(outputFile)
for key, values := range kvs {
enc.Encode(KeyValue{key, reduceF(key, values)})
}
}
總結:
- 讀取中間數據
- 執行reduceF
- 保存結果
文件轉換的過程大致如下:
Part II: Single-worker word count
這部分將用一個簡單的實例展示如何使用MR框架。需要我們實現main/wc.go中的mapF()和reduceF()來統計單詞的詞頻。
mapF:
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
words := strings.FieldsFunc(contents, func(r rune) bool {
return !unicode.IsLetter(r)
})
var kvs []mapreduce.KeyValue
for _, word := range words {
kvs = append(kvs, mapreduce.KeyValue{word, "1"})
}
return kvs
}
將文本內容分割成單詞,每個單詞對應一個<word, "1">鍵值對。
reduceF:
func reduceF(key string, values []string) string {
// Your code here (Part II).
return strconv.Itoa(len(values))
}
value中有多少個"1",就說明這個word出現了幾次。
Part III: Distributing MapReduce tasks
目前實現的版本都是執行完一個map然後在執行下一個map,也就是說沒有並行,這恰恰是MapReduce最大的買點。這部分需要實現schedule(),該函數將任務分配給Worker去執行。當然這裡並沒有真正的多機部署,而是使用多線程進行模擬。
master和worker的關係大致如下:
在創建worker對象的時候會調用Register() RPC,master收到RPC後,將該worker的id保存在數組中,執行shedule()是可以根據該id,通過DoTask() RPC調用該worker的DoTask()執行map或reduce任務。
schedule.go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
//總共有ntasks個任務,registerChan中保存著空閑的workers
taskChan := make(chan int)
var wg sync.WaitGroup
go func() {
for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
taskChan <- taskNumber
fmt.Printf("taskChan <- %d in %s\n", taskNumber, phase)
wg.Add(1)
}
wg.Wait() //ntasks個任務執行完畢後才能通過
close(taskChan)
}()
for task := range taskChan { //所有任務都處理完後跳出迴圈
worker := <- registerChan //消費worker
fmt.Printf("given task %d to %s in %s\n", task, worker, phase)
var arg DoTaskArgs
arg.JobName = jobName
arg.Phase = phase
arg.TaskNumber = task
arg.NumOtherPhase = n_other
if phase == mapPhase {
arg.File = mapFiles[task]
}
go func(worker string, arg DoTaskArgs) {
if call(worker, "Worker.DoTask", arg, nil) {
//執行成功後,worker需要執行其它任務
//註意:需要先掉wg.Done(),然後調register<-worker,否則會出現死鎖
//fmt.Printf("worker %s run task %d success in phase %s\n", worker, task, phase)
wg.Done()
registerChan <- worker //回收worker
} else {
//如果失敗了,該任務需要被重新執行
//註意:這裡不能用taskChan <- task,因為task這個變數在別的地方可能會被修改。比如task 0執行失敗了,我們這裡希望
//將task 0重新加入到taskChan中,但是因為執行for迴圈的那個goroutine,可能已經修改task這個變數為1了,我們錯誤地
//把task 1重新執行了一遍,並且task 0沒有得到執行。
taskChan <- arg.TaskNumber
}
}(worker, arg)
}
fmt.Printf("Schedule: %v done\n", phase)
}
這裡用到了兩個channel,分別是registerChan和taskChan。
registerChan中保存了可用的worker id。
生產:
- worker調用Register()進行註冊,往裡添加
- worker成功執行DoTask()後,該worker需要重新加入registerChan
消費:
- schedule()拿到一個任務後,消費registerChan
taskChan中保存了任務號。任務執行失敗需要重新加入taskChan。
Part IV: Handling worker failures
之前的代碼已經體現了,對於失敗的任務重新執行。
Part V: Inverted index generation
這是MapReduce的一個應用,生成倒排索引,比如想查某個單詞出現在哪些文本中,就可以建立倒排索引來解決。
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
var kvs []mapreduce.KeyValue
for _, word := range words {
kvs = append(kvs, mapreduce.KeyValue{word, document})
}
return kvs
}
func reduceF(key string, values []string) string {
// Your code here (Part V).
values = removeDuplicationAndSort(values)
return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
func removeDuplicationAndSort(values []string) []string {
kvs := make(map[string]struct{})
for _, value := range values {
_, ok := kvs[value]
if !ok {
kvs[value] = struct{}{}
}
}
var ret []string
for k := range kvs {
ret = append(ret, k)
}
sort.Strings(ret)
return ret
}
mapF()生成<word, document>的鍵值對,reduceF()處理word對應的所有document,去重並且排序,然後拼接到一起。
具體代碼在:https://github.com/gatsbyd/mit_6.824_2018
如有錯誤,歡迎指正:
15313676365