本章主要介紹了 MapReduce 的開發原理及應用場景,講解如何利用 Combine、Partitioner、WritableComparable、WritableComparator 等組件對數據進行排序篩選聚合分組的功能。利用例子模仿 SQL 關係資料庫進行SELECT、WHERE、GROUP... ...
前言
本文主要介紹 MapReduce 的原理及開發,講解如何利用 Combine、Partitioner、WritableComparator等組件對數據進行排序篩選聚合分組的功能。
由於文章是針對開發人員所編寫的,在閱讀本文前,文章假設讀者已經對Hadoop的工作原理、安裝過程有一定的瞭解,因此對Hadoop的安裝就不多作說明。請確保源代碼運行在Hadoop 2.x以上版本,並以偽分佈形式安裝以方便進行調試(單機版會對 Partitioner 功能進行限制)。
文章主要利用例子介紹如何利用 MapReduce 模仿 SQL 關係資料庫進行SELECT、WHERE、GROUP、JOIN 等操作,並對 GroupingComparator、SortComparator 等功能進行說明。
希望本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。
目錄
五、WritableComparatable 自定義鍵值說明
一、MapReduce 工作原理簡介
對Hadoop有興趣的朋友相信對Hadoop的主要工作原理已經有一定的認識,在講解MapReduce的程式開發前,本文先針對Mapper、Reducer、Partitioner、Combiner、Suhffle、Sort的工作原理作簡單的介紹,以幫助各位更好地瞭解後面的內容。
圖 1.1
1.1 Mapper 階段
當系統對數據進行分片後,每個輸入分片會分配到一個Mapper任務來處理,預設情況下系統會以HDFS的一個塊大小64M作為分片大小,當然也可以通過配置文件設置塊的大小。隨後Mapper節點輸出的數據將保存到一個緩衝區中(緩衝區的大小預設為512M,可通過mapreduce.task.io.sort.mb屬性進行修改),緩衝區越大排序效率越高。當該緩衝區快要溢出時(緩衝區預設大小為80%,可通過mapreduce.map.sort.spill.percent屬性進行修改),系統會啟動一個後臺線程,將數據傳輸到會到本地的一個文件當中。
1.2 Partitioner 階段
在Mapper完成 KEY/VALUE 格式的數據操作後,Partitioner 將會被調用,由於真實環境中 Hadoop 可能會包含幾十個甚至上百個Reducer ,Partitioner 的主要作用就是根據自定義方式確定數據將被傳輸到哪一個Reducer進行處理。
1.3 Combiner 階段
如果系統定義了Combiner,在經過 Partitioner 排序處理後將會進行 Combiner處理。我們可以把 Combiner 看作為一個小型的 Reducer ,由於數據從 Mapper 通過網路傳送到 Reducer ,資源開銷很大,Combiner 目的就是在數據傳送到Reducer前作出初步聚集處理,減少伺服器的壓力。如果數據量太大,還可以把 mapred.compress.map.out 設置為 true,就可以將數據進行壓縮。(關於數據壓縮的內容已經超越本文的討論範圍,以後會有獨立的篇章針對數據壓縮進行專題討論,敬請期待)
1.4 Shuffle 階段
在 Shuffle 階段,每個 Reducer 會啟動 5 個線程(可通過 mapreduce.reduce.shuffle.parallelcopies 進行設置)通過HTTP協議獲取Mapper傳送過來的數據。每次數據發送到 Reducer 前,都會根據鍵先進行排序。開發人員也可通過自定義的 SortComparator 進行數據排序,也是根據 GroupComparator 按照數據的其他特性進行分組處理,下麵章節將會詳細舉例介紹。對數據進行混洗、排序完成後,將傳送到對應的Reducer進行處理。
1.5 Reducer 階段
當 Mapper 實例完成輸入的數據超過設定值後(可通過mapreduce.job.reduce.slowstart.completedmaps 進行設置), Reducer 就會開始執行。Reducer 會接收到不同 Mapper 任務傳來已經過排序的數據,並通過Iterable 介面進行處理。在 Partitioner 階段,系統已定義哪些數據將由個 Reducer 進行管理。當 Reducer 檢測到 KEY 時發生變化時,系統就會按照已定的規則生成一個新的 Reducer 對數據進行處理。
如果 Reducer 端接受的數據量較小,數據則可直接存儲在記憶體緩衝區中,方便後面的數據輸出(緩衝區大小可通過mapred.job.shuffle.input.buffer.percent 進行設置)
如果數據量超過了該緩衝區大小的一定比例(可以通過 mapred.job.shuffle.merge.percent 進行設置),數據將會被合併後寫到磁碟中。
二、MapReduce 開發實例
上一章節講解了 MapReduce 的主要流程,下麵將以幾個簡單的例子模仿 SQL 關係資料庫向大家介紹一下 MapReduce 的開發過程。
HDFS常用命令 (此處只介紹幾個常用命令,詳細內容可在網上查找)
- 創建目錄 hdfs dfs -mkdir -p 【Path】
- 複製文件 hdfs dfs -copyFromLocal 【InputPath】【OutputPath】
- 查看目錄 hdfs dfs -ls 【Path】
- 運行JAR hadoop jar 【Jar名稱】 【Main類全名稱】 【InputPath】 【OutputPath】
2.1 使用 SELECT 獲取數據
應用場景:假設在 hdfs 文件夾 / input / 20180509 路徑的 *.dat 類型文件中存放在著大量不同型號的 iPhone 手機當天在不同地區的銷售記錄,系統想對這些記錄進行統計,計算出不同型號手機的銷售總數。
計算時,在Mapper中獲取每一行的信息,並把iPhone名稱作為Key插入,把數據作為Value插入到Context當中。
當Reducer接收到相同Key數據後,再作統一處理。
註意 : 當前例子當中 Mapper 的輸入 Key 為 LongWritable 長類型
在此過程中要註意幾點: 例子中 SaleManager 繼承了 org.apache.hadoop.conf.Configured 類並實現了 org.apache.hadoop.util.Tool 介面的 public static int run(Configuration conf,Tool tool, String[] args) 方法,MapReduce的相關操作都在run裡面實現。由於 Configured 已經實現了 getConf() 與setConfig() 方法,創建Job時相關的配置信息就可通過getConf()方法讀入。
系統可以通過以下方法註冊Mapper及Reducer處理類
Job.setMapperClass(MyMapper.class);
Job.setReducerClass(MyReducer.class);
在整個運算過程當中,數據會經過篩選與計算,所以Mapper的讀入信息K1,V1與Reducer的輸出信息K3,V3不一定是同一格式。
org.apache.hadoop.mapreduce.Mapper<K1,V1,K2,V2>
org.apache.hadoop.mapreduce.Reducer<K2,V2,K3,V3>
當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型相同時,系統可以通過下麵方法設置轉出數據的格式
Job.setOutputKeyClass(K);
Job.setOutputValueClass(V);
當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型不相同時,系統則需要通過下麵方法設置Mapper轉出格式
Job.setMapOutputKeyClass(K);
Job.setMapOutputValueClass(V);
1 public class Phone { 2 public String type; 3 public Integer count; 4 public String area; 5 6 public Phone(String line){ 7 String[] data=line.split(","); 8 this.type=data[0].toString(); 9 this.count=Integer.valueOf(data[1].toString()); 10 this.area=data[2].toString(); 11 } 12 13 public String getType(){ 14 return this.type; 15 } 16 17 public Integer getCount(){ 18 return this.count; 19 } 20 21 public String getArea(){ 22 return this.area; 23 } 24 } 25 26 public class SaleManager extends Configured implements Tool{ 27 public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ 28 public void map(LongWritable key,Text value,Context context) 29 throws IOException,InterruptedException{ 30 String data=value.toString(); 31 Phone iPhone=new Phone(data); 32 //以iPhone型號作為Key,數量為作Value傳入 33 context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount())); 34 } 35 } 36 37 public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ 38 public void reduce(Text key,Iterable<IntWritable> values,Context context) 39 throws IOException,InterruptedException{ 40 int sum=0; 41 //對同一型號的iPhone數量進行統計 42 for(IntWritable val : values){ 43 sum+=val.get(); 44 } 45 context.write(key, new IntWritable(sum)); 46 } 47 } 48 49 public int run(String[] arg0) throws Exception { 50 // TODO 自動生成的方法存根 51 // TODO Auto-generated method stub 52 Job job=Job.getInstance(getConf()); 53 job.setJarByClass(SaleManager.class); 54 //註冊Key/Value類型為Text 55 job.setOutputKeyClass(Text.class); 56 job.setOutputValueClass(IntWritable.class); 57 //註冊Mapper及Reducer處理類 58 job.setMapperClass(MyMapper.class); 59 job.setReducerClass(MyReducer.class); 60 //輸入輸出數據格式化類型為TextInputFormat 61 job.setInputFormatClass(TextInputFormat.class); 62 job.setOutputFormatClass(TextOutputFormat.class); 63 //預設情況下Reducer數量為1個(可忽略) 64 job.setNumReduceTasks(1); 65 //獲取命令參數 66 String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs(); 67 //設置讀入文件路徑 68 FileInputFormat.setInputPaths(job,new Path(args[0])); 69 //設置轉出文件路徑 70 FileOutputFormat.setOutputPath(job,new Path(args[1])); 71 boolean status=job.waitForCompletion(true); 72 if(status) 73 return 0; 74 else 75 return 1; 76 } 77 78 public static void main(String[] args) throws Exception{ 79 Configuration conf=new Configuration(); 80 ToolRunner.run(new SaleManager(), args); 81 } 82 }
計算結果
2.2 使用 WHERE 對數據進行篩選
在計算過程中,並非所有的數據都適用於Reduce的計算,由於海量數據是通過網路傳輸的,所消耗的 I/O 資源巨大,所以可以嘗試在Mapper過程中提前對數據進行篩選。以上面的數據為例,當前系統只需要計算輸入參數地區的銷售數據。此時只需要修改一下Mapper類,重寫setup方法,通過Configuration類的 public String[] Configuration.getStrings(參數名,預設值) 方法獲取命令輸入的參數,再對數據進行篩選。
1 public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ 2 private String area; 3 4 @Override 5 public void setup(Context context){ 6 this.area=context.getConfiguration().getStrings("area", "BeiJing")[0]; 7 } 8 9 public void map(LongWritable key,Text value,Context context) 10 throws IOException,InterruptedException{ 11 String data=value.toString(); 12 Phone iPhone=new Phone(data); 13 if(this.area.equals(iPhone.area)) 14 context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount())); 15 } 16 }
執行命令 hadoop jar 【Jar名稱】 【Main類全名稱】-D 【參數名=參數值】 【InputPath】 【OutputPath】
例如:hadoop jar hadoopTest-0.2.jar sun.hadoopTest.SaleManager -D area=BeiJing /tmp/input/050901 /tmp/output/050901
此時系統將選擇 area 等於BeiJing 的數據進行統計
計算結果
三、利用 Partitioner 控制鍵值分配
3.1 深入分析 Partitioner
Partitioner 類在 org.apache.hadoop.mapreduce.Partitioner 中,通過 Job.setPartitionerClass(Class<? extends Partitioner> cls) 方法可綁定自定義的 Partitioner。若用戶沒有實現自定義Partitioner 時,系統將自動綁定 Hadoop 的預設類 org.apache.hadoop.mapreduce.lib.partiton.HashPartitioner 。Partitioner 包含一個主要方法是 int getPartition(K key,V value,int numReduceTasks) ,功能是控制將哪些鍵分配到哪個 Reducer。此方法的返回值是 Reducer 的索引值,若系統定義了4個Reducer,其返回值為0~3。numReduceTasks 側是當前系統的 Reducer 數量,此數量可通過Job.setNumReduceTasks(int tasks) 進行設置,在偽分佈環境下,其預設值為1。
註意:
在單機環境下,系統只會使用一個 Reducer,這將導致 Partitioner 缺乏意義,這也是在本文引言中強調要使用偽分佈環境進行調試的原因 。
通過反編譯查看 HashPartitioner ,可見系統是通過(key.hashCode() & Interger.MAX_VALUE )%numReduceTasks 方法,根據 KEY 的 HashCode 對 Reducer 數量求餘方式,確定數據分配到哪一個 Reducer 進行處理的。但如果想根據用戶自定義的邏輯把數據分配到對應 Reducer,單依靠 HashPartitioner 是無法實現的,此時側需要自定義 Partitioner 。
1 public class HashPartitioner<K, V> extends Partitioner<K, V> { 2 3 public int getPartition(K key, V value, int numReduceTasks) { 4 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 5 } 6 }
3.2 自定義 Partitioner
在例子當中,假設系統需要把北、上、廣、深4個不同的地區的iPhone銷售情況分別交付給不同 Reducer 進行統計處理。我們可以自定義一個 MyPartitioner, 通過 Job.setPartitionerClass( MyPartitioner.class ) 進行綁定。通過 Job.setNumReduceTasks(4) 設置4個Reducer 。以手機類型作為KEY,把銷售數據與地區作為VALUE。在 int getPartition(K key,V value,int numReduceTasks) 方法中,根據 VALUE 值的不同返回不同的索引值。
1 public class Phone { 2 public String type; 3 public Integer count; 4 public String area; 5 6 public Phone(String line){ 7 String[] data=line.split(","); 8 this.type=data[0].toString(); 9 this.count=Integer.valueOf(data[1].toString()); 10 this.area=data[2].toString(); 11 } 12 13 public String getType(){ 14 return this.type; 15 } 16 17 public Integer getCount(){ 18 return this.count; 19 } 20 21 public String getArea(){ 22 return this.area; 23 } 24 } 25 26 public class MyPatitional extends Partitioner<Text,Text> { 27 28 @Override 29 public int getPartition(Text arg0, Text arg1, int arg2) { 30 // TODO 自動生成的方法存根 31 String area=arg1.toString().split(",")[0]; 32 // 根據不同的地區返回不同的索引值 33 if(area.contentEquals("BeiJing")) 34 return 0; 35 if(area.contentEquals("GuangZhou")) 36 return 1; 37 if(area.contentEquals("ShenZhen")) 38 return 2; 39 if(area.contentEquals("ShangHai")) 40 return 3; 41 return 0; 42 } 43 } 44 45 public class SaleManager extends Configured implements Tool{ 46 public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{ 47 48 public void map(LongWritable key,Text value,Context context) 49 throws IOException,InterruptedException{ 50 String data=value.toString(); 51 Phone iPhone=new Phone(data); 52 context.write(new Text(iPhone.getType()), new Text(iPhone.getArea()+","+iPhone.getCount().toString())); 53 } 54 } 55 56 public static class MyReducer extends Reducer<Text,Text,Text,IntWritable>{ 57 58 public void reduce(Text key,Iterable<Text> values,Context context) 59 throws IOException,InterruptedException{ 60 int sum=0; 61 //對同一型號的iPhone數量進行統計 62 for(Text value : values){ 63 String count=value.toString().split(",")[1]; 64 sum+=Integer.valueOf(count).intValue(); 65 } 66 context.write(key, new IntWritable(sum)); 67 } 68 } 69 70 public int run(String[] arg0) throws Exception { 71 // TODO 自動生成的方法存根 72 // TODO Auto-generated method stub 73 Job job=Job.getInstance(getConf()); 74 job.setJarByClass(SaleManager.class); 75 //註冊Key/Value類型為Text 76 job.setOutputKeyClass(Text.class); 77 job.setOutputValueClass(IntWritable.class); 78 //若Map的轉出Key/Value不相同是需要分別註冊 79 job.setMapOutputKeyClass(Text.class); 80 job.setMapOutputValueClass(Text.class); 81 //註冊Mapper及Reducer處理類 82 job.setMapperClass(MyMapper.class); 83 job.setReducerClass(MyReducer.class); 84 //輸入輸出數據格式化類型為TextInputFormat 85 job.setInputFormatClass(TextInputFormat.class); 86 job.setOutputFormatClass(TextOutputFormat.class); 87 //設置Reduce數量為4個,偽分散式情況下不設置時預設為1 88 job.setNumReduceTasks(4); 89 //註冊自定義Partitional類 90 job.setPartitionerClass(MyPatitional.class); 91 //獲取命令參數 92 String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs(); 93 //設置讀入文件路徑 94 FileInputFormat.setInputPaths(job,new Path(args[0])); 95 //設置轉出文件路徑 96 FileOutputFormat.setOutputPath(job,new Path(args[1])); 97 boolean status=job.waitForCompletion(true); 98 if(status) 99 return 0; 100 else 101 return 1; 102 } 103 104 public static void main(String[] args) throws Exception{ 105 Configuration conf=new Configuration(); 106 ToolRunner.run(new SaleManager(), args); 107 } 108 }
計算結果
四、利用 Combiner 提高系統性能
在前面幾節所描述的例子當中,我們都是把所有的數據完整發送到 Reducer 中再作統計。試想一下,在真實環境當中,iPhone 的銷售記錄數以千萬計,如此巨大的數據需要在 Mapper/Reducer 當中進行傳輸,將會耗費多少的網路資源。這麼多年來 iPhone 出品的機型不過十多個,系統能否先針對同類的機型在Mapper端作出初步的聚合計算,再把計算結果發送到 Reducer。如此一來,傳到 Reducer 端的數據量將會大大減少,只要在適當的情形下使用將有利於系統的性能提升。
針對此類問題,Combiner 應運而生,我們可以把 Combiner 看作為一個小型的 Reducer ,它的目的就是在數據傳送到Reducer前在Mapper中作出初步聚集處理,減少伺服器之間的 I/O 數據傳輸壓力。Combiner 也繼承於Reducer,通過Job.setCombinerClass(Class<? extends Reducer> cls) 方法進行註冊。
下麵繼續以第3節的例子作為參考,系統想要在同一個Reducer中計算所有地區不同型號手機的銷售情況。我們可以把地區名作為KEY,把銷售數量和手機類型轉換成 MapWritable 作為 VALUE。當數據輸入後,不是直接把數據傳輸到 Reducer ,而是通過Combiner 把Mapper中不同的型號手機的銷售數量進行聚合計算,把5種型號手機的銷售總數算好後傳輸給Reducer。在Reducer中再把來源於不同 Combiner 的數據進行求和,得出最後結果。
註意 :
MapWritable 是 系統自帶的 Writable 集合類中的其中一個,它實現了 java.util.Map<Writable,Writable> 介面,以單位元組充當類型數據的索引,常用於枚舉集合的元素。
1 public class SaleManager extends Configured implements Tool{ 2 private static IntWritable TYPE=new IntWritable(0); 3 private static IntWritable VALUE=new IntWritable(1); 4 private static IntWritable IPHONE7=new IntWritable(2); 5 private static IntWritable IPHONE7_PLUS=new IntWritable(3); 6 private static IntWritable IPHONE8=new IntWritable(4); 7 private static IntWritable IPHONE8_PLUS=new IntWritable(5); 8 private static IntWritable IPHONEX=new IntWritable(6); 9 10 public static class MyMapper extends Mapper<LongWritable,Text,Text,MapWritable>{ 11 12 public void map(LongWritable key,Text value,Context context) 13 throws IOException,InterruptedException{ 14 String data=value.toString(); 15 Phone iPhone=new Phone(data); 16 context.write(new Text(iPhone.getArea()), getMapWritable(iPhone.getType(), iPhone.getCount())); 17 } 18 19 private MapWritable getMapWritable(String type,Integer count){ 20 Text _type=new Text(type); 21 Text _count=new Text(count.toString()); 22 MapWritable mapWritable=new MapWritable(); 23 mapWritable.put(TYPE,_type); 24 mapWritable.put(VALUE,_count); 25 return mapWritable; 26 } 27 } 28 29 public static class MyCombiner extends Reducer<Text,MapWritable,Text,MapWritable> { 30 public void reduce(Text key,Iterable<MapWritable> values,Context context) 31 throws IOException, InterruptedException{ 32 int iPhone7=0; 33 int iPhone7_PLUS=0; 34 int iPhone8=0; 35 int iPhone8_PLUS=0; 36 int iPhoneX=0; 37 //對同一個Mapper所處理的不同型號的手機數據進行初步統計 38 for(MapWritable value:values){ 39 String type=value.get(TYPE).toString(); 40 Integer count=Integer.valueOf(value.get(VALUE).toString()); 41 if(type.contentEquals("iPhone7")) 42 iPhone7+=count; 43 if(type.contentEquals("iPhone7_PLUS")) 44 iPhone7_PLUS+=count; 45 if(type.contentEquals("iPhone8")) 46 iPhone8+=count; 47 if(type.contentEquals("iPhone8_PLUS")) 48 iPhone8_PLUS+=count; 49 if(type.contentEquals("iPhoneX")) 50 iPhoneX+=count; 51 } 52 MapWritable mapWritable=new MapWritable(); 53 mapWritable.put(IPHONE7, new IntWritable(iPhone7)); 54 mapWritable.put(IPHONE7_PLUS, new IntWritable(iPhone7_PLUS)); 55 mapWritable.put(IPHONE8, new IntWritable(iPhone8)); 56 mapWritable.put(IPHONE8_PLUS, new IntWritable(iPhone8_PLUS)); 57 mapWritable.put(IPHONEX, new IntWritable(iPhoneX)); 58 context.write(key,mapWritable); 59 } 60 } 61 62 public static class MyReducer extends Reducer<Text,MapWritable,Text,Text>{ 63 public void reduce(Text key,Iterable<MapWritable> values,Context context) 64 throws IOException,InterruptedException{ 65 int iPhone7=0; 66 int iPhone7_PLUS=0; 67 int iPhone8=0; 68 int iPhone8_PLUS=0; 69 int iPhoneX=0; 70 71 //對同一地區不同型的iPhone數量進行統計 72 for(MapWritable value : values){ 73 iPhone7+=Integer.parseInt(value.get(IPHONE7).toString()); 74 iPhone7_PLUS+=Integer.parseInt(value.get(IPHONE7_PLUS).toString()); 75 iPhone8+=Integer.parseInt(value.get(IPHONE8).toString()); 76 iPhone8_PLUS+=Integer.parseInt(value.get(IPHONE8_PLUS).toString()); 77 iPhoneX+=Integer.parseInt(value.get(IPHONEX).toString()); 78 } 79 80 StringBuffer data=new StringBuffer(); 81 data.append("iPhone7:"+iPhone7+" "); 82 data.append("iPhone7_PLUS:"+iPhone7_PLUS+" "); 83 data.append("iPhone8:"+iPhone8+" "); 84 data.append("iPhone8_PLUS:"+iPhone8_PLUS+" "); 85 data.append("iPhoneX:"+iPhoneX+" "); 86 context.write(key, new Text(data.toString())); 87 } 88 } 89 90 public int run(String[] arg0) throws Exception { 91 // TODO 自動生成的方法存根 92 // TODO Auto-generated method stub 93 Job job=Job.getInstance(getConf()); 94 job.setJarByClass(SaleManager.class); 95 //註冊Key/Value類型為Text 96 job.setOutputKeyClass(Text.class); 97 job.setOutputValueClass(Text.class); 98 //若Map的轉出Key/Value不相同是需要分別註冊 99 job.setMapOutputKeyClass(Text.class); 100 job.setMapOutputValueClass(MapWritable.class); 101 //註冊Mapper及Reducer處理類 102 job.setMapperClass(MyMapper.class); 103 job.setReducerClass(MyReducer.class); 104 //註冊Combiner處理類 105 job.setCombinerClass(MyCombiner.class); 106 //輸入輸出數據格式化類型為TextInputFormat 107 job.setInputFormatClass(TextInputFormat.class); 108 job.setOutputFormatClass(TextOutputFormat.class); 109 //偽分散式情況下不設置時預設為1 110 job.setNumReduceTasks(1); 111 //獲取命令參數 112 String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs(); 113 //設置讀入文件路徑 114 FileInputFormat.setInputPaths(job,new Path(args[0])); 115 //設