mit 6.824 lab1分析

来源:https://www.cnblogs.com/lt6668964/archive/2023/04/10/17304413.html
-Advertisement-
Play Games

1、原視頻地址 https://www.bilibili.com/video/BV1ME411A73k/?spm_id_from=333.1007.top_right_bar_window_custom_collection.content.click&vd_source=33b50a4dd201d ...


6.824 lab1 筆記

1. 閱讀論文

2. 官網rules & hints

2.1 rules

  1. map階段每個worker應該把中間文件分成nReduce份,nReduce是reduce任務的數量
  2. worker完成reduce任務後生成文件名mr-out-X
  3. mr-out-X文件每行應該是"%v %v"格式,參考main/mrsequential.go
  4. worker處理完map任務,應該把生成的中間文件放到當前目錄中,便於worker執行reduce任務時讀取中間文件
  5. 當所有任務完成時,Done()函數應該返回true,使得coordinator退出
  6. 所有任務完成時,worker應該退出,方法是:
    1. 當worker調用rpc向coordinator請求任務時,連接不上coordinator,此時可以認為coordinator已經退出因為所有任務已經完成了
    2. 當worker調用rpc向coordinator請求任務時,coordinator可以向其回覆所有任務已經完成

2.2 hints

  1. 剛開始可以修改mr/worker.go's ``Worker()向coordinator 發送一個RPC請求一個任務。然後修改coordinator回覆一個文件名,代表空閑的map任務。然後worker根據文件名讀取文件,調用wc.so-Map函數,調用Map函數可參考mrsequential.go`

  2. 如果修改了mr/目錄下任何文件,應該重新build MapReduce plugins,go build -buildmode=plugin ../mrapps/wc.go

  3. worker處理完map任務後產生的中間文件命名格式mr-X-Y,x是map任務的編號,y是reduce任務編號。

    // 初始文件,通過命令行傳入的,如
    // pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt 
    // len(files) = 3 nReduce = 4
    // 中間文件  x:map任務的編號 y:reduce任務編號
    // mr-0-0 mr-1-0 mr-2-0
    // mr-0-1 mr-1-1 mr-2-1
    // mr-0-2 mr-1-2 mr-2-2
    // mr-0-3 mr-1-3 mr-2-3
    
  4. map任務存儲數據到文件可以使用json格式,便於reduce任務讀取

      // map
      enc := json.NewEncoder(file)
      for _, kv := ... {
        err := enc.Encode(&kv)
          
      // reduce
      dec := json.NewDecoder(file)
      for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
          break
        }
        kva = append(kva, kv)
      }
    
  5. map階段使用ihash(key)函數把key映射到哪個reduce任務,如某個worker取得了2號map任務,ihash("apple") = 1,那麼就應該把該key放到mr-2-1文件中

  6. 可以參考mrsequential.go代碼:讀取初始輸入文件、排序key、存儲reduce輸出文件

  7. coordinator是rpc server,將會被併發訪問,需要對共用變數加鎖

  8. 若當前未有空閑的map任務可以分配,worker應該等待一段時間再請求任務,若worker頻繁請求任務,coordinator就會頻繁加鎖、訪問數據、釋放鎖,浪費資源和時間。如使用time.Sleep(),worker可以每隔一秒發送一次請求任務rpc

  9. coordinator無法辨別某個worker是否crash,有可能某個worker還在運行,但是運行極其慢(由於硬體損壞等原因),最好的辦法是:coordinator監控某個任務,若該任務未在規定時間內由worker報告完成,那麼coordinator可以把該任務重新分配給其他worker,該lab規定超時時間是10s

  10. 為了確保某個worker在寫入文件時,不會有其他worker同時寫入;又或者是某個worker寫入文件時中途退出了,只寫了部分數據,不能讓這個沒寫完的文件讓其他worker看到。可以使用臨時文件ioutil.TempFile,當寫入全部完成時,再使用原子重命名os.Rename

  11. Go RPC只能傳struct中大寫字母開頭的變數

  12. 調用RPC call() 函數時,reply struct應該為空,不然會報錯

      reply := SomeType{}
      call(..., &reply)
    

3. 架構設計

3.1 RPC設計

在該lab中,我們需要兩個RPC,一個是callTask RPC向coordinator請求一個任務,一個是callTaskDone RPC向coordinator報告某個任務的完成,以下皆在rpc.go中定義

  1. 首先定義一個枚舉變數,表示coordinator給worker分配的任務類型,也可用來表示coordinator當前的phase

    type taskType int
    
    const (
        // map任務
    	mapType taskType = iota
        // reduce任務
    	reduceType
        // 當前沒有空閑任務,請等待
        waitting
        // 已經完成全部任務,可以退出了
    	done
    )
    
  2. 定義拉取任務RPC的args和reply struct

    CallTaskArgs中不需要包含變數,只需要讓coordinator知道該worker正在請求一個任務,coordinator會隨機選擇空閑任務進行分配填入CallTaskReply

    CallTaskReply包含以下變數:

    • FileName:map階段,worker需要知道具體的文件名才能解析該文件
    • tp:指示該任務的具體類型
    • TaskID:worker將該變數放入CallTaskDoneArgs中,coordinator可以迅速定位Task[TaskID],並且在reduce階段中,搭配nFiles變數,worker讀取mr-0-TaskIDmr-1-TaskID....mr-nFiles-1-TaskID文件
    • nFiles:初始文件的數量,用於搭配TaskID,在上面已介紹
    • nReduce:用於map階段,ihash(key) % nReduce
    type CallTaskArgs struct {
    }
    type CallTaskReply struct {
    	FileName string
    	TaskID   int
    	tp       taskType
    	nFiles   int
    	nReduce  int
    }
    
  3. 定義報告任務完成RPC的args和reply struct

    TaskID變數作用在CallTaskReply: TaskID 中提及

    tp的作用是用於coordinator判斷該RPC是否是合法的,舉例:worker-1成功請求到map-1任務,但是因為worker-1節點硬體問題處理緩慢而導致coordinator檢測到該map-1任務超時,於是把map-1任務分配給了worker-2。等到某個時間點,已經完成所有map任務,coordinator進入到了reduce階段,但此時worker-1節點才剛運行完map-1任務並報告給coordinator,coordinator檢測到當前是reduce階段,但收到報告完成的rpc是map類型,不會對其進行任何操作。

    type CallTaskDoneArgs struct {
    	TaskID int
    	tp     taskType
    }
    type CallTaskDoneReply struct {
    }
    

3.2 Coordinator

3.2.1 結構體設計

type taskState int

const (
	spare taskState = iota
	executing
	finish
)

type task struct {
	fileName string
	id       int
	state    taskState
	start    time.Time
}

首先設計一個task struct,該結構體代表一個任務

  • filename:在map階段,用於coordinator告知worker要讀取的初始文件
  • id: 該任務的id,傳給worker,作用在RPC設計中提及
  • state:任務有三個狀態:空閑、執行中、已完成。若空閑則可以分配給worker;若執行中,則監視該任務是否超時
  • start:任務剛開始執行的時間
type Coordinator struct {
	// Your definitions here.
	mu         sync.Mutex
	state      taskType
	tasks      []*task
	mapChan    chan *task
	reduceChan chan *task
	nReduce    int
	nFiles     int
	finished   int
}

接著設計主要Coordinator結構體

  • state:當前系統的狀態,map階段(分配map任務)、reduce階段(分配reduce任務)、全部完成done(可以結束系統運行)
  • tasks: *task的切片,維護了一組任務
  • mapChanreduceChan:用於分發map、reduce任務的channel。map階段,若有空閑map任務,則放至channel中,當有worker請求任務時,則可取出來。reduce階段同理
  • finished:當前已完成任務的數量。map階段,若finished == nFiles,則表示所有map任務完成,可以進入reduce階段。reduce階段同理,進入done

3.2.2 初始化

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.mapPhase(files, nReduce)
	go c.watch()
	c.server()
	return &c
}

func (c *Coordinator) mapPhase(files []string, nReduce int) {
	c.state = mapType                 //設置系統狀態為map階段
	c.nReduce = nReduce        
	c.nFiles = len(files)
	c.tasks = make([]*task, c.nFiles)
	c.mapChan = make(chan *task, c.nFiles) // c.nFiles長度的map channel
	for i := 0; i < c.nFiles; i++ {
		c.tasks[i] = &task{fileName: files[i], id: i}
		c.mapChan <- c.tasks[i]            // 剛開始所有任務都是空閑狀態,放入channel中
	}
}

系統剛開始時即map階段,mapPhase初始化Coordinator結構體。然後啟動c.watch()協程,用於監視任務是否超時,放後面講

3.2.3 分配任務

func (c *Coordinator) CallTask(args *CallTaskArgs, reply *CallTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state == done {
		reply.Tp = done
	} else if c.state == mapType {
		switch len(c.mapChan) > 0 {
		case true:
			task := <-c.mapChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	} else {
		switch len(c.reduceChan) > 0 {
		case true:
			task := <-c.reduceChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	}
	return nil
}

func (c *Coordinator) setReply(t *task, reply *CallTaskReply) {
	if t.state == finish {
		reply.Tp = waitting
		return
	}
	reply.Tp = c.state
	reply.TaskID = t.id
	reply.NReduce = c.nReduce
	reply.NFiles = c.nFiles
	reply.FileName = t.fileName
	t.state = executing
	t.start = time.Now()
}

分配任務的主要函數,worker處會調用call("Coordinator.CallTask", &args, &reply)

  1. 若當前系統狀態為done,則返回done,告知worker可以退出了
  2. 若當前系統狀態為map階段:如果當前有任務可以分配len(c.mapChan) > 0,則取出一個task,調用c.setReply(task, reply),將任務的相關信息填入reply中,並把task的當前狀態設為執行中,開始時間設為time.Now()。如果沒有可分配的任務,則設reply.Tp = waitting,讓worker等待一會再請求任務
  3. 若當前系統狀態為reduce階段:同上

3.2.4 任務完成

func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state != args.Tp || c.state == done {
		return nil
	}
	if c.tasks[args.TaskID].state != finish {
		c.tasks[args.TaskID].state = finish
		c.finished++
		//fmt.Printf("task %v done\n", args.TaskID)
		if c.state == mapType && c.finished == c.nFiles {
			c.reducePhase()
		} else if c.state == reduceType && c.finished == c.nReduce {
			close(c.reduceChan)
			c.state = done
		}
	}
	return nil
}

func (c *Coordinator) reducePhase() {
	//fmt.Printf("reduce phase\n")
	close(c.mapChan)
	c.state = reduceType
	c.tasks = make([]*task, c.nReduce)
	c.finished = 0
	c.reduceChan = make(chan *task, c.nReduce)
	for i := 0; i < c.nReduce; i++ {
		c.tasks[i] = &task{id: i}
		c.reduceChan <- c.tasks[i]
	}
}

worker處會調用call("Coordinator.CallTaskDone", &args, &reply)來報告某任務的完成

首先判斷c.state != args.Tp,即報告完成的任務類型和當前系統狀態不匹配,可能發生在該情況:work-1請求了map-1任務,但是work-1運行太緩慢導致Coordinator監測到map-1任務超時,於是把map-1任務分配給了work-2。當所有map任務完成時,Coordinator進入了reduce階段,這時work-1才報告map-1任務完成,與當前系統狀態不匹配,故會直接返回

若該任務未完成,則將該任務標記未已完成,c.finished++

  1. 如果當前為map階段並且所有map任務已完成c.state == mapType && c.finished == c.nFiles,則進入reduce階段:
    1. 關閉map channel
    2. 將系統狀態設為reduce
    3. 重置c.tasks為一系列reduce任務
    4. 創建長度為c.nReduce的reduce channel
    5. 放入任務
  2. 如果當前為reduce階段並且所有map任務已完成c.state == reduceType && c.finished == c.nReduce,則進入done階段:
    1. 關閉reduce channel
    2. 將系統狀態設為done

3.2.5 監測超時任務goroutine

func (c *Coordinator) watch() {
	for {
		time.Sleep(time.Second)
		c.mu.Lock()
		if c.state == done {
			return
		}
		for _, task := range c.tasks {
			if task.state == executing && time.Since(task.start) > timeout {
				task.state = spare
				switch c.state {
				case mapType:
					c.mapChan <- task
				case reduceType:
					c.reduceChan <- task
				}
			}
		}
		c.mu.Unlock()
	}
}

如果當前系統狀態為done了,可以退出協程了

迴圈任務列表,如果該任務狀態是正在執行但是超時了time.Since(task.start) > timeout(time.Since可以計算系統當前時間距離start過去了多少時間),將該任務狀態重置為空閑狀態,並且根據系統當前狀態,把該任務重新放入對應的channel中

3.3 Worker

3.3.1 主流程

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		args := CallTaskArgs{}
		reply := CallTaskReply{}
		ok := call("Coordinator.CallTask", &args, &reply)
		//now := time.Now()
		if ok {
			switch reply.Tp {
			case mapType:
				executeMap(reply.FileName, reply.NReduce, reply.TaskID, mapf)
			case reduceType:
				executeReduce(reply.NFiles, reply.TaskID, reducef)
			case waitting:
				time.Sleep(time.Second * 2)
				continue
			case done:
				os.Exit(0)
			}
		} else {
			time.Sleep(time.Second * 2)
			continue
		}
		//fmt.Printf("finish task: %v %v use %v\n", reply.TaskID, rs(reply.Tp), time.Since(now).Seconds())
		a := CallTaskDoneArgs{reply.TaskID, reply.Tp}
		r := CallTaskDoneReply{}
		call("Coordinator.CallTaskDone", &a, &r)
		time.Sleep(time.Second * 2)
	}
}

首先向Coordinator發送請求任務rpc:

  1. map任務:執行
  2. reduce任務:執行
  3. waitting:當前Coordinator沒有空閑任務,sleep一段時間再請求
  4. done:所有任務已完成,退出

任務執行完成後,報告任務完成

3.3.2 執行map任務

func executeMap(fileName string, nReduce, taskID int, mapf func(string, string) []KeyValue) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	file.Close()
	kva := mapf(fileName, string(content))
    // 上面的代碼參考mrsequential.go
	files := []*os.File{}
	tmpFileNames := []string{}
	encoders := []*json.Encoder{}
	for i := 0; i < nReduce; i++ {
		tempFile, err := ioutil.TempFile("./", "")
		if err != nil {
			log.Fatalf("cannot open temp file")
		}
		files = append(files, tempFile)
		tmpFileNames = append(tmpFileNames, tempFile.Name())
		encoders = append(encoders, json.NewEncoder(tempFile))
	}
	for _, kv := range kva {
		n := ihash(kv.Key) % nReduce
		encoders[n].Encode(kv)
	}
	for i := 0; i < nReduce; i++ {
		files[i].Close()
		os.Rename(tmpFileNames[i], "./"+intermediateFileName(taskID, i))
	}
}

在當前目錄創建nReduce個臨時文件ioutil.TempFile("./", ""),使用該臨時文件創建json.Encoder(在hints第四條),使用ihash函數把每個key映射到哪個文件,寫入json格式,然後對每個臨時文件重命名為mr-x-y格式

生成中間文件名函數:

func intermediateFileName(x, y int) string {
	return fmt.Sprintf("mr-%v-%v", x, y)
}

3.3.3 執行reduce

func executeReduce(nFiles, taskID int, reducef func(string, []string) string) {
	kvs := []KeyValue{}
	for i := 0; i < nFiles; i++ {
		filename := intermediateFileName(i, taskID)
        // 讀取每個中間文件
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
        // 參考hints第四條,從文件中取出json格式的每條數據
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
            // 已讀到文件末尾
			if err := decoder.Decode(&kv); err != nil {
				break
			}
			kvs = append(kvs, kv)
		}
		file.Close()
	}
    // 參考mrsequential.go
	oname := fmt.Sprintf("mr-out-%v", taskID)
	tempFile, _ := ioutil.TempFile("./", "")
	tempFileName := tempFile.Name()
	sort.Sort(ByKey(kvs))
	for i := 0; i < len(kvs); {
		j := i + 1
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		output := reducef(kvs[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", kvs[i].Key, output)
		i = j
	}
	tempFile.Close()
	os.Rename(tempFileName, "./"+oname)
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文旨在通過部署微前端項目的實踐過程中沉澱出一套部署方案,針對項目分別部署在不同的伺服器上的場景,就一些重點步驟、碰到的問題做了一些總結。 ...
  • 回到十年前,前端技術就像一名戴著厚重眼鏡的書呆子,總是小心翼翼,被各種各樣的瀏覽器相容性問題欺負(就像在小學被欺負一樣)。 ...
  • 本文使用Three.js實現一個可以應用到實際項目中的3D線上看房案例。通過閱讀本文和實踐案例,你將學到的知識包括:使用 Three.js 實現多個場景的載入和場景間的絲滑過渡切換、隨著空間一直和角度實時變化的房源小地圖、在全景場景中添加如地面指引、空間物體展示、房間標註等多種類型的交互熱點等。 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 瞭解排序演算法的優缺點和適用場景是非常重要的,因為在實際開發中,需要根據實際情況選擇最合適的排序演算法。不同的排序演算法適用於不同的場景,有的演算法適用於小規模的數據集,有的演算法適用於大規模的數據集,有的演算法適用於穩定排序,有的演算法適用於不穩定排 ...
  • 在開發中我們有時候需要每隔 一段時間發送一次電子郵件,或者在某個特定的時間進行發送郵件,無需手動去操作,基於這樣的情況下我們需要用到了定時任務,一般可以寫個定時器,來完成相應的需求,在 node.js 中自已實現也非常容易,接下來要介紹的是node-schedule來完成定時任務 ...
  • @Configuration 標註在類上,啟動 Spring 會自動掃描@Configuration註解的類,將其註冊到IOC容器並實例化bean對象。如果在@Configuration註解的類中使用@Bean註解某個類對象的方法,Spring也會自動將註解了@Bean的方法註冊到IOC容器,併進行 ...
  • 何為請求限流? 請求限流是一種控制API或其他Web服務的流量的技術。它的目的是限制客戶端對伺服器發出的請求的數量或速率,以防止伺服器過載或響應時間變慢,從而提高系統的可用性和穩定性。 中小型項目請求限流的需求 按IP、用戶、全局限流 基於不同實現的限流設計(基於Redis或者LRU緩存) 基於註解 ...
  • 最近在開發過程中遇到一個sqlalchemy lost connection的報錯,記錄解決方法。 報錯信息 python後端開發,使用的框架是Fastapi + sqlalchemy。在一個介面請求中報錯如下: [2023-03-24 06:36:35 +0000] [217] [ERROR] E ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...