[TOC] 本章通過幾個案例詳細講解MapReduce程式的編寫與運行。 5.1 案例分析:單詞計數 假如有這樣一個例子,需要統計過去10年電腦論文中出現次數最多的幾個單詞,以分析當前的熱點研究議題是什麼。那麼,在將論文樣本收集完畢之後,接下來應該怎樣做呢? 這一經典的單詞計數案例可以採用MapR ...
目錄
本章通過幾個案例詳細講解MapReduce程式的編寫與運行。
5.1 案例分析:單詞計數
假如有這樣一個例子,需要統計過去10年電腦論文中出現次數最多的幾個單詞,以分析當前的熱點研究議題是什麼。那麼,在將論文樣本收集完畢之後,接下來應該怎樣做呢?
這一經典的單詞計數案例可以採用MapReduce處理。MapReduce中已經自帶了一個單詞計數程式WordCount,如同Java中的經典程式“Hello World”一樣,WordCount是MapReduce中統計單詞出現次數的Java類,是MapReduce的入門程式。該程式要求計算出文件中單詞的出現次數,並將輸出結果輸出到HDFS文件系統中,且按照單詞的字母順序進行排序,每個單詞和其出現次數占一行,單詞與出現次之間有間隔。
例如,輸入內容如下的文件:
hello world
hello hadoop
bye hadoop
其符合要求的輸出結果如下:
bye 1
hadoop 2
hello 2
world 1
下麵進一步對上述WordCount程式進行分析。
1.設計思路
WordCount對於單詞計數問題的解決方案很直接:先將文件內容切分成單詞,然後將所有相同的單詞聚集到一起,最後計算各個單詞出現的次數,將計算結果排序輸出。
根據MapReduce並行設計的原則可知:解決方案中的內容切分步驟和內容不相關,可以並行化處理,每個拿到原始數據的節點只需要將輸入數據切分成單詞就可以了,因此可由Mapping階段完成單詞切分的任務;另外,不同單詞之間的頻數也不相關,所以對相同單詞頻數的計算也可以並行化處理,將相同的單詞交由同一節點來計算頻數,然後輸出最終結果,該任務可由Reduce階段完成;至於將Mapping階段的輸出結果根據不同單詞進行分組,然後再發送給Reduce節點的任務,可由MapReduce中的Shuffle階段完成。
由於MapReduce中傳遞的數據都是鍵值對形式的,而且Shuffle的排序、聚集和分發也是按照鍵值進行的,因此,可將Map的輸出結果設置為以單詞作為鍵,1作為值的形式,表示某單詞出現了1次(輸入Map的數據則採用Hadoop預設的輸入格式,即文件的一行作為值,行號作為鍵)。由於Reduce的輸入是Map的輸出聚集後的結果,因此格式為<key, value-list>,也就是<word, {1,1,1,1,…}>;Reduce的輸出則可由設置成與Map輸出相同的形式,只是後面的數值不再是固定的1,而是具體計算出的某單詞所對應的頻數。
WordCount程式的執行流程如下圖:
2.程式源代碼
WordCount程式類的源代碼如下所示:
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount
{
public static void main(String[] args)
throws Exception
{
//初始化Configuration類
Configuration conf = new Configuration();
//通過實例化對象GenericOptionsParser可以獲得程式執行所傳入的參數
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
//構建任務對象
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//設置輸出結果的數據類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
//設置需要統計的文件的輸入路徑
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
//設置統計結果的輸出路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
//提交任務給Hadoop集群
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException
{
//統計單詞總數
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
//輸出統計結果
context.write(key, this.result);
}
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
{
//預設根據空格分割字元串
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {//迴圈輸出每個單詞與數量
this.word.set(itr.nextToken());
//輸出單詞與數量
context.write(this.word, one);
}
}
}
}
3.程式解讀
main函數中的job.setCombinerClass(IntSumReducer.class);指定了Map規約的類,也可以不使用Reduce類而進行自定義。這裡的規約combine的含義是,將Map結果進行一次本地的reduce操作,從而減輕遠程reduce的壓力。例如,Map階段有隻是把兩個相同的hello進行規約,由此輸入給reduce的就變成了<hello,2>。在實際的Hadoop集群操作中,我們是由多台主機一起進行MapReduce的,如果加入規約操作,每一臺主機會在reduce之前進行一次對本機數據的規約,然後在通過集群進行reduce操作,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。
4.程式運行
下麵以統計Hadoop安裝目錄下的LICENSE.txt文件中的單詞頻數為例,講解如何運行上述單詞計數程式WordCount,操作步驟如下:
(1)執行以下命令,在HDFS根目錄下創建文件目錄input:
hadoop fs –mkdir /input
(2)進入Hadoop安裝目錄,找到文件LICENSE.txt,然後執行以下命令,將其放到HDFS的/input目錄下:
hadoop fs –put LICENSE.txt /input
(3)執行以下命令,運行Hadoop自帶的WordCount程式:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount /input /output
需要註意的是,HDFS根目錄不應存在文件夾output,程式會自動創建。若存在則會報錯。
若輸出以下信息,表示程式運行正常:
16/09/05 22:51:27 INFO mapred.LocalJobRunner: reduce > reduce
16/09/05 22:51:27 INFO mapred.Task: Task 'attempt_local1035441982_0001_r_000000_0' done.
16/09/05 22:51:27 INFO mapred.LocalJobRunner: Finishing task: attempt_local1035441982_0001_r_000000_0
16/09/05 22:51:27 INFO mapred.LocalJobRunner: reduce task executor complete.
16/09/05 22:51:28 INFO mapreduce.Job: map 100% reduce 100%
16/09/05 22:51:28 INFO mapreduce.Job: Job job_local1035441982_0001 completed successfully
16/09/05 22:51:28 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=569202
FILE: Number of bytes written=1134222
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=30858
HDFS: Number of bytes written=8006
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=289
Map output records=2157
Map output bytes=22735
Map output materialized bytes=10992
Input split bytes=104
Combine input records=2157
Combine output records=755
Reduce input groups=755
Reduce shuffle bytes=10992
Reduce input records=755
Reduce output records=755
Spilled Records=1510
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=221
Total committed heap usage (bytes)=242360320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=15429
File Output Format Counters
Bytes Written=8006
(4)程式運行的結果以文件的形式存放在HDFS的/output目錄下,執行以下命令,可以將運行結果下載到本地查看:
hadoop fs –get hdfs:/output
5.2 案例分析:數據去重
數據去重是通過並行化思想來對數據進行有意義的篩選。許多看似龐雜的任務,如統計大數據集上的數據種類個數、從網站日誌中計算訪問地點等都會涉及數據去重。
本例中,已知有兩個文件file1.txt和file2.txt,需要對這兩個文件中的數據進行合併去重,文件中的每行是一個整體。
file1.txt的內容如下:
2017-3-1 a
2017-3-2 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-7 c
2017-3-3 c
file2.txt的內容如下:
2017-3-1 b
2017-3-2 a
2017-3-3 b
2017-3-4 d
2017-3-5 a
2017-3-6 c
2017-3-7 d
2017-3-3 c
期望的輸出結果:
2017-3-1 a
2017-3-1 b
2017-3-2 a
2017-3-2 b
2017-3-3 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-6 c
2017-3-7 c
2017-3-7 d
1.設計思路
數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。本例中,每個數據代表輸入文件中的一行內容。採用Hadoop預設輸入方式輸入到Map的鍵值對<key,value>中,key是數據所在文件的位置下標,value是數據的內容。而Map階段的任務就是將接收到的value設置為key,並直接輸出(輸出數據中的value任意)。Map輸出的鍵值對經過Shuffle過程會聚集成<key,value- list>後交給Reduce,此時所有的key實際上已經做了去重處理。因此,在Reduce階段,當Reduce接收到一個<key,value- list>時,不用管每個key有多少個value,直接將其中的key複製到輸出的key中,並將value設置成空值,就可以得到數據去重的結果。
2.編寫程式
在eclipse中新建一個Maven項目,在項目的pom.xml文件中加入項目的Hadoop依賴庫,代碼如下:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
然後,在項目中新建包com.hadoop.mr,併在包中寫入數據去重的程式代碼:
public class Dedup {
//map將輸入中的value複製到輸出數據的key上,並直接輸出
public static class Map extends Mapper<Object,Text,Text,Text>{
private static Text line=new Text();//每行數據
//實現map函數
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
line=value;
context.write(line, new Text(""));
}
}
//reduce將輸入中的key複製到輸出數據的key上,並直接輸出
public static class Reduce extends Reducer<Text,Text,Text,Text>{
//實現reduce函數
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = new Job(conf, "Data Deduplication");
job.setJarByClass(Dedup.class);
// 設置Map、Combine(規約)和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path("/input/"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.程式解讀
上述程式中,程式的map函數將接收到的鍵值對中的value直接賦給了函數輸出鍵值對中的key,而輸出的value則被置為了空字元串。程式的reduce函數將接收到的鍵值對數據中的key直接作為函數輸出鍵值對中的key,而輸出的value則置為了空字元串。
4.程式運行
該程式需要在Hadoop集群環境下運行,步驟如下:
(1)在eclipse中將完成的MapReduce項目導出為jar包,命名為Dedup.jar,然後上傳到Hadoop伺服器的相應位置。本例將其上傳到Hadoop的安裝目錄下。
(2)在HDFS根目錄下創建input文件夾,命令如下:
hadoop fs –mkdir /input
(3)將準備好的示例文件file1.txt和file2.txt使用SSH工具上傳到Hadoop安裝目錄下,併進入Hadoop安裝目錄,執行以下命令,將示例文件上傳到HDFS中新建的/input目錄下:
hadoop fs –put file1.txt /input
hadoop fs –put file2.txt /input
(4)在Hadoop安裝目錄中,執行以下命令,運行寫好的MapReduce數據去重程式:
hadoop jar Dedup.jar com.hadoop.mr.Dedup
(5)程式運行完畢後,會在HDFS的根目錄下生成output目錄,併在output目錄中生成part-r-00000文件,程式執行結果即存放於此文件中。可以執行以下命令,查看程式執行結果:
hadoop fs –cat /output/*
如果能正確顯示預期結果,則表明程式編寫無誤。
5.3 案例分析:求平均分
本例通過對輸入文件中的學生三科成績進行計算,得出每個學生的平均成績。輸入文件中的每行內容均為一個學生的姓名和他相應的成績,每門學科為一個文件。要求在輸出中每行有兩個數據,其中,第一個代表學生的姓名,第二個代表其平均成績。
輸入文件內容如下:
math.txt:
張三 88
李四 99
王五 66
趙六 77
chinese.txt:
張三 78
李四 89
王五 96
趙六 67
english.txt:
張三 80
李四 82
王五 84
趙六 86
期望輸出結果如下:
張三 82
李四 90
王五 82
趙六 76
1.設計思路
我們都知道,Mapper最終處理的結果對<key,value>,會送到Reducer中進行合併,合併的時候,有相同key的<key,value>對則送到同一個 Reducer上。Reducer是所有用戶定製Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有 Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到文件中。
MapReduce經典的WordCount(單詞計數)例子是將接收到的每一個value-list進行求和,進而得到所需的結果。而本例中,我們將reduce階段接收到的value-list進行求平均分後,作為reduce要輸出的value值即可,reduce要輸出的key值仍然為接收到的key。
整個求平均分的流程如下圖:
2.程式源碼
項目的新建及jar包的引入見上方的數據去重案例。
完整程式源代碼如下:
public class Score {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
// 實現map函數
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將輸入的一行數據,處理中文亂碼後轉化成String
String line=new String(value.getBytes(),0,value.getLength(),"GBK");
// 將輸入的一行數據首先按空格進行分割
StringTokenizer itr = new StringTokenizer(line);
// 獲取分割後的字元串
String strName = itr.nextToken();// 學生姓名部分
String strScore = itr.nextToken();// 成績部分
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
// 輸出姓名和成績
context.write(name, new IntWritable(scoreInt));
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// 實現reduce函數
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();// 計算總分
count++;// 統計總的科目數
}
int average = (int) sum / count;// 計算平均成績
//輸出姓名和平均成績
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Score Average");
job.setJarByClass(Score.class);
// 設置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 將輸入的數據集分割成小數據塊splites,提供一個RecordReder的實現
job.setInputFormatClass(TextInputFormat.class);
// 提供一個RecordWriter的實現,負責數據輸出
job.setOutputFormatClass(TextOutputFormat.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path("/input/"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
註意的問題:
Hadoop在涉及編碼時預設使用的是UTF-8,如果文件編碼格式是其它類型(如GBK),則會出現亂碼。此時只需在mapper或reducer程式中讀取Text時,進行一下轉碼,確保都是以UTF-8的編碼方式在運行即可。轉碼的核心代碼如下:
String line=new String(value.getBytes(),0,value.getLength(),"GBK");
若直接以String line=value.toString(); 進行轉碼也會輸出亂碼,原因是由Text這個Writable類型造成的。雖然Text是String的Writable類的封裝,但它倆還是有區別的。Text是一種UTF-8格式的Writable,而Java中的String是Unicode字元。所以直接使用value.toString會預設其中的字元都是UTF-8編碼過的,因而GBK編碼的數據直接使用該方法就會變成亂碼。
原創文章,轉載請註明出處!!