MapReduce 的輸入輸出 MapReduce 框架運轉在<key,value> 鍵值對上,也就是說,框架把作業的輸入看成是一組<key,value>鍵值對,同樣也產生一組<key,value>鍵值對作為作業的輸出,這兩組鍵值對可能是不同的。 一個 MapReduce 作業的輸入和輸出類型如下圖 ...
MapReduce 的輸入輸出
MapReduce 框架運轉在<key,value> 鍵值對上,也就是說,框架把作業的輸入看成是一組<key,value>鍵值對,同樣也產生一組<key,value>鍵值對作為作業的輸出,這兩組鍵值對可能是不同的。
一個 MapReduce 作業的輸入和輸出類型如下圖所示:可以看出在整個標準的流程中,會有三組<key,value>鍵值對類型的存在。
MapReduce的處理流程
1. Mapper任務執行過程詳解
第一階段是把輸入目錄下文件按照一定的標準逐個進行邏輯切片,形成切片規劃。預設情況下,Split size = Block size。每一個切片由一個MapTask 處理。(getSplits)
第二階段是對切片中的數據按照一定的規則解析成<key,value>對。預設規則是把每一行文本內容解析成鍵值對。key 是每一行的起始位置(單位是位元組),value 是本行的文本內容。(TextInputFormat)
第三階段是調用 Mapper 類中的 map 方法。上階段中每解析出來的一個<k,v>,調用一次 map 方法。每次調用 map 方法會輸出零個或多個鍵值對。
第四階段是按照一定的規則對第三階段輸出的鍵值對進行分區。預設是只有一個區。分區的數量就是 Reducer 任務運行的數量。預設只有一個Reducer 任務。
第五階段是對每個分區中的鍵值對進行排序。首先,按照鍵進行字典序排序,對於鍵相同的鍵值對,按照值進行排序。比如三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。
如果有第六階段,那麼進入第六階段;如果沒有,直接輸出到文件中。
第六階段是對數據進行局部聚合處理,也就是 combiner 處理。鍵相等的鍵值對會調用一次 reduce 方法。經過這一階段,數據量會減少。 本階段預設是沒有的
2. Reducer 任務 任務 執行過程詳解
第一階段是 Reducer 任務會主動從 Mapper 任務複製其輸出的鍵值對。Mapper 任務可能會有很多,因此 Reducer 會複製多個 Mapper 的輸出。
第二階段是把複製到 Reducer 本地數據,全部進行合併,即把分散的數據合併成一個大的數據。再對合併後的數據排序。
第三階段是對排序後的鍵值對調用 reduce 方法。鍵相等的鍵值對調用一次reduce 方法,每次調用會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到 HDFS 文件中。
在整個MapReduce 程式的開發過程中,我們最大的工作量是覆蓋map 函數和覆蓋reduce 函數。
Mapreduce的combiner
每一個 map 都可能會產生大量的本地輸出,Combiner 的作用就是對 map 端的輸出先做一次合併,以減少在 map 和 reduce 節點之間的數據傳輸量,以提高網路 IO 性能,是 MapReduce 的一種優化手段之一。
- combiner 是 MR 程式中 Mapper 和 Reducer 之外的一種組件
- combiner 組件的父類就是 Reducer
- combiner 和 reducer 的區別在於運行的位置:
- Combiner 是在每一個 maptask 所在的節點運行
- Reducer 是接收全局所有 Mapper 的輸出結果;
- combiner 的意義就是對每一個 maptask 的輸出進行局部彙總,以減小網路傳輸量
- 具體實現步驟: 1、自定義一個 combiner 繼承 Reducer,重寫 reduce 方法
- combiner 能夠應用的前提是不能影響最終的業務邏輯,而且,combiner 的輸出 kv 應該跟 reducer 的輸入 kv 類型要對應起來
- 2、在 job 中設置: job.setCombinerClass(CustomCombiner.class)
combiner本質上就是reduce 只不過是局部的reduce 進行局部彙總