MapReduce是Hadoop2.x的一個計算框架,利用分治的思想,將一個計算量很大的作業分給很多個任務,每個任務完成其中的一小部分,然後再將結果合併到一起。將任務分開處理的過程為map階段,將每個小任務的結果合併到一起的過程為reduce階段。下麵先從巨集觀上介紹一下客戶端提交一個作業時,Hado ...
MapReduce是Hadoop2.x的一個計算框架,利用分治的思想,將一個計算量很大的作業分給很多個任務,每個任務完成其中的一小部分,然後再將結果合併到一起。將任務分開處理的過程為map階段,將每個小任務的結果合併到一起的過程為reduce階段。下麵先從巨集觀上介紹一下客戶端提交一個作業時,Hadoop2.x各個組件之間的聯繫及處理流程。然後我們再具體看看MapReduce計算框架在執行一個作業時,做了些什麼。
YARN
YARN是Hadoop2.x框架下的資源管理系統,其組成部分為:
1)全局資源管理器(global resource manager):整個系統的資源管理和調配。
2)節點管理器(node manager)(每個節點都有一個)負責任務的啟動、配置及其資源的監控
3)針對每個應用程式的應用程式管理器(application-specific application master)(因為Hadoop2.x支持的計算框架有很多,不只是MapReduce,還有像storm、spark、Tez不同處理機制的計算框架,所以MapReduce是一種應用程式,每個MapReduce作業是MapReduce類型程式的一個實例)
4)調度器(scheduler)(在資源管理器里)
5)容器(container):一部分CPU和記憶體組成一個容器,最為資源使用,一個應用程式運行在一組容器中。
在瞭解了各個組件的功能之後,藉助下圖,我們看一下提交一個作業的流程:
1)客戶端向資源管理器提交作業程式,作業程式的類型決定了使用哪種應用程式管理器(MapReduce、storm、Tez...)
2)資源管理器協調資源,在一個節點上獲取一個運行應用程式管理器實例的容器
3)應用程式管理器(application master)在資源管理器中註冊
4)應用程式管理器通過資源請求與資源管理器協商資源,包括該容器所在的節點和該容器的詳細說明(CPU核數量和記憶體大小)
5)和 6)應用程式管理器在一個節點上或者多個節點上運行其Map Task和Reduce Task
7)在容器中運行的應用程式嚮應用程式管理器彙報執行度
8)應用程式執行完畢,應用程式管理器就會從資源管理器中取消註冊,作業占用的資源會釋放到系統中
MapReduce計算框架
MapReduce總的可以分為map階段、shuffle階段和reduce階段。
map階段
1)從HDFS中將輸入值傳輸到Mapper節點
除了傳輸之外,在讀取過程中,還需要做一個轉換過程,將數據轉換為鍵值對的形式(MapReduce處理的輸入必須為鍵值對的形式),這個過程通過InputFormat完成(預設為TextInputFormat)
2)Mapper
根據自己寫的Mapper函數對文件進行處理,同樣輸出的是鍵值對(如wordcount中統計收到的數據中每個詞出現的次數)
3)Partitioner
Patitioner根據Reducer的數量和自定義的劃分方法(沒有自定義的話,Hadoop有預設實現)去劃分Mapper的輸出;劃分的結果會按照Mapper輸出的鍵進行排序。
4)Combiner(這一步是可選的)
經過Partitioner排序後,如果作業中配置了Combiner,就會調用Combiner,Combiner就好像在Mapper端提前進行一下Reducer一樣。
那為什麼要提前進行呢?這是為了儘量減少對網路帶寬的需求,比如經典的wordcount程式,在Mapper端處理之後,我們可能得到一個像key = apple,value = {1,1,1,1,1,1}的結果,如果我們能先對其進行一下Combiner,那麼就能得到key = apple,value = 6的結果,傳輸這樣的數據,肯定是要比key = apple,value = {1,1,1,1,1,1}的數據節省帶寬的。
那既然能夠節省傳輸帶寬,為什麼又是可選的呢?何不每次都預設執行Combiner?這是因為並不是每一個Mapper都能進行Combiner;比如現在我們的任務要統計一段時間內的每天的最高氣溫,假設開始有兩個Mapper,輸出為(0,10,20)和(15,25),那麼提前進行Combiner可以使得傳遞給Reducer端的數據為(20, 25)這樣最後的結果還是為25,且傳輸的數據量變小;但是假如我們要求一段時間內的平均溫度呢?如果開始就在Mapper端進行Combiner求平均溫度,那麼Reducer端得到的數據為(10, 20),算出的平均溫度為15,但是實際上的平均溫度為(0, 10, 20 , 15 ,25)的平均,為14;所以需要搞清楚Combiner合適不合適提前進行。
shuffle階段
shuffle階段要做的事就是保證Mapper輸出的數據傳輸到合適的Reducer進行處理,如下圖所示:
shuffle階段,每個Reducer都會使用HTTP協議從Mapper節點獲得自己的劃分(Reducer通過Application Master來獲取自己應該查詢哪些Mapper節點來獲取自己劃分的信息,因為每個Mapper實例完成後,會通知Application Master運行階段產生的劃分)
reduce階段
1)Reducer
根據自己寫的reduce程式對數據進行處理(如wordcount中將每個單詞出現的次數加起來得到總和)
2)將處理結果輸出到HDFS
通過OutputFormat完成(預設是TextOutputFormat)
總結
通過對Hadoop2.x框架的處理流程和MapReduce計算框架的處理流程的梳理,可以在進行程式編寫時有一個更清楚的認識,下一步應該具體做些什麼。
參考:《Hadoop權威指南》