1、原視頻地址 https://www.bilibili.com/video/BV1ME411A73k/?spm_id_from=333.1007.top_right_bar_window_custom_collection.content.click&vd_source=33b50a4dd201d ...
6.824 lab1 筆記
1. 閱讀論文
略
2. 官網rules & hints
2.1 rules
- map階段每個worker應該把中間文件分成nReduce份,nReduce是reduce任務的數量
- worker完成reduce任務後生成文件名
mr-out-X
mr-out-X
文件每行應該是"%v %v"
格式,參考main/mrsequential.go
- worker處理完map任務,應該把生成的中間文件放到當前目錄中,便於worker執行reduce任務時讀取中間文件
- 當所有任務完成時,
Done()
函數應該返回true,使得coordinator
退出 - 所有任務完成時,worker應該退出,方法是:
- 當worker調用rpc向coordinator請求任務時,連接不上coordinator,此時可以認為coordinator已經退出因為所有任務已經完成了
- 當worker調用rpc向coordinator請求任務時,coordinator可以向其回覆所有任務已經完成
2.2 hints
-
剛開始可以修改
mr/worker.go
's ``Worker()向coordinator 發送一個RPC請求一個任務。然後修改coordinator回覆一個文件名,代表空閑的map任務。然後worker根據文件名讀取文件,調用
wc.so-Map函數,調用Map函數可參考
mrsequential.go` -
如果修改了
mr/
目錄下任何文件,應該重新build MapReduce plugins,go build -buildmode=plugin ../mrapps/wc.go
-
worker處理完map任務後產生的中間文件命名格式
mr-X-Y
,x是map任務的編號,y是reduce任務編號。// 初始文件,通過命令行傳入的,如 // pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt // len(files) = 3 nReduce = 4 // 中間文件 x:map任務的編號 y:reduce任務編號 // mr-0-0 mr-1-0 mr-2-0 // mr-0-1 mr-1-1 mr-2-1 // mr-0-2 mr-1-2 mr-2-2 // mr-0-3 mr-1-3 mr-2-3
-
map任務存儲數據到文件可以使用json格式,便於reduce任務讀取
// map enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv) // reduce dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
-
map階段使用
ihash(key)
函數把key映射到哪個reduce任務,如某個worker取得了2號map任務,ihash("apple") = 1
,那麼就應該把該key放到mr-2-1
文件中 -
可以參考
mrsequential.go
代碼:讀取初始輸入文件、排序key、存儲reduce輸出文件 -
coordinator是rpc server,將會被併發訪問,需要對共用變數加鎖
-
若當前未有空閑的map任務可以分配,worker應該等待一段時間再請求任務,若worker頻繁請求任務,coordinator就會頻繁加鎖、訪問數據、釋放鎖,浪費資源和時間。如使用
time.Sleep()
,worker可以每隔一秒發送一次請求任務rpc -
coordinator無法辨別某個worker是否crash,有可能某個worker還在運行,但是運行極其慢(由於硬體損壞等原因),最好的辦法是:coordinator監控某個任務,若該任務未在規定時間內由worker報告完成,那麼coordinator可以把該任務重新分配給其他worker,該lab規定超時時間是10s
-
為了確保某個worker在寫入文件時,不會有其他worker同時寫入;又或者是某個worker寫入文件時中途退出了,只寫了部分數據,不能讓這個沒寫完的文件讓其他worker看到。可以使用臨時文件
ioutil.TempFile
,當寫入全部完成時,再使用原子重命名os.Rename
。 -
Go RPC只能傳struct中大寫字母開頭的變數
-
調用RPC
call()
函數時,reply struct應該為空,不然會報錯reply := SomeType{} call(..., &reply)
3. 架構設計
3.1 RPC設計
在該lab中,我們需要兩個RPC,一個是callTask RPC
向coordinator請求一個任務,一個是callTaskDone RPC
向coordinator報告某個任務的完成,以下皆在rpc.go
中定義
-
首先定義一個枚舉變數,表示coordinator給worker分配的任務類型,也可用來表示coordinator當前的phase
type taskType int const ( // map任務 mapType taskType = iota // reduce任務 reduceType // 當前沒有空閑任務,請等待 waitting // 已經完成全部任務,可以退出了 done )
-
定義拉取任務RPC的args和reply struct
CallTaskArgs
中不需要包含變數,只需要讓coordinator知道該worker正在請求一個任務,coordinator會隨機選擇空閑任務進行分配填入CallTaskReply
中CallTaskReply
包含以下變數:FileName
:map階段,worker需要知道具體的文件名才能解析該文件tp
:指示該任務的具體類型TaskID
:worker將該變數放入CallTaskDoneArgs
中,coordinator可以迅速定位Task[TaskID],並且在reduce階段中,搭配nFiles
變數,worker讀取mr-0-TaskID
、mr-1-TaskID
....mr-nFiles-1-TaskID
文件nFiles
:初始文件的數量,用於搭配TaskID
,在上面已介紹nReduce
:用於map階段,ihash(key) % nReduce
type CallTaskArgs struct { } type CallTaskReply struct { FileName string TaskID int tp taskType nFiles int nReduce int }
-
定義報告任務完成RPC的args和reply struct
TaskID
變數作用在CallTaskReply: TaskID
中提及tp
的作用是用於coordinator判斷該RPC是否是合法的,舉例:worker-1成功請求到map-1任務,但是因為worker-1節點硬體問題處理緩慢而導致coordinator檢測到該map-1任務超時,於是把map-1任務分配給了worker-2。等到某個時間點,已經完成所有map任務,coordinator進入到了reduce階段,但此時worker-1節點才剛運行完map-1任務並報告給coordinator,coordinator檢測到當前是reduce階段,但收到報告完成的rpc是map類型,不會對其進行任何操作。type CallTaskDoneArgs struct { TaskID int tp taskType } type CallTaskDoneReply struct { }
3.2 Coordinator
3.2.1 結構體設計
type taskState int
const (
spare taskState = iota
executing
finish
)
type task struct {
fileName string
id int
state taskState
start time.Time
}
首先設計一個task struct,該結構體代表一個任務
filename
:在map階段,用於coordinator告知worker要讀取的初始文件id
: 該任務的id,傳給worker,作用在RPC設計中提及state
:任務有三個狀態:空閑、執行中、已完成。若空閑則可以分配給worker;若執行中,則監視該任務是否超時start
:任務剛開始執行的時間
type Coordinator struct {
// Your definitions here.
mu sync.Mutex
state taskType
tasks []*task
mapChan chan *task
reduceChan chan *task
nReduce int
nFiles int
finished int
}
接著設計主要Coordinator結構體
state
:當前系統的狀態,map階段(分配map任務)、reduce階段(分配reduce任務)、全部完成done(可以結束系統運行)tasks
: *task的切片,維護了一組任務mapChan
、reduceChan
:用於分發map、reduce任務的channel。map階段,若有空閑map任務,則放至channel中,當有worker請求任務時,則可取出來。reduce階段同理finished
:當前已完成任務的數量。map階段,若finished == nFiles
,則表示所有map任務完成,可以進入reduce階段。reduce階段同理,進入done
3.2.2 初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
c.mapPhase(files, nReduce)
go c.watch()
c.server()
return &c
}
func (c *Coordinator) mapPhase(files []string, nReduce int) {
c.state = mapType //設置系統狀態為map階段
c.nReduce = nReduce
c.nFiles = len(files)
c.tasks = make([]*task, c.nFiles)
c.mapChan = make(chan *task, c.nFiles) // c.nFiles長度的map channel
for i := 0; i < c.nFiles; i++ {
c.tasks[i] = &task{fileName: files[i], id: i}
c.mapChan <- c.tasks[i] // 剛開始所有任務都是空閑狀態,放入channel中
}
}
系統剛開始時即map階段,mapPhase
初始化Coordinator結構體。然後啟動c.watch()
協程,用於監視任務是否超時,放後面講
3.2.3 分配任務
func (c *Coordinator) CallTask(args *CallTaskArgs, reply *CallTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == done {
reply.Tp = done
} else if c.state == mapType {
switch len(c.mapChan) > 0 {
case true:
task := <-c.mapChan
c.setReply(task, reply)
case false:
reply.Tp = waitting
}
} else {
switch len(c.reduceChan) > 0 {
case true:
task := <-c.reduceChan
c.setReply(task, reply)
case false:
reply.Tp = waitting
}
}
return nil
}
func (c *Coordinator) setReply(t *task, reply *CallTaskReply) {
if t.state == finish {
reply.Tp = waitting
return
}
reply.Tp = c.state
reply.TaskID = t.id
reply.NReduce = c.nReduce
reply.NFiles = c.nFiles
reply.FileName = t.fileName
t.state = executing
t.start = time.Now()
}
分配任務的主要函數,worker處會調用call("Coordinator.CallTask", &args, &reply)
。
- 若當前系統狀態為done,則返回done,告知worker可以退出了
- 若當前系統狀態為map階段:如果當前有任務可以分配
len(c.mapChan) > 0
,則取出一個task,調用c.setReply(task, reply)
,將任務的相關信息填入reply中,並把task的當前狀態設為執行中,開始時間設為time.Now()
。如果沒有可分配的任務,則設reply.Tp = waitting
,讓worker等待一會再請求任務 - 若當前系統狀態為reduce階段:同上
3.2.4 任務完成
func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.state != args.Tp || c.state == done {
return nil
}
if c.tasks[args.TaskID].state != finish {
c.tasks[args.TaskID].state = finish
c.finished++
//fmt.Printf("task %v done\n", args.TaskID)
if c.state == mapType && c.finished == c.nFiles {
c.reducePhase()
} else if c.state == reduceType && c.finished == c.nReduce {
close(c.reduceChan)
c.state = done
}
}
return nil
}
func (c *Coordinator) reducePhase() {
//fmt.Printf("reduce phase\n")
close(c.mapChan)
c.state = reduceType
c.tasks = make([]*task, c.nReduce)
c.finished = 0
c.reduceChan = make(chan *task, c.nReduce)
for i := 0; i < c.nReduce; i++ {
c.tasks[i] = &task{id: i}
c.reduceChan <- c.tasks[i]
}
}
worker處會調用call("Coordinator.CallTaskDone", &args, &reply)
來報告某任務的完成
首先判斷c.state != args.Tp
,即報告完成的任務類型和當前系統狀態不匹配,可能發生在該情況:work-1請求了map-1任務,但是work-1運行太緩慢導致Coordinator監測到map-1任務超時,於是把map-1任務分配給了work-2。當所有map任務完成時,Coordinator進入了reduce階段,這時work-1才報告map-1任務完成,與當前系統狀態不匹配,故會直接返回
若該任務未完成,則將該任務標記未已完成,c.finished++
。
- 如果當前為map階段並且所有map任務已完成
c.state == mapType && c.finished == c.nFiles
,則進入reduce階段:- 關閉map channel
- 將系統狀態設為reduce
- 重置c.tasks為一系列reduce任務
- 創建長度為c.nReduce的reduce channel
- 放入任務
- 如果當前為reduce階段並且所有map任務已完成
c.state == reduceType && c.finished == c.nReduce
,則進入done階段:- 關閉reduce channel
- 將系統狀態設為done
3.2.5 監測超時任務goroutine
func (c *Coordinator) watch() {
for {
time.Sleep(time.Second)
c.mu.Lock()
if c.state == done {
return
}
for _, task := range c.tasks {
if task.state == executing && time.Since(task.start) > timeout {
task.state = spare
switch c.state {
case mapType:
c.mapChan <- task
case reduceType:
c.reduceChan <- task
}
}
}
c.mu.Unlock()
}
}
如果當前系統狀態為done了,可以退出協程了
迴圈任務列表,如果該任務狀態是正在執行但是超時了time.Since(task.start) > timeout
(time.Since可以計算系統當前時間距離start過去了多少時間),將該任務狀態重置為空閑狀態,並且根據系統當前狀態,把該任務重新放入對應的channel中
3.3 Worker
3.3.1 主流程
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
args := CallTaskArgs{}
reply := CallTaskReply{}
ok := call("Coordinator.CallTask", &args, &reply)
//now := time.Now()
if ok {
switch reply.Tp {
case mapType:
executeMap(reply.FileName, reply.NReduce, reply.TaskID, mapf)
case reduceType:
executeReduce(reply.NFiles, reply.TaskID, reducef)
case waitting:
time.Sleep(time.Second * 2)
continue
case done:
os.Exit(0)
}
} else {
time.Sleep(time.Second * 2)
continue
}
//fmt.Printf("finish task: %v %v use %v\n", reply.TaskID, rs(reply.Tp), time.Since(now).Seconds())
a := CallTaskDoneArgs{reply.TaskID, reply.Tp}
r := CallTaskDoneReply{}
call("Coordinator.CallTaskDone", &a, &r)
time.Sleep(time.Second * 2)
}
}
首先向Coordinator發送請求任務rpc:
- map任務:執行
- reduce任務:執行
- waitting:當前Coordinator沒有空閑任務,sleep一段時間再請求
- done:所有任務已完成,退出
任務執行完成後,報告任務完成
3.3.2 執行map任務
func executeMap(fileName string, nReduce, taskID int, mapf func(string, string) []KeyValue) {
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", fileName)
}
file.Close()
kva := mapf(fileName, string(content))
// 上面的代碼參考mrsequential.go
files := []*os.File{}
tmpFileNames := []string{}
encoders := []*json.Encoder{}
for i := 0; i < nReduce; i++ {
tempFile, err := ioutil.TempFile("./", "")
if err != nil {
log.Fatalf("cannot open temp file")
}
files = append(files, tempFile)
tmpFileNames = append(tmpFileNames, tempFile.Name())
encoders = append(encoders, json.NewEncoder(tempFile))
}
for _, kv := range kva {
n := ihash(kv.Key) % nReduce
encoders[n].Encode(kv)
}
for i := 0; i < nReduce; i++ {
files[i].Close()
os.Rename(tmpFileNames[i], "./"+intermediateFileName(taskID, i))
}
}
在當前目錄創建nReduce個臨時文件ioutil.TempFile("./", "")
,使用該臨時文件創建json.Encoder
(在hints第四條),使用ihash
函數把每個key映射到哪個文件,寫入json格式,然後對每個臨時文件重命名為mr-x-y
格式
生成中間文件名函數:
func intermediateFileName(x, y int) string {
return fmt.Sprintf("mr-%v-%v", x, y)
}
3.3.3 執行reduce
func executeReduce(nFiles, taskID int, reducef func(string, []string) string) {
kvs := []KeyValue{}
for i := 0; i < nFiles; i++ {
filename := intermediateFileName(i, taskID)
// 讀取每個中間文件
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
// 參考hints第四條,從文件中取出json格式的每條數據
decoder := json.NewDecoder(file)
for {
var kv KeyValue
// 已讀到文件末尾
if err := decoder.Decode(&kv); err != nil {
break
}
kvs = append(kvs, kv)
}
file.Close()
}
// 參考mrsequential.go
oname := fmt.Sprintf("mr-out-%v", taskID)
tempFile, _ := ioutil.TempFile("./", "")
tempFileName := tempFile.Name()
sort.Sort(ByKey(kvs))
for i := 0; i < len(kvs); {
j := i + 1
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
output := reducef(kvs[i].Key, values)
fmt.Fprintf(tempFile, "%v %v\n", kvs[i].Key, output)
i = j
}
tempFile.Close()
os.Rename(tempFileName, "./"+oname)
}