Original post; please credit the source when reposting.
MapReduce is the core of the Hadoop elephant: in Hadoop, the heart of data processing is the MapReduce programming model. A Map/Reduce job typically splits the input data set into independent blocks, which are processed fully in parallel by map tasks. The framework sorts the map output and then feeds the results to the reduce tasks. Both the input and the output of a job are usually stored in a file system. Our programming effort therefore centers on two stages: the mapper and the reducer.
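Before touching Hadoop, the data flow above (split → map → sort/shuffle → reduce) can be sketched in plain Python for a word count. This is only an illustration of the model, not Hadoop's actual implementation; the sample `lines` are made up:

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # map phase: emit a (word, 1) pair for every word in the line
    for word in line.strip().split():
        yield (word, 1)

def reducer(word, pairs):
    # reduce phase: sum the counts for a single word
    return (word, sum(count for _, count in pairs))

lines = ["hello ni hao", "ni hao hello"]

# simulate map -> shuffle/sort -> reduce
mapped = [pair for line in lines for pair in mapper(line)]
mapped.sort(key=itemgetter(0))                 # the framework sorts map output by key
result = dict(reducer(w, g) for w, g in groupby(mapped, key=itemgetter(0)))
print(result)   # {'hao': 2, 'hello': 2, 'ni': 2}
```

The sort step is what guarantees that all pairs for the same word arrive at one reducer together; Hadoop's shuffle phase plays exactly this role.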
Below we build a MapReduce program from scratch and run it on a Hadoop cluster.
Mapper code, map.py:
import sys

# read lines from standard input; emit "word<TAB>1" for every word
for line in sys.stdin:
    word_list = line.strip().split(' ')
    for word in word_list:
        print '\t'.join([word.strip(), str(1)])
Reducer code, reduce.py:
import sys

cur_word = None
count_sum = 0
# input arrives sorted by word, so all records for one word are adjacent
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) < 2:
        continue
    word = ss[0].strip()
    count = ss[1].strip()
    if cur_word is None:
        cur_word = word
    if cur_word != word:
        # the key changed: emit the finished word and reset the counter
        print '\t'.join([cur_word, str(count_sum)])
        cur_word = word
        count_sum = 0
    count_sum += int(count)
# emit the last word (if there was any input at all)
if cur_word is not None:
    print '\t'.join([cur_word, str(count_sum)])
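The reducer works only because its input is sorted by key, so all records for one word are adjacent. For comparison, the same grouping logic can be written with `itertools.groupby`. This sketch is not from the original post (and uses Python 3); `read_pairs`, `reduce_pairs`, and the demo data are names invented here:

```python
from itertools import groupby

def read_pairs(stream):
    # parse "word<TAB>count" lines, skipping malformed ones
    for line in stream:
        parts = line.strip().split('\t')
        if len(parts) == 2:
            yield parts[0], int(parts[1])

def reduce_pairs(pairs):
    # pairs must be sorted by word; each group is then one word's records
    for word, group in groupby(pairs, key=lambda p: p[0]):
        yield word, sum(count for _, count in group)

# demo on pre-sorted lines, as the shuffle phase would deliver them
demo = ["hao\t1\n", "hao\t2\n", "ni\t1\n"]
for word, total in reduce_pairs(read_pairs(demo)):
    print('%s\t%d' % (word, total))
```

`groupby` closes over each run of equal keys, which is exactly what the manual `cur_word` bookkeeping in reduce.py does by hand.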
Test data file src.txt (for local testing; when running on the cluster, remember to upload it to HDFS first):
hello ni hao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni haoao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao Dad would get out his mandolin and play for the family Dad loved to play the mandolin for his family he knew we enjoyed singing I had to mature into a man and have children of my own before I realized how much he had sacrificed I had to,mature into a man and,have children of my own before.I realized how much he had sacrificed
First, debug locally to check that the results are correct:
cat src.txt | python map.py | sort -k 1 | python reduce.py
Terminal output:
a 2
and 2
and,have 1
ao 1
before 1
before.I 1
children 2
Dad 2
enjoyed 1
family 2
for 2
get 1
had 4
hao 33
haoao 1
haoni 3
have 1
he 3
hello 1
his 2
how 2
I 3
into 2
knew 1
loved 1
man 2
mandolin 2
mature 1
much 2
my 2
ni 34
of 2
out 1
own 2
play 2
realized 2
sacrificed 2
singing 1
the 2
to 2
to,mature 1
we 1
would 1
Local debugging shows the code is OK. Now let's run it on the cluster. For convenience I wrote a script, run.sh, to save some typing.
HADOOP_CMD="/home/hadoop/hadoop/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH="/home/input/src.txt"
OUTPUT_PATH="/home/output"

$HADOOP_CMD fs -rmr $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python reduce.py" \
    -file ./map.py \
    -file ./reduce.py
A walk-through of the script:
HADOOP_CMD: path to the hadoop binary.
STREAM_JAR_PATH: path to the Hadoop Streaming jar.
INPUT_FILE_PATH: input path on the cluster (HDFS).
OUTPUT_PATH: output path on the cluster (HDFS). **Note:** this directory must not already exist when the job starts, which is why the script removes it first with `fs -rmr`. Also note that on the very first run the directory does not exist yet, so the delete will report an error; you can create an output directory by hand beforehand.
The `$HADOOP_CMD jar` invocation follows a fixed format: it specifies the input and output paths, names the mapper and reducer commands, and the `-file` options ship our map.py and reduce.py to the cluster, since the other nodes do not yet have these executables.
Run the following command to count the records output by the reduce stage:
cat src.txt | python map.py | sort -k 1 | python reduce.py | wc -l
Terminal output: 43
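The same sanity check can be reproduced in plain Python with `collections.Counter`: the number of distinct words equals the number of reducer output records. A minimal sketch, where `text` is a made-up stand-in for the contents of src.txt:

```python
from collections import Counter

# stand-in for the contents of src.txt; substitute the real file's text
text = "hello ni hao ni hao"

counts = Counter(text.split())   # word -> frequency
print(len(counts))               # distinct words == reducer output records; prints 3 here
```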
Open master:50030 in a browser to see the job details.
Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
map      100.00%      2           0         0         2          0        0 / 0
reduce   100.00%      1           0         0         1          0        0 / 0
Under Map-Reduce Framework you can see:
Counter                  Map   Reduce   Total
Reduce output records    0     0        43
That matches the 43 records we got locally, confirming the whole run succeeded. Our first Hadoop program is complete.