mit 6.824 lab1分析

来源:https://www.cnblogs.com/lt6668964/archive/2023/04/10/17304413.html
-Advertisement-
Play Games

1、原視頻地址 https://www.bilibili.com/video/BV1ME411A73k/?spm_id_from=333.1007.top_right_bar_window_custom_collection.content.click&vd_source=33b50a4dd201d ...


6.824 lab1 筆記

1. 閱讀論文

2. 官網rules & hints

2.1 rules

  1. map階段每個worker應該把中間文件分成nReduce份,nReduce是reduce任務的數量
  2. worker完成reduce任務後生成文件名mr-out-X
  3. mr-out-X文件每行應該是"%v %v"格式,參考main/mrsequential.go
  4. worker處理完map任務,應該把生成的中間文件放到當前目錄中,便於worker執行reduce任務時讀取中間文件
  5. 當所有任務完成時,Done()函數應該返回true,使得coordinator退出
  6. 所有任務完成時,worker應該退出,方法是:
    1. 當worker調用rpc向coordinator請求任務時,連接不上coordinator,此時可以認為coordinator已經退出因為所有任務已經完成了
    2. 當worker調用rpc向coordinator請求任務時,coordinator可以向其回覆所有任務已經完成

2.2 hints

  1. 剛開始可以修改mr/worker.go's ``Worker()向coordinator 發送一個RPC請求一個任務。然後修改coordinator回覆一個文件名,代表空閑的map任務。然後worker根據文件名讀取文件,調用wc.so-Map函數,調用Map函數可參考mrsequential.go`

  2. 如果修改了mr/目錄下任何文件,應該重新build MapReduce plugins,go build -buildmode=plugin ../mrapps/wc.go

  3. worker處理完map任務後產生的中間文件命名格式mr-X-Y,x是map任務的編號,y是reduce任務編號。

    // 初始文件,通過命令行傳入的,如
    // pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt 
    // len(files) = 3 nReduce = 4
    // 中間文件  x:map任務的編號 y:reduce任務編號
    // mr-0-0 mr-1-0 mr-2-0
    // mr-0-1 mr-1-1 mr-2-1
    // mr-0-2 mr-1-2 mr-2-2
    // mr-0-3 mr-1-3 mr-2-3
    
  4. map任務存儲數據到文件可以使用json格式,便於reduce任務讀取

      // map
      enc := json.NewEncoder(file)
      for _, kv := ... {
        err := enc.Encode(&kv)
          
      // reduce
      dec := json.NewDecoder(file)
      for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
          break
        }
        kva = append(kva, kv)
      }
    
  5. map階段使用ihash(key)函數把key映射到哪個reduce任務,如某個worker取得了2號map任務,ihash("apple") = 1,那麼就應該把該key放到mr-2-1文件中

  6. 可以參考mrsequential.go代碼:讀取初始輸入文件、排序key、存儲reduce輸出文件

  7. coordinator是rpc server,將會被併發訪問,需要對共用變數加鎖

  8. 若當前未有空閑的map任務可以分配,worker應該等待一段時間再請求任務,若worker頻繁請求任務,coordinator就會頻繁加鎖、訪問數據、釋放鎖,浪費資源和時間。如使用time.Sleep(),worker可以每隔一秒發送一次請求任務rpc

  9. coordinator無法辨別某個worker是否crash,有可能某個worker還在運行,但是運行極其慢(由於硬體損壞等原因),最好的辦法是:coordinator監控某個任務,若該任務未在規定時間內由worker報告完成,那麼coordinator可以把該任務重新分配給其他worker,該lab規定超時時間是10s

  10. 為了確保某個worker在寫入文件時,不會有其他worker同時寫入;又或者是某個worker寫入文件時中途退出了,只寫了部分數據,不能讓這個沒寫完的文件讓其他worker看到。可以使用臨時文件ioutil.TempFile,當寫入全部完成時,再使用原子重命名os.Rename

  11. Go RPC只能傳struct中大寫字母開頭的變數

  12. 調用RPC call() 函數時,reply struct應該為空,不然會報錯

      reply := SomeType{}
      call(..., &reply)
    

3. 架構設計

3.1 RPC設計

在該lab中,我們需要兩個RPC,一個是callTask RPC向coordinator請求一個任務,一個是callTaskDone RPC向coordinator報告某個任務的完成,以下皆在rpc.go中定義

  1. 首先定義一個枚舉變數,表示coordinator給worker分配的任務類型,也可用來表示coordinator當前的phase

    type taskType int
    
    const (
        // map任務
    	mapType taskType = iota
        // reduce任務
    	reduceType
        // 當前沒有空閑任務,請等待
        waitting
        // 已經完成全部任務,可以退出了
    	done
    )
    
  2. 定義拉取任務RPC的args和reply struct

    CallTaskArgs中不需要包含變數,只需要讓coordinator知道該worker正在請求一個任務,coordinator會隨機選擇空閑任務進行分配填入CallTaskReply

    CallTaskReply包含以下變數:

    • FileName:map階段,worker需要知道具體的文件名才能解析該文件
    • tp:指示該任務的具體類型
    • TaskID:worker將該變數放入CallTaskDoneArgs中,coordinator可以迅速定位Task[TaskID],並且在reduce階段中,搭配nFiles變數,worker讀取mr-0-TaskIDmr-1-TaskID....mr-nFiles-1-TaskID文件
    • nFiles:初始文件的數量,用於搭配TaskID,在上面已介紹
    • nReduce:用於map階段,ihash(key) % nReduce
    type CallTaskArgs struct {
    }
    type CallTaskReply struct {
    	FileName string
    	TaskID   int
    	tp       taskType
    	nFiles   int
    	nReduce  int
    }
    
  3. 定義報告任務完成RPC的args和reply struct

    TaskID變數作用在CallTaskReply: TaskID 中提及

    tp的作用是用於coordinator判斷該RPC是否是合法的,舉例:worker-1成功請求到map-1任務,但是因為worker-1節點硬體問題處理緩慢而導致coordinator檢測到該map-1任務超時,於是把map-1任務分配給了worker-2。等到某個時間點,已經完成所有map任務,coordinator進入到了reduce階段,但此時worker-1節點才剛運行完map-1任務並報告給coordinator,coordinator檢測到當前是reduce階段,但收到報告完成的rpc是map類型,不會對其進行任何操作。

    type CallTaskDoneArgs struct {
    	TaskID int
    	tp     taskType
    }
    type CallTaskDoneReply struct {
    }
    

3.2 Coordinator

3.2.1 結構體設計

type taskState int

const (
	spare taskState = iota
	executing
	finish
)

type task struct {
	fileName string
	id       int
	state    taskState
	start    time.Time
}

首先設計一個task struct,該結構體代表一個任務

  • filename:在map階段,用於coordinator告知worker要讀取的初始文件
  • id: 該任務的id,傳給worker,作用在RPC設計中提及
  • state:任務有三個狀態:空閑、執行中、已完成。若空閑則可以分配給worker;若執行中,則監視該任務是否超時
  • start:任務剛開始執行的時間
type Coordinator struct {
	// Your definitions here.
	mu         sync.Mutex
	state      taskType
	tasks      []*task
	mapChan    chan *task
	reduceChan chan *task
	nReduce    int
	nFiles     int
	finished   int
}

接著設計主要Coordinator結構體

  • state:當前系統的狀態,map階段(分配map任務)、reduce階段(分配reduce任務)、全部完成done(可以結束系統運行)
  • tasks: *task的切片,維護了一組任務
  • mapChanreduceChan:用於分發map、reduce任務的channel。map階段,若有空閑map任務,則放至channel中,當有worker請求任務時,則可取出來。reduce階段同理
  • finished:當前已完成任務的數量。map階段,若finished == nFiles,則表示所有map任務完成,可以進入reduce階段。reduce階段同理,進入done

3.2.2 初始化

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.mapPhase(files, nReduce)
	go c.watch()
	c.server()
	return &c
}

func (c *Coordinator) mapPhase(files []string, nReduce int) {
	c.state = mapType                 //設置系統狀態為map階段
	c.nReduce = nReduce        
	c.nFiles = len(files)
	c.tasks = make([]*task, c.nFiles)
	c.mapChan = make(chan *task, c.nFiles) // c.nFiles長度的map channel
	for i := 0; i < c.nFiles; i++ {
		c.tasks[i] = &task{fileName: files[i], id: i}
		c.mapChan <- c.tasks[i]            // 剛開始所有任務都是空閑狀態,放入channel中
	}
}

系統剛開始時即map階段,mapPhase初始化Coordinator結構體。然後啟動c.watch()協程,用於監視任務是否超時,放後面講

3.2.3 分配任務

func (c *Coordinator) CallTask(args *CallTaskArgs, reply *CallTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state == done {
		reply.Tp = done
	} else if c.state == mapType {
		switch len(c.mapChan) > 0 {
		case true:
			task := <-c.mapChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	} else {
		switch len(c.reduceChan) > 0 {
		case true:
			task := <-c.reduceChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	}
	return nil
}

func (c *Coordinator) setReply(t *task, reply *CallTaskReply) {
	if t.state == finish {
		reply.Tp = waitting
		return
	}
	reply.Tp = c.state
	reply.TaskID = t.id
	reply.NReduce = c.nReduce
	reply.NFiles = c.nFiles
	reply.FileName = t.fileName
	t.state = executing
	t.start = time.Now()
}

分配任務的主要函數,worker處會調用call("Coordinator.CallTask", &args, &reply)

  1. 若當前系統狀態為done,則返回done,告知worker可以退出了
  2. 若當前系統狀態為map階段:如果當前有任務可以分配len(c.mapChan) > 0,則取出一個task,調用c.setReply(task, reply),將任務的相關信息填入reply中,並把task的當前狀態設為執行中,開始時間設為time.Now()。如果沒有可分配的任務,則設reply.Tp = waitting,讓worker等待一會再請求任務
  3. 若當前系統狀態為reduce階段:同上

3.2.4 任務完成

func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state != args.Tp || c.state == done {
		return nil
	}
	if c.tasks[args.TaskID].state != finish {
		c.tasks[args.TaskID].state = finish
		c.finished++
		//fmt.Printf("task %v done\n", args.TaskID)
		if c.state == mapType && c.finished == c.nFiles {
			c.reducePhase()
		} else if c.state == reduceType && c.finished == c.nReduce {
			close(c.reduceChan)
			c.state = done
		}
	}
	return nil
}

func (c *Coordinator) reducePhase() {
	//fmt.Printf("reduce phase\n")
	close(c.mapChan)
	c.state = reduceType
	c.tasks = make([]*task, c.nReduce)
	c.finished = 0
	c.reduceChan = make(chan *task, c.nReduce)
	for i := 0; i < c.nReduce; i++ {
		c.tasks[i] = &task{id: i}
		c.reduceChan <- c.tasks[i]
	}
}

worker處會調用call("Coordinator.CallTaskDone", &args, &reply)來報告某任務的完成

首先判斷c.state != args.Tp,即報告完成的任務類型和當前系統狀態不匹配,可能發生在該情況:work-1請求了map-1任務,但是work-1運行太緩慢導致Coordinator監測到map-1任務超時,於是把map-1任務分配給了work-2。當所有map任務完成時,Coordinator進入了reduce階段,這時work-1才報告map-1任務完成,與當前系統狀態不匹配,故會直接返回

若該任務未完成,則將該任務標記未已完成,c.finished++

  1. 如果當前為map階段並且所有map任務已完成c.state == mapType && c.finished == c.nFiles,則進入reduce階段:
    1. 關閉map channel
    2. 將系統狀態設為reduce
    3. 重置c.tasks為一系列reduce任務
    4. 創建長度為c.nReduce的reduce channel
    5. 放入任務
  2. 如果當前為reduce階段並且所有map任務已完成c.state == reduceType && c.finished == c.nReduce,則進入done階段:
    1. 關閉reduce channel
    2. 將系統狀態設為done

3.2.5 監測超時任務goroutine

func (c *Coordinator) watch() {
	for {
		time.Sleep(time.Second)
		c.mu.Lock()
		if c.state == done {
			return
		}
		for _, task := range c.tasks {
			if task.state == executing && time.Since(task.start) > timeout {
				task.state = spare
				switch c.state {
				case mapType:
					c.mapChan <- task
				case reduceType:
					c.reduceChan <- task
				}
			}
		}
		c.mu.Unlock()
	}
}

如果當前系統狀態為done了,可以退出協程了

迴圈任務列表,如果該任務狀態是正在執行但是超時了time.Since(task.start) > timeout(time.Since可以計算系統當前時間距離start過去了多少時間),將該任務狀態重置為空閑狀態,並且根據系統當前狀態,把該任務重新放入對應的channel中

3.3 Worker

3.3.1 主流程

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		args := CallTaskArgs{}
		reply := CallTaskReply{}
		ok := call("Coordinator.CallTask", &args, &reply)
		//now := time.Now()
		if ok {
			switch reply.Tp {
			case mapType:
				executeMap(reply.FileName, reply.NReduce, reply.TaskID, mapf)
			case reduceType:
				executeReduce(reply.NFiles, reply.TaskID, reducef)
			case waitting:
				time.Sleep(time.Second * 2)
				continue
			case done:
				os.Exit(0)
			}
		} else {
			time.Sleep(time.Second * 2)
			continue
		}
		//fmt.Printf("finish task: %v %v use %v\n", reply.TaskID, rs(reply.Tp), time.Since(now).Seconds())
		a := CallTaskDoneArgs{reply.TaskID, reply.Tp}
		r := CallTaskDoneReply{}
		call("Coordinator.CallTaskDone", &a, &r)
		time.Sleep(time.Second * 2)
	}
}

首先向Coordinator發送請求任務rpc:

  1. map任務:執行
  2. reduce任務:執行
  3. waitting:當前Coordinator沒有空閑任務,sleep一段時間再請求
  4. done:所有任務已完成,退出

任務執行完成後,報告任務完成

3.3.2 執行map任務

func executeMap(fileName string, nReduce, taskID int, mapf func(string, string) []KeyValue) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	file.Close()
	kva := mapf(fileName, string(content))
    // 上面的代碼參考mrsequential.go
	files := []*os.File{}
	tmpFileNames := []string{}
	encoders := []*json.Encoder{}
	for i := 0; i < nReduce; i++ {
		tempFile, err := ioutil.TempFile("./", "")
		if err != nil {
			log.Fatalf("cannot open temp file")
		}
		files = append(files, tempFile)
		tmpFileNames = append(tmpFileNames, tempFile.Name())
		encoders = append(encoders, json.NewEncoder(tempFile))
	}
	for _, kv := range kva {
		n := ihash(kv.Key) % nReduce
		encoders[n].Encode(kv)
	}
	for i := 0; i < nReduce; i++ {
		files[i].Close()
		os.Rename(tmpFileNames[i], "./"+intermediateFileName(taskID, i))
	}
}

在當前目錄創建nReduce個臨時文件ioutil.TempFile("./", ""),使用該臨時文件創建json.Encoder(在hints第四條),使用ihash函數把每個key映射到哪個文件,寫入json格式,然後對每個臨時文件重命名為mr-x-y格式

生成中間文件名函數:

func intermediateFileName(x, y int) string {
	return fmt.Sprintf("mr-%v-%v", x, y)
}

3.3.3 執行reduce

func executeReduce(nFiles, taskID int, reducef func(string, []string) string) {
	kvs := []KeyValue{}
	for i := 0; i < nFiles; i++ {
		filename := intermediateFileName(i, taskID)
        // 讀取每個中間文件
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
        // 參考hints第四條,從文件中取出json格式的每條數據
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
            // 已讀到文件末尾
			if err := decoder.Decode(&kv); err != nil {
				break
			}
			kvs = append(kvs, kv)
		}
		file.Close()
	}
    // 參考mrsequential.go
	oname := fmt.Sprintf("mr-out-%v", taskID)
	tempFile, _ := ioutil.TempFile("./", "")
	tempFileName := tempFile.Name()
	sort.Sort(ByKey(kvs))
	for i := 0; i < len(kvs); {
		j := i + 1
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		output := reducef(kvs[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", kvs[i].Key, output)
		i = j
	}
	tempFile.Close()
	os.Rename(tempFileName, "./"+oname)
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文旨在通過部署微前端項目的實踐過程中沉澱出一套部署方案,針對項目分別部署在不同的伺服器上的場景,就一些重點步驟、碰到的問題做了一些總結。 ...
  • 回到十年前,前端技術就像一名戴著厚重眼鏡的書呆子,總是小心翼翼,被各種各樣的瀏覽器相容性問題欺負(就像在小學被欺負一樣)。 ...
  • 本文使用Three.js實現一個可以應用到實際項目中的3D線上看房案例。通過閱讀本文和實踐案例,你將學到的知識包括:使用 Three.js 實現多個場景的載入和場景間的絲滑過渡切換、隨著空間一直和角度實時變化的房源小地圖、在全景場景中添加如地面指引、空間物體展示、房間標註等多種類型的交互熱點等。 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 瞭解排序演算法的優缺點和適用場景是非常重要的,因為在實際開發中,需要根據實際情況選擇最合適的排序演算法。不同的排序演算法適用於不同的場景,有的演算法適用於小規模的數據集,有的演算法適用於大規模的數據集,有的演算法適用於穩定排序,有的演算法適用於不穩定排 ...
  • 在開發中我們有時候需要每隔 一段時間發送一次電子郵件,或者在某個特定的時間進行發送郵件,無需手動去操作,基於這樣的情況下我們需要用到了定時任務,一般可以寫個定時器,來完成相應的需求,在 node.js 中自已實現也非常容易,接下來要介紹的是node-schedule來完成定時任務 ...
  • @Configuration 標註在類上,啟動 Spring 會自動掃描@Configuration註解的類,將其註冊到IOC容器並實例化bean對象。如果在@Configuration註解的類中使用@Bean註解某個類對象的方法,Spring也會自動將註解了@Bean的方法註冊到IOC容器,併進行 ...
  • 何為請求限流? 請求限流是一種控制API或其他Web服務的流量的技術。它的目的是限制客戶端對伺服器發出的請求的數量或速率,以防止伺服器過載或響應時間變慢,從而提高系統的可用性和穩定性。 中小型項目請求限流的需求 按IP、用戶、全局限流 基於不同實現的限流設計(基於Redis或者LRU緩存) 基於註解 ...
  • 最近在開發過程中遇到一個sqlalchemy lost connection的報錯,記錄解決方法。 報錯信息 python後端開發,使用的框架是Fastapi + sqlalchemy。在一個介面請求中報錯如下: [2023-03-24 06:36:35 +0000] [217] [ERROR] E ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...