Preface
I came across MIT 6.824 by chance while reading up on distributed systems, and decided to follow the course. I have to say, the pace is brutal: in a single week you are expected to pick up Go, read a paper, and then complete the MapReduce lab.
Since an MR (MapReduce) framework is normally built on top of a DFS (Distributed File System), this lab simulates the distributed environment with multiple threads (goroutines) instead. That lowers the difficulty considerably, but the lab still leaves you with a fairly deep understanding of the core ideas behind MR.
Before starting the lab, read the classic MapReduce paper. I recommend the English original, but if you are short on time a Chinese translation is easy to find online.
When I first started the lab I had no idea where to begin. Then I noticed the project ships with an automated test file (test_test.go), and every part of the lab is checked by functions in that file. Working backwards from those test functions and filling in the missing code is all you really need to do.
Part I: Map/Reduce input and output
The first part implements a sequential version of MR, which gives a general picture of the MR workflow, and asks us to implement the two functions doMap() and doReduce().
Two test functions drive it: TestSequentialSingle() and TestSequentialMany().
TestSequentialSingle()
Each map worker processes one input file, so the number of map workers equals the number of files.
This test runs a single map worker and a single reduce worker.
func TestSequentialSingle(t *testing.T) {
	mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
TestSequentialMany()
This test runs multiple map workers and multiple reduce workers.
Its flow is the same as TestSequentialSingle().
func TestSequentialMany(t *testing.T) {
	mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
Sequential()
The test functions pass five arguments to Sequential(): the job name, the input files, the number of reduce tasks, and the user-defined map and reduce functions.
// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}
Sequential() first obtains a pointer to a Master object, then runs Master.run() in a goroutine, passing the scheduling and finishing logic in as closures.
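For reference, the Master object that these closures capture is defined in mapreduce/master.go. As best I remember it looks roughly like the following (a sketch from memory; the exact fields and comments may differ in your copy of the handout):

type Master struct {
	sync.Mutex

	address     string
	doneChannel chan bool

	// protected by the mutex
	newCond *sync.Cond // signals when Register() adds to workers[]
	workers []string   // each worker's UNIX-domain socket name -- its RPC address

	// per-task information
	jobName string   // name of the currently executing job
	files   []string // input files
	nReduce int      // number of reduce partitions

	shutdown chan struct{}
	l        net.Listener
	stats    []int
}

The fields mr.jobName, mr.files and mr.nReduce that run() fills in below are exactly the ones the map/reduce closures read.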
Master.run()
// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
	schedule func(phase jobPhase),
	finish func(),
) {
	mr.jobName = jobName
	mr.files = files
	mr.nReduce = nreduce

	fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

	schedule(mapPhase)
	schedule(reducePhase)
	finish()
	mr.merge()

	fmt.Printf("%s: Map/Reduce task completed\n", mr.address)
	mr.doneChannel <- true
}
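run() ends by calling mr.merge(), which lives in master_splitmerge.go. As best I remember, it does roughly the following (sketched here as a hypothetical helper mergeSketch, reconstructed from memory, so details may differ): read the nReduce result files, decode their JSON key/value pairs, sort the keys, and write "key: value" lines into one final file named mrtmp.<jobName>.

func mergeSketch(jobName string, nReduce int) {
	kvs := make(map[string]string)
	for i := 0; i < nReduce; i++ {
		// mergeName(jobName, i) is the output file of reduce task i
		file, err := os.Open(mergeName(jobName, i))
		if err != nil {
			log.Fatal("merge: open ", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kvs[kv.Key] = kv.Value
		}
		file.Close()
	}
	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out, err := os.Create("mrtmp." + jobName)
	if err != nil {
		log.Fatal("merge: create ", err)
	}
	defer out.Close()
	w := bufio.NewWriter(out)
	defer w.Flush()
	for _, k := range keys {
		fmt.Fprintf(w, "%s: %s\n", k, kvs[k])
	}
}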
doMap()
doMap() and doReduce() are the two functions we have to implement.
doMap() takes the key/value pairs produced by the user-defined map function, hashes each key into one of nReduce buckets, and writes every bucket to its own intermediate file.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// read contents from 'inFile'
	dat, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatal("doMap: readFile ", err)
	}
	// turn the contents into a 'kvSlice' using mapF()
	kvSlice := mapF(inFile, string(dat))
	// partition 'kvSlice' into 'reduceKv' buckets according to ihash()
	var reduceKv [][]KeyValue // temporary buckets that will be written to the reduce files
	for i := 0; i < nReduce; i++ {
		reduceKv = append(reduceKv, make([]KeyValue, 0))
	}
	for _, kv := range kvSlice {
		hash := ihash(kv.Key) % nReduce
		reduceKv[hash] = append(reduceKv[hash], kv)
	}
	// write 'reduceKv' into 'nReduce' JSON files
	for i := 0; i < nReduce; i++ {
		file, err := os.Create(reduceName(jobName, mapTaskNumber, i))
		if err != nil {
			log.Fatal("doMap: create ", err)
		}
		enc := json.NewEncoder(file)
		for _, kv := range reduceKv[i] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("doMap: json encode ", err)
			}
		}
		file.Close()
	}
}
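doMap() relies on a few helpers that ship with the lab skeleton in mapreduce/common.go. As far as I recall they look roughly like this (check your own copy; details can vary between course years):

// KeyValue is the intermediate record type passed from map to reduce.
type KeyValue struct {
	Key   string
	Value string
}

// ihash maps a key to a non-negative int, used above to pick a reduce bucket.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// reduceName is the intermediate file written by map task mapTask for reduce task reduceTask.
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName is the output file of reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
	return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}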
doReduce()
doReduce() gathers the values that share a key and hands them to the user-defined reduce function, producing a new key/value pair whose key is unchanged and whose value is whatever the reduce function returns. The intermediate pairs are sorted by key first, and the resulting output pairs are written to the reduce task's output file.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// read the kv pairs from the nMap JSON intermediate files
	var kvSlice []KeyValue
	for i := 0; i < nMap; i++ {
		file, err := os.Open(reduceName(jobName, i, reduceTaskNumber))
		if err != nil {
			log.Fatal("doReduce: open ", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF once the file is exhausted
			}
			kvSlice = append(kvSlice, kv)
		}
		file.Close()
		// Don't use defer here: it would keep all nMap files open until the
		// function returns, which can exhaust file descriptors and crash.
	}
	// sort the intermediate kv pairs by key
	sort.Sort(ByKey(kvSlice))
	// group values by key and feed each group to reduceF()
	var outputKv []KeyValue
	var reduceFValue []string
	if len(kvSlice) > 0 {
		preKey := kvSlice[0].Key
		for _, kv := range kvSlice {
			if kv.Key != preKey {
				outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
				reduceFValue = nil
				preKey = kv.Key
			}
			reduceFValue = append(reduceFValue, kv.Value)
		}
		// emit the last group
		outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
	}
	// write the reduce output as JSON encoded kv objects to the file named outFile
	file, err := os.Create(outFile)
	if err != nil {
		log.Fatal("doReduce: create ", err)
	}
	defer file.Close()
	enc := json.NewEncoder(file)
	for _, kv := range outputKv {
		if err := enc.Encode(&kv); err != nil {
			log.Fatal("doReduce: json encode ", err)
		}
	}
}
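The group-by pattern in the middle of doReduce() is easy to get subtly wrong (the last group in particular), so here is a tiny standalone program, purely illustrative and not part of the lab, that runs the same pattern on a toy slice so you can eyeball what reduceF would receive:

package main

import (
	"fmt"
	"sort"
)

type KeyValue struct{ Key, Value string }

func main() {
	kvs := []KeyValue{{"b", "1"}, {"a", "1"}, {"c", "1"}, {"a", "1"}}
	sort.Slice(kvs, func(i, j int) bool { return kvs[i].Key < kvs[j].Key })
	var values []string
	preKey := kvs[0].Key
	for _, kv := range kvs {
		if kv.Key != preKey {
			fmt.Println(preKey, values) // reduceF(preKey, values) would run here
			values, preKey = nil, kv.Key
		}
		values = append(values, kv.Value)
	}
	fmt.Println(preKey, values) // don't forget the final group
}

Running it prints a [1 1], b [1], c [1], which is exactly the shape of the calls doReduce() makes.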
Part II: Single-worker word count
Part II implements the mapF() and reduceF() functions so that the sequential MR can count word frequencies.
It is fairly simple, so here is the code.
func mapF(filename string, contents string) []mapreduce.KeyValue {
	f := func(c rune) bool {
		return !unicode.IsLetter(c)
	}
	strSlice := strings.FieldsFunc(contents, f)
	var kvSlice []mapreduce.KeyValue
	for _, str := range strSlice {
		kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
	}
	return kvSlice
}

func reduceF(key string, values []string) string {
	var cnt int64
	for _, str := range values {
		temp, err := strconv.ParseInt(str, 10, 64)
		if err != nil {
			fmt.Println("wc :parseint ", err)
		}
		cnt += temp
	}
	return strconv.FormatInt(cnt, 10)
}
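For context, these two functions get plugged into the framework by main() in main/wc.go. From memory the skeleton's main looks roughly like this, so treat the argument handling and the job name "wcseq" as assumptions rather than gospel:

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else if os.Args[1] == "master" {
		var mr *mapreduce.Master
		if os.Args[2] == "sequential" {
			// Part II: run the whole job in-process, one task at a time
			mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
		} else {
			// Parts III/IV: act as the master and wait for workers over RPC
			mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
		}
		mr.Wait()
	} else {
		// otherwise act as a worker and register with the given master
		mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100)
	}
}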
Part III: Distributing MapReduce tasks && Part IV: Handling worker failures
Parts III and IV can be done together. The main job is to complete schedule(), building an MR framework that runs map workers and reduce workers concurrently in goroutines. The framework uses RPC to simulate distributed computation, and it must tolerate worker failures.
TestBasic()
The test function starts two goroutines, each running RunWorker().
func TestBasic(t *testing.T) {
	mr := setup()
	for i := 0; i < 2; i++ {
		go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
			MapFunc, ReduceFunc, -1)
	}
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
setup() && Distributed()
func setup() *Master {
	files := makeInputs(nMap)
	master := port("master")
	mr := Distributed("test", files, nReduce, master)
	return mr
}
Distributed() starts the master's RPC server with mr.startRPCServer(), and then schedules the workers through mr.run().
// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
	mr = newMaster(master)
	mr.startRPCServer()
	go mr.run(jobName, files, nreduce,
		func(phase jobPhase) {
			ch := make(chan string)
			go mr.forwardRegistrations(ch)
			schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
		},
		func() {
			mr.stats = mr.killWorkers()
			mr.stopRPCServer()
		})
	return
}
Master.forwardRegistrations()
This function uses the length of the workers list to detect whether a new worker has registered; as soon as one appears, it notifies schedule() through the channel ch.
Understanding this function is essential for implementing schedule() later.
// helper function that sends information about all existing
// and newly registered workers to channel ch. schedule()
// reads ch to learn about workers.
func (mr *Master) forwardRegistrations(ch chan string) {
	i := 0
	for {
		mr.Lock()
		if len(mr.workers) > i {
			// there's a worker that we haven't told schedule() about.
			w := mr.workers[i]
			go func() { ch <- w }() // send without holding the lock.
			i = i + 1
		} else {
			// wait for Register() to add an entry to workers[]
			// in response to an RPC from a new worker.
			mr.newCond.Wait()
		}
		mr.Unlock()
	}
}
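The other half of this handshake is the master's Register RPC handler in master.go, which appends the new worker's address and wakes forwardRegistrations() up through the condition variable. As best I remember it looks roughly like this (a sketch from memory; RegisterArgs carries the worker's RPC address):

// Register is an RPC entry point; each worker calls it once at startup so
// the master learns the worker's address.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
	mr.Lock()
	defer mr.Unlock()
	debug("Register: worker %s\n", args.Worker)
	mr.workers = append(mr.workers, args.Worker)
	// tell forwardRegistrations() that there is a new workers[] entry.
	mr.newCond.Broadcast()
	return nil
}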
schedule()
schedule() is not long, but getting it right takes some thought.
A sync.WaitGroup tracks whether all tasks have finished.
registerChan delivers newly started workers; each time one arrives, a new goroutine is spawned to drive that worker. Running each new worker in its own goroutine matches the spirit of distributed MR, where every worker runs independently.
Calling call() on a worker that has died returns false.
Every task index starts out in a task channel. When a worker finishes a task successfully, waitGroup.Done() is called; when the call fails (call() returns false), the task is pushed back into the channel so that another worker can pick it up.
waitGroup.Wait() returns only once every task has completed.
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Remember that workers may fail, and that any given worker may finish
	// multiple tasks.
	waitGroup := sync.WaitGroup{}
	waitGroup.Add(ntasks)
	taskChan := make(chan int, ntasks)
	for i := 0; i < ntasks; i++ {
		taskChan <- i
	}
	go func() {
		for {
			ch := <-registerChan
			go func(c string) {
				for {
					i := <-taskChan
					if call(c, "Worker.DoTask", &DoTaskArgs{jobName,
						mapFiles[i], phase, i, n_other}, new(struct{})) {
						waitGroup.Done()
					} else {
						taskChan <- i
					}
				}
			}(ch)
		}
	}()
	waitGroup.Wait()
	fmt.Printf("Schedule: %v phase done\n", phase)
}
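For reference, DoTaskArgs and call() come from the lab's common_rpc.go; call() has the signature call(srv string, rpcname string, args interface{}, reply interface{}) bool and returns false when the RPC could not be completed (for example because the worker died). The argument struct, as far as I remember it (comments abridged, so verify against your handout), looks roughly like this:

// DoTaskArgs holds the arguments passed to a worker's DoTask RPC.
type DoTaskArgs struct {
	JobName    string
	File       string   // the input file; only used by map tasks
	Phase      jobPhase // mapPhase or reducePhase
	TaskNumber int      // this task's index in the current phase

	// NumOtherPhase is the total number of tasks in the other phase;
	// mappers need it to compute how many output bins to create, and
	// reducers need it to know how many input files to read.
	NumOtherPhase int
}

Because schedule() builds the struct positionally, the field order above has to match the literal &DoTaskArgs{jobName, mapFiles[i], phase, i, n_other} used in the snippet.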
RunWorker()
Workers are added by calling RunWorker().
nRPC controls the worker's lifetime: every RPC it accepts decrements nRPC by one, and an initial value of -1 means the worker lives forever.
// RunWorker sets up a connection with the master, registers its address, and
// waits for tasks to be scheduled.
func RunWorker(MasterAddress string, me string,
	MapFunc func(string, string) []KeyValue,
	ReduceFunc func(string, []string) string,
	nRPC int,
) {
	debug("RunWorker %s\n", me)
	wk := new(Worker)
	wk.name = me
	wk.Map = MapFunc
	wk.Reduce = ReduceFunc
	wk.nRPC = nRPC
	rpcs := rpc.NewServer()
	rpcs.Register(wk)
	os.Remove(me) // only needed for "unix"
	l, e := net.Listen("unix", me)
	if e != nil {
		log.Fatal("RunWorker: worker ", me, " error: ", e)
	}
	wk.l = l
	wk.register(MasterAddress)

	// DON'T MODIFY CODE BELOW
	for {
		wk.Lock()
		if wk.nRPC == 0 {
			wk.Unlock()
			break
		}
		wk.Unlock()
		conn, err := wk.l.Accept()
		if err == nil {
			wk.Lock()
			wk.nRPC--
			wk.Unlock()
			go rpcs.ServeConn(conn)
		} else {
			break
		}
	}
	wk.l.Close()
	debug("RunWorker %s exit\n", me)
}
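The scheduled work arrives at the worker through the Worker.DoTask RPC handler in worker.go. Stripped of the handout's extra bookkeeping (which I won't try to reproduce from memory), its core dispatch is roughly the following sketch:

// DoTask is called by the master (via RPC) when a new task is scheduled on
// this worker; it simply dispatches to doMap() or doReduce().
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
	fmt.Printf("%s: given %v task #%d on file %s (nios: %d)\n",
		wk.name, arg.Phase, arg.TaskNumber, arg.File, arg.NumOtherPhase)
	switch arg.Phase {
	case mapPhase:
		doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
	case reducePhase:
		doReduce(arg.JobName, arg.TaskNumber, mergeName(arg.JobName, arg.TaskNumber),
			arg.NumOtherPhase, wk.Reduce)
	}
	fmt.Printf("%s: %v task #%d done\n", wk.name, arg.Phase, arg.TaskNumber)
	return nil
}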
Part V: Inverted index generation
Part V generates an inverted index. The inverted index required here means that, when emitting the results, the names of the documents in which a key appears must be printed after the key itself.
Again the work is done by completing mapF() and reduceF().
mapF() sets the value of each key/value pair to the name of the document the key came from.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	f := func(c rune) bool {
		return !unicode.IsLetter(c)
	}
	strSlice := strings.FieldsFunc(value, f)
	var kvSlice []mapreduce.KeyValue
	for _, str := range strSlice {
		kvSlice = append(kvSlice, mapreduce.KeyValue{str, document})
	}
	return kvSlice
}
reduceF()
reduceF() deduplicates the document names collected for each key, counts them, and returns the count followed by a comma-separated, sorted list of the documents.
func reduceF(key string, values []string) string {
	var cnt int64
	var documents string
	set := make(map[string]bool)
	for _, str := range values {
		set[str] = true
	}
	var keys []string
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		cnt++
		if cnt >= 2 {
			documents += ","
		}
		documents += k
	}
	return strconv.FormatInt(cnt, 10) + " " + documents
}
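With merge() writing one "key: value" line per key, the final output ends up with lines like word: 3 docA,docB,docC. The same reduceF can also be written a bit more compactly with strings.Join; the variant below (a hypothetical rewrite, named reduceFAlt purely for illustration) behaves the same way:

// reduceFAlt is a behavior-equivalent rewrite of reduceF above: dedupe the
// document names, sort them, and join them with commas.
func reduceFAlt(key string, values []string) string {
	set := make(map[string]bool)
	for _, doc := range values {
		set[doc] = true
	}
	docs := make([]string, 0, len(set))
	for doc := range set {
		docs = append(docs, doc)
	}
	sort.Strings(docs)
	return strconv.Itoa(len(docs)) + " " + strings.Join(docs, ",")
}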
Afterword
From having no idea where to start, to now passing all of Lab 1's tests, the MR lab is finally done, and that feels pretty satisfying.
Beyond a deeper understanding of MR, I also got a real taste of what makes an excellent system appealing: powerful functionality with a simple structure.
Along the way I picked up a new language, Go, one designed for highly concurrent systems, and it has been very pleasant to use.
Still, this is only the first lab in distributed systems and there is plenty left to learn, so keep going.