MapReduce筆記——技術點彙總

来源:http://www.cnblogs.com/netoxi/archive/2017/07/28/7223090.html
-Advertisement-
Play Games

目錄 · 概況 · 原理 · MapReduce編程模型 · MapReduce過程 · 容錯機制 · API · 概況 · WordCount示例 · Writable介面 · Mapper類 · Reducer類 · Partitioner抽象類 · WritableComparator介面 · ...


目錄

· 概況

· 原理

    · MapReduce編程模型

    · MapReduce過程

    · 容錯機制

· API

    · 概況

    · WordCount示例

    · Writable介面

    · Mapper類

    · Reducer類

    · Partitioner抽象類

    · WritableComparator介面

    · 示例:連接

    · 示例:二次排序


 

概況

1. 起源:一篇Google論文。

2. 特點

    a) 開發簡單:用戶可不考慮進程通信、套接字編程,無需高深技巧,只需符合MapReduce編程模型。

    b) 伸縮性:當集群資源無法滿足計算需求時,可通過增加節點達到線性伸縮集群的目的。

    c) 容錯性:節點故障導致的作業失敗,計算框架自動將作業安排到健康節點重新執行,直到任務完成。

3. MapReduce含義:MapReduce編程模型;MapReduce運行環境(YARN)。

4. 局限性

    a) 執行速度慢:普通MapReduce作業一般分鐘級別完成,複雜作業或數據量更大時可能花費一小時或更多。MapReduce通常時數據密集型作業,大量中間結果寫到磁碟並通過網路傳輸,消耗大量時間。

    b) 過於底層:與SQL相比,過於底層。對於習慣關係資料庫的用戶,或數據分析師,編寫mapreduce函數無疑頭疼。

    c) 無法實現所有演算法。

原理

MapReduce編程模型

1. MapReduce起源:LISP和其他函數式編程語言中的古老映射和化簡操作。

2. MapReduce操作數據最小單位:鍵值對。

3. MapReduce模型執行過程

(Key1, Value1)  →  (Key1, List<Value2>)  →  (Key3, Value3)

    a) 將數據抽象為鍵值對形式作為map函數輸入;

    b) 經過map函數處理,生成一系列新鍵值對作為中間結果輸出到本地;

    c) 計算框架自動將中間結果按鍵聚合,並將鍵相同的數據分發到reduce函數,以鍵和對應指的集合作為reduce函數輸入;

    d) 經過reduce函數處理,生成一系列鍵值對作為最終輸出。

4. WordCount舉例

MapReduce過程

1. 過程描述:inputmapcombinereduceoutput五個階段,其中combine階段不一定發生,map函數輸出的中間結果被被分發到reduce函數的過程稱為shuffleshuffle階段還會發生copysort

1. MapReduce任務:一個作業被分成MapReduce計算兩個階段,分別由一個或多個Map任務和Reduce任務組成。

2. input階段

    a) input過程:如果使用HDFS上文件作為MapReduce輸入,計算框架以“org.apache.hadoop.mapreduce.InputFormat”抽象類的子類“org.apache.hadoop.mapreduce.lib.input.FileInputFormat”作為該文件的InputSplit,每個InputSplit作為一個Map任務的輸入,再將InputSplit解析成鍵值對。

    b) InputSplit對數據影響:InputSplit只在邏輯上對數據切分,不影響磁碟上存儲的文件。

    c) InputSplit包含信息:分片的元數據信息,包括起始位置、長度和所在節點列表等。

 1 package org.apache.hadoop.mapreduce.lib.input;
 2 
 3 // ...
 4 
 5 public class FileSplit extends InputSplit implements Writable {
 6 
 7   // ...
 8 
 9   /** Constructs a split with host information
10    *
11    * @param file the file name
12    * @param start the position of the first byte in the file to process
13    * @param length the number of bytes in the file to process
14    * @param hosts the list of hosts containing the block, possibly null
15    */
16   public FileSplit(Path file, long start, long length, String[] hosts) {
17     this.file = file;
18     this.start = start;
19     this.length = length;
20     this.hosts = hosts;
21   }
22 
23   // ...
24 }

    d) InputSplit大小計算:minSize取自mapred-site.xml參數“mapreduce.input.fileinputformat.split.minsize”,預設1;maxSize取自mapred-site.xml參數“mapreduce.input.fileinputformat.split.maxsize”,預設Long.MAX_VALUEblockSize取自hdfs-site.xml參數“dfs.block.size”。所以,使用預設配置時,InputSplit大小為塊大小。

 1 package org.apache.hadoop.mapreduce.lib.input;
 2 
 3 // ...
 4 
 5 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
 6 
 7   // ...
 8 
 9   protected long computeSplitSize(long blockSize, long minSize,
10                                   long maxSize) {
11     return Math.max(minSize, Math.min(maxSize, blockSize));
12   }
13 
14   // ...
15 }

    e) input數量計算:文件大小÷InputSplit大小。

    f) 對齊:任務調度時,優先考慮本節點的數據,如果本節點沒有可處理的數據或還需其他節點數據,Map任務所在節點會從其他節點將數據網路傳輸給自己。當InputSplit大小大於塊大小時,Map任務會從其他節點讀取一部分數據,就無法實現完全數據本地化,所以InputSplit大小應等於塊大小。

    g) map函數輸入:通過InputFormat.createRecordReader()方法將InputSplit解析為鍵值對。

 1 package org.apache.hadoop.mapreduce.lib.input;
 2 
 3 // ...
 4 
 5 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 6 
 7   // ...
 8 
 9   @Override
10   public RecordReader<LongWritable, Text> 
11     createRecordReader(InputSplit split,
12                        TaskAttemptContext context) {
13     String delimiter = context.getConfiguration().get(
14         "textinputformat.record.delimiter");
15     byte[] recordDelimiterBytes = null;
16     if (null != delimiter)
17       recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
18     return new LineRecordReader(recordDelimiterBytes);
19   }
20 
21   // ...
22 }

4. map階段

    a) 寫緩衝區:map函數輸出時,先寫到記憶體環形緩衝區,並做一次預排序;每個Map任務都有一個環形緩衝區,大小取自mapred-site.xml參數“mapreduce.task.io.sort.mb”,預設100MB

    b) 緩衝區溢寫磁碟:當環形緩衝區達到閥值(mapred-site.xml參數“mapreduce.map.sort.spill.percent”,預設0.80)時,一個後臺線程將緩衝區內容溢寫(spill)到磁碟mapred-site.xml參數“mapreduce.cluster.local.dir”指定的目錄,預設${hadoop.tmp.dir}/mapred/local

    c) 溢寫磁碟前排序:溢寫磁碟前,線程根據數據要傳送的Reducer對緩衝區數據分區(預設按鍵),每個分區內再按鍵排序。

    d) Combiner:當已指定Combiner且溢寫次數至少3次時,在溢寫磁碟前執行Combiner;效果是對map函數輸出的中間結果進行一次合併,作用與reduce函數一樣;目的是降低中間結果數據量(中間結果要寫磁碟且通過網路傳至Reducer),提升運行效率。

    e) 壓縮:對中間結果壓縮,目的與Combiner相同;mapred-site.xml參數“mapreduce.map.output.compress”,預設false,參數“mapreduce.map.output.compress.codec”。

壓縮格式

演算法

文件擴展名

可切分

codec

說明

Deflate

Deflate

.deflate

org.apache.hadoop.io.compress.DeflateCodec

 

gzip

Deflate

.gz

org.apache.hadoop.io.compress.GzipCodec

 

bzip2

bzip2

.bz2

org.apache.hadoop.io.compress.BZip2Codec

 

LZO

LZOP

.lzo

com.hadoop.compression.lzo.LzopCodec

 

Snappy

Snappy

.snappy

org.apache.hadoop.io.compress.SnappyCodec

高壓縮、高速,推薦

    f) 中間結果傳輸:中間結果通過HTTP方式傳至Reducer,傳輸工作線程數配置mapred-site.xml參數“mapreduce.tasktracker.http.threads”,預設40

5. shuffle階段

    a) copy:一個Reduce任務可能需多個Map任務輸出,而每個Map任務完成時間很可能不同,當只要有一個Map任務完成,Reduce任務即開始複製其輸出;複製線程數配置mapred-site.xml參數“mapreduce.reduce.shuffle.parallelcopies”,預設5

    b) copy的緩衝區:如果map輸出相當小,數據先被覆制到Reducer所在節點的記憶體緩衝區(大小配置mapred-site.xml參數“mapreduce.reduce.shuffle.input.buffer.percent”,預設0.70),當記憶體緩衝區大小達到閥值(mapred-site.xml參數“mapreduce.reduce.shuffle.merge.percent”,預設0.66)或記憶體緩衝區文件數達到閥值(mapred-site.xml參數“mapreduce.reduce.merge.inmem.threshold”,預設1000)時,則合併後溢寫磁碟。否則,map輸出被覆制到磁碟。

    c) copy的合併:隨複製到磁碟的文件增多,後臺線程將其合併為更大、有序的文件,為後續合併節約時間。合併時,壓縮的中間結果將在記憶體中解壓縮。

    d) sort:複製完成所有map輸出後,合併map輸出文件並歸併排序。

    e) sort的合併:將map輸出文件合併,直至合併因數(mapred-site.xml參數“mapreduce.task.io.sort.factor”,預設10)。例如,有50map輸出文件,進行5次合併,每次將10各文件合併成一個文件,最後5個文件。

6. reduce階段

    a) reduce函數輸入:經過shuffle的文件都是按鍵分區且有序,相同分區的文件調用一次reduce函數。

    b) reduce函數輸出:一般為HDFS

7. 排序

    a) MapReduce中的排序演算法:快速排序、歸併排序。

    b) MapReduce發生的3次排序:map階段的緩衝區排序(快速排序演算法);map階段溢寫磁碟前,對溢寫文件合併時的排序(歸併排序演算法);shuffle階段的文件合併sort(歸併排序演算法)。

容錯機制

1. 任務錯誤:對錯誤任務不斷重試,直到總嘗試次數超過N次認為徹底失敗;Map任務、Reduce任務總嘗試次數分別為mapred-site.xml參數“mapreduce.map.maxattempts”和“mapreduce.reduce.maxattempts”,預設均為4

2. ApplicationMaster錯誤:通過YARN容錯機制完成。

3. NodeManager錯誤:通過YARN容錯機制完成。

4. ResourceManager錯誤:通過YARN容錯機制完成。

API

概況

1. 新舊API

    a) 舊API包:org.apache.hadoop.mapred

    b) 新API包:org.apache.hadoop.mapreduce

2. Maven依賴

1 <dependency>
2     <groupId>org.apache.hadoop</groupId>
3     <artifactId>hadoop-client</artifactId>
4     <version>2.6.5</version>
5 </dependency>

WordCount示例

1. Mapper

 1 package mr.wordcount;
 2 
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 
10 public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
11 
12     private final static IntWritable one = new IntWritable(1);
13     
14     private Text word = new Text();
15 
16     @Override
17     protected void map(LongWritable key, Text value,
18             Mapper<LongWritable, Text, Text, IntWritable>.Context context)
19             throws IOException, InterruptedException {
20         StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
21         while (stringTokenizer.hasMoreTokens()) {
22             word.set(stringTokenizer.nextToken());
23             context.write(word, one);
24         }
25     }
26     
27 }

2. Reducer類

 1 package mr.wordcount;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.io.IntWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Reducer;
 7 
 8 public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 9 
10     private IntWritable result = new IntWritable();
11     
12     @Override
13     protected void reduce(Text key, Iterable<IntWritable> values,
14             Reducer<Text, IntWritable, Text, IntWritable>.Context context)
15             throws IOException, InterruptedException {
16         int sum = 0;
17         for (IntWritable value : values) {
18             sum += value.get();
19         }
20         result.set(sum);
21         context.write(key, result);
22     }
23 
24 }

3. main方法

 1 package mr.wordcount;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
10 
11 public class WordCount {
12 
13     public static void main(String[] args) throws Exception {
14         if (args.length != 2) {
15             System.err.println("Usage: wordcount <in> <out>");
16             System.exit(2);
17         }
18         
19         Configuration conf = new Configuration();
20         Job job = Job.getInstance(conf, "WordCount");
21         job.setJarByClass(WordCount.class);
22         job.setMapperClass(TokenizerMapper.class);
23         job.setCombinerClass(IntSumReducer.class);
24         job.setReducerClass(IntSumReducer.class);
25         job.setOutputKeyClass(Text.class);
26         job.setOutputValueClass(IntWritable.class);
27         FileInputFormat.addInputPath(job, new Path(args[0]));
28         FileOutputFormat.setOutputPath(job, new Path(args[1]));
29         System.exit(job.waitForCompletion(true) ? 0 : 1);
30     }
31 
32 }

4. 執行命令

hadoop jar wordcount.jar mr.wordcount.WordCount /test/wordcount/in /test/wordcount/out
hadoop fs -ls /test/wordcount/out
hadoop fs -cat /test/wordcount/out/part-r-00000

Writable介面

1. 職責:Hadoop序列化格式。

2. Hadoop序列化場景:IPC(進程間通信)、數據持久化。

3. 源碼(2個重要方法)

 1 package org.apache.hadoop.io;
 2 
 3 // ...
 4 
 5 public interface Writable {
 6   /** 
 7    * Serialize the fields of this object to <code>out</code>.
 8    * 
 9    * @param out <code>DataOuput</code> to serialize this object into.
10    * @throws IOException
11    */
12   void write(DataOutput out) throws IOException;
13 
14   /** 
15    * Deserialize the fields of this object from <code>in</code>.  
16    * 
17    * <p>For efficiency, implementations should attempt to re-use storage in the 
18    * existing object where possible.</p>
19    * 
20    * @param in <code>DataInput</code> to deseriablize this object from.
21    * @throws IOException
22    */
23   void readFields(DataInput in) throws IOException;
24 }

4. 內置實現類

    a) 包:org.apache.hadoop.io

    b) 類圖

    c) 與Java類型對照

Java類型

Writable實現

備註

null

NullWritable

序列化長度為0,充當占位符

String

Text

 

boolean

BooleanWritable

 

byte

ByteWritable

 

int

IntWritableVIntWritable

V開頭表示變長,否則定長

long

LongWritableVLongWritable

V開頭表示變長,否則定長

float

FloatWritable

 

double

DoubleWritable

 

5. 自定義實現類 

 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.io.WritableComparable;
 6 
 7 public class TextPair implements WritableComparable<TextPair> {
 8     
 9     private Text first;
10     
11     private Text second;
12 
13     public TextPair() {
14         this(new Text(), new Text());
15     }
16 
17     public TextPair(Text first, Text second) {
18         this.first = first;
19         this.second = second;
20     }
21 
22     public void write(DataOutput out) throws IOException {
23         first.write(out);
24         second.write(out);
25     }
26 
27     public void readFields(DataInput in) throws IOException {
28         first.readFields(in);
29         second.readFields(in);
30     }
31 
32     // 用於MapReduce過程中的排序
33     public int compareTo(TextPair o) {
34         int result = first.compareTo(o.first);
35         if (result == 0) {
36             result = second.compareTo(o.second);
37         }
38         return result;
39     }
40 
41     public Text getFirst() {
42         return first;
43     }
44 
45     public void setFirst(Text first) {
46         this.first = first;
47     }
48 
49     public Text getSecond() {
50         return second;
51     }
52 
53     public void setSecond(Text second) {
54         this.second = second;
55     }
56 
57 }

Mapper類

 1. 源碼(4個重要方法)

 1 package org.apache.hadoop.mapreduce;
 2 
 3 // ...
 4 
 5 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 6 
 7   /**
 8    * The <code>Context</code> passed on to the {@link Mapper} implementations.
 9    */
10   public abstract class Context
11     implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
12   }
13   
14   /**
15    * Called once at the beginning of the task.
16    */
17   protected void setup(Context context
18                        ) throws IOException, InterruptedException {
19     // NOTHING
20   }
21 
22   /**
23    * Called once for each key/value pair in the input split. Most applications
24    * should override this, but the default is the identity function.
25    */
26   @SuppressWarnings("unchecked")
27   protected void map(KEYIN key, VALUEIN value, 
28                      Context context) throws IOException, InterruptedException {
29     context.write((KEYOUT) key, (VALUEOUT) value);
30   }
31 
32   /**
33    * Called once at the end of the task.
34    */
35   protected void cleanup(Context context
36                          ) throws IOException, InterruptedException {
37     // NOTHING
38   }
39   
40   /**
41    * Expert users can override this method for more complete control over the
42    * execution of the Mapper.
43    * @param context
44    * @throws IOException
45    */
46   public void run(Context context) throws IOException, InterruptedException {
47     setup(context);
48     try {
49       while (context.nextKeyValue()) {
50         map(context.getCurrentKey(), context.getCurrentValue(), context);
51       }
52     } finally {
53       cleanup(context);
54     }
55   }
56 }