這次來聊聊Hadoop中使用廣泛的分散式計算方案——MapReduce。MapReduce是一種編程模型,還是一個分散式計算框架。 MapReduce作為一種編程模型功能強大,使用簡單。運算內容不只是常見的數據運算,幾乎大數據中常見的計算需求都可以通過它來實現。使用的時候僅僅需要通過實現Map和 ...
這次來聊聊Hadoop中使用廣泛的分散式計算方案——MapReduce。MapReduce是一種編程模型,還是一個分散式計算框架。
MapReduce作為一種編程模型功能強大,使用簡單。運算內容不只是常見的數據運算,幾乎大數據中常見的計算需求都可以通過它來實現。使用的時候僅僅需要通過實現Map和Reduce介面的方式來完成計算邏輯,其中Map的輸入是一對<Key, Value>,經過計算後輸出一對<Key, Value>;然後將相同Key合併,形成<Key, Value>集合;再將這個集合輸入Reduce。
下麵,就以WordCount為例,熟悉一下MapReduce:
WordCount是為了統計文本中不用辭彙出現的次數。如果統計一篇文本的內容,只需要寫一個程式將文本數據讀入記憶體,創建一個字典,記錄每個詞出現的次數就可以了。但是如果想統計互聯網中網頁的辭彙數量,就需要用MapReduce來解決。
public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }
從上面代碼不難發現,MapReduce核心是一個map函數和一個reduce函數。map函數的輸入主要是一個<Key, Value>對,在這個例子里,Value是要統計的所有文本中的一行數據,Key在一般計算中都不會用到。map函數將文本中的單詞提取出來,針對每個單詞輸出一個<word, 1>。MapReduce計算框架會將這些<word , 1>,合併成<word , <1,1,1,1,1,1,1…>>,然後傳給reduce函數。reduce函數將這個集合里的1求和,再將word和這個sum組成一個<Key, Value>,也就是<word, sum>輸出。apReduce框架為每個數據塊分配一個map函數去計算,從而實現大數據的分散式計算。
MapReduce在運行過程中有三個關鍵進程,分別是Driver進程、JobTracker進程、TaskTracker進程。
1.Driver進程:是啟動MapReduce的主入口,主要是實現Map和Reduce類、輸入輸出文件路徑等,並提交作業給Hadoop集群,也就是下麵的JobTracker進程。
2.JobTracker進程:根據輸入數據數量,命令TaskTracker進程啟動相應數量的Map和Reduce進程,並管理整個生命周期的任務調度和監控。
3.TaskTracker進程:負責啟動和管理Map以及Reduce進程。因為需要每個數據塊都有對應的map函數,TaskTracker進程通常和HDFS的DataNode進程啟動在同一個伺服器。
JobTracker進程和TaskTracker進程是主從關係,同一時間提供服務的主伺服器通常只有一臺,從伺服器有多台,所有的從伺服器聽從主伺服器的控制和調度安排。主伺服器負責為應用程式分配伺服器資源以及作業執行的調度,而具體的計算操作則在從伺服器上完成。MapReduce的主伺服器就是JobTracker,從伺服器就是TaskTracker。
1.JobClient將包含MapReduce的JAR包存儲在HDFS中,將來這些JAR包會分發給Hadoop集群中的伺服器執行計算。
2.向JobTracker提交Job。
3.JobTracker根據調度策略創建JobInProcess樹,每個作業都會有一個自己的JobInProcess樹。
4.JobInProcess根據輸入數據的塊數和配置中的Reduce數目創建相應數量的TaskInProcess。
5.TaskTracker和JobTracker進行心跳通信。
6.如果TaskTracker有空閑的計算資源,JobTracker就會給它分配任務。
7.TaskTracker收到任務類型(是Map還是Reduce)和任務參數(JAR包路徑、輸入數據文件路徑),啟動相應的進程。
8.Map或者Reduce進程啟動後,檢查本地是否有要執行任務的JAR包文件,如果沒有,就去HDFS上下載,然後載入Map或者Reduce代碼開始執行。
9.如果是Map進程,從HDFS讀取數據;如果是Reduce進程,將結果寫出到HDFS。
我們僅僅是編寫一個map函數和一個reduce函數就可以了,不用關心這兩個函數是如何被分佈啟動到集群上的,也不用關心數據塊又是如何分配給計算任務的。
MapReduce框架要將一個相對簡單的程式,在分散式的大規模伺服器集群上並行執行起來卻並不簡單。理解MapReduce作業的啟動和運行機制,對理解大數據的核心原理,做到真正意義上把握大數據作用巨大。