上一篇《MapReduce多種join實現實例分析(一)》,大家可以點擊回顧該篇文章。本文是MapReduce系列第二篇。 一、在Map端進行連接使用場景:一張表十分小、一張表很大。用法:在提交作業的時候先將小表文件放到該作業的DistributedCache中,然後從DistributeCache ...
上一篇《MapReduce多種join實現實例分析(一)》,大家可以點擊回顧該篇文章。本文是MapReduce系列第二篇。
一、在Map端進行連接
使用場景:一張表十分小、一張表很大。
用法:在提交作業的時候先將小表文件放到該作業的DistributedCache中,然後從DistributeCache中取出該小表進行join key / value解釋分割放到記憶體中(可以放大Hash Map等等容器中)。然後掃描大表,看大表中的每條記錄的join key /value值是否能夠在記憶體中找到相同join key的記錄,如果有則直接輸出結果。
直接上代碼,比較簡單:
package com.mr.mapSideJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author zengzhaozheng * * 用途說明: * Map side join中的left outer join * 左連接,兩個文件分別代表2個表,連接欄位table1的id欄位和table2的cityID欄位 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show), * 假設tb_dim_city文件記錄數很少,tb_dim_city.dat文件內容,分隔符為"|": * id name orderid city_code is_show * 0 其他 9999 9999 0 * 1 長春 1 901 1 * 2 吉林 2 902 1 * 3 四平 3 903 1 * 4 松原 4 904 1 * 5 通化 5 905 1 * 6 遼源 6 906 1 * 7 白城 7 907 1 * 8 白山 8 908 1 * 9 延吉 9 909 1 * -------------------------風騷的分割線------------------------------- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) * tb_user_profiles.dat文件內容,分隔符為"|": * userID network flow cityID * 1 2G 123 1 * 2 3G 333 2 * 3 3G 555 1 * 4 2G 777 3 * 5 3G 666 4 * -------------------------風騷的分割線------------------------------- * 結果: * 1 長春 1 901 1 1 2G 123 * 1 長春 1 901 1 3 3G 555 * 2 吉林 2 902 1 2 3G 333 * 3 四平 3 903 1 4 2G 777 * 4 松原 4 904 1 5 3G 666 */ public class MapSideJoinMain extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> { private HashMap<String,String> city_info = new HashMap<String, String>(); private Text outPutKey = new Text(); private Text outPutValue = new Text(); private String mapInputStr = null; private String mapInputSpit[] = null; private String city_secondPart = null; /** * 此方法在每個task開始之前執行,這裡主要用作從DistributedCache * 中取到tb_dim_city文件,並將裡邊記錄取出放到記憶體中。 */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; //獲得當前作業的DistributedCache相關文件 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String cityInfo = null; for(Path p : distributePaths){ if(p.toString().endsWith("tb_dim_city.dat")){ //讀緩存文件,並放到mem中 br = new BufferedReader(new FileReader(p.toString())); while(null!=(cityInfo=br.readLine())){ String[] cityPart = cityInfo.split("\\|",5); if(cityPart.length ==5){ city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]); } } } } } /** * Map端的實現相當簡單,直接判斷tb_user_profiles.dat中的 * cityID是否存在我的map中就ok了,這樣就可以實現Map Join了 */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //排掉空行 if(value == null || value.toString().equals("")){ return; } mapInputStr = value.toString(); mapInputSpit = mapInputStr.split("\\|",4); //過濾非法記錄 if(mapInputSpit.length != 4){ return; } //判斷鏈接欄位是否在map中存在 city_secondPart = city_info.get(mapInputSpit[3]); if(city_secondPart != null){ this.outPutKey.set(mapInputSpit[3]); this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]); context.write(outPutKey, outPutValue); } } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); //獲得配置文件對象 DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//為該job添加緩存文件 Job job=new Job(conf,"MapJoinMR"); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑 job.setJarByClass(MapSideJoinMain.class); job.setMapperClass(LeftOutJoinMapper.class); job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格式 //設置map的輸出key和value類型 job.setMapOutputKeyClass(Text.class); //設置reduce的輸出key和value類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new MapSideJoinMain(),args); System.exit(returnCode); } catch (Exception e) { // TODO Auto-generated catch block logger.error(e.getMessage()); } } }
這裡說說DistributedCache。DistributedCache是分散式緩存的一種實現,它在整個MapReduce框架中起著相當重要的作用,他可以支撐我們寫一些相當複雜高效的分散式程式。說回到這裡,JobTracker在作業啟動之前會獲取到DistributedCache的資源uri列表,並將對應的文件分發到各個涉及到該作業的任務的TaskTracker上。另外,關於DistributedCache和作業的關係,比如許可權、存儲路徑區分、public和private等屬性,接下來有用再整理研究一下寫一篇blog,這裡就不詳細說了。
另外還有一種比較變態的Map Join方式,就是結合HBase來做Map Join操作。這種方式完全可以突破記憶體的控制,使你毫無忌憚的使用Map Join,而且效率也非常不錯。
二、SemiJoin
SemiJoin就是所謂的半連接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數據,在網路中只傳輸參與連接的數據不參與連接的數據不必在網路中進行傳輸,從而減少了shuffle的網路傳輸量,使整體效率得到提高,其他思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發到相關節點,然後將其取出放到記憶體中(可以放到HashSet中),在map階段掃描連接表,將join key不在記憶體HashSet中的記錄過濾掉,讓那些參與join的記錄通過shuffle傳輸到reduce端進行join操作,其他的和reduce join都是一樣的。
看代碼:
package com.mr.SemiJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author zengzhaozheng * * 用途說明: * reudce side join中的left outer join * 左連接,兩個文件分別代表2個表,連接欄位table1的id欄位和table2的cityID欄位 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show) * tb_dim_city.dat文件內容,分隔符為"|": * id name orderid city_code is_show * 0 其他 9999 9999 0 * 1 長春 1 901 1 * 2 吉林 2 902 1 * 3 四平 3 903 1 * 4 松原 4 904 1 * 5 通化 5 905 1 * 6 遼源 6 906 1 * 7 白城 7 907 1 * 8 白山 8 908 1 * 9 延吉 9 909 1 * -------------------------風騷的分割線------------------------------- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) * tb_user_profiles.dat文件內容,分隔符為"|": * userID network flow cityID * 1 2G 123 1 * 2 3G 333 2 * 3 3G 555 1 * 4 2G 777 3 * 5 3G 666 4 * -------------------------風騷的分割線------------------------------- * joinKey.dat內容: * city_code * 1 * 2 * 3 * 4 * -------------------------風騷的分割線------------------------------- * 結果: * 1 長春 1 901 1 1 2G 123 * 1 長春 1 901 1 3 3G 555 * 2 吉林 2 902 1 2 3G 333 * 3 四平 3 903 1 4 2G 777 * 4 松原 4 904 1 5 3G 666 */ public class SemiJoin extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class); public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> { private CombineValues combineValues = new CombineValues(); private HashSet<String> joinKeySet = new HashSet<String>(); private Text flag = new Text(); private Text joinKey = new Text(); private Text secondPart = new Text(); /** * 將參加join的key從DistributedCache取出放到記憶體中,以便在map端將要參加join的key過濾出來。b */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; //獲得當前作業的DistributedCache相關文件 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String joinKeyStr = null; for(Path p : distributePaths){ if(p.toString().endsWith("joinKey.dat")){ //讀緩存文件,並放到mem中 br = new BufferedReader(new FileReader(p.toString())); while(null!=(joinKeyStr=br.readLine())){ joinKeySet.add(joinKeyStr); } } } } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //獲得文件輸入路徑 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); //數據來自tb_dim_city.dat文件,標誌即為"0" if(pathName.endsWith("tb_dim_city.dat")){ String[] valueItems = value.toString().split("\\|"); //過濾格式錯誤的記錄 if(valueItems.length != 5){ return; } //過濾掉不需要參加join的記錄 if(joinKeySet.contains(valueItems[0])){ flag.set("0"); joinKey.set(valueItems[0]); secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); }else{ return ; } }//數據來自於tb_user_profiles.dat,標誌即為"1" else if(pathName.endsWith("tb_user_profiles.dat")){ String[] valueItems = value.toString().split("\\|"); //過濾格式錯誤的記錄 if(valueItems.length != 4){ return; } //過濾掉不需要參加join的記錄 if(joinKeySet.contains(valueItems[3])){ flag.set("1"); joinKey.set(valueItems[3]); secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); }else{ return ; } } } } public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> { //存儲一個分組中的左表信息 private ArrayList<Text> leftTable = new ArrayList<Text>(); //存儲一個分組中的右表信息 private ArrayList<Text> rightTable = new ArrayList<Text>(); private Text secondPar = null; private Text output = new Text(); /** * 一個分組調用一次reduce函數 */ @Override protected void reduce(Text key, Iterable<CombineValues> value, Context context) throws IOException, InterruptedException { leftTable.clear(); rightTable.clear(); /** * 將分組中的元素按照文件分別進行存放 * 這種方法要註意的問題: * 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM, * 在處理分散式問題之前最好先瞭解數據的分佈情況,根據不同的分佈採取最 * 適當的處理方法,這樣可以有效的防止導致OOM和數據過度傾斜問題。 */ for(CombineValues cv : value){ secondPar = new Text(cv.getSecondPart().toString()); //左表tb_dim_city if("0".equals(cv.getFlag().toString().trim())){ leftTable.add(secondPar); } //右表tb_user_profiles else if("1".equals(cv.getFlag().toString().trim())){ rightTable.add(secondPar); } } logger.info("tb_dim_city:"+leftTable.toString()); logger.info("tb_user_profiles:"+rightTable.toString()); for(Text leftPart : leftTable){ for(Text rightPart : rightTable){ output.set(leftPart+ "\t" + rightPart); context.write(key, output); } } } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); //獲得配置文件對象 DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf); Job job=new Job(conf,"LeftOutJoinMR"); job.setJarByClass(SemiJoin.class); FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑 job.setMapperClass(SemiJoinMapper.class); job.setReducerClass(SemiJoinReducer.class); job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格式 //設置map的輸出key和value類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); //設置reduce的輸出key和value類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new SemiJoin(),args); System.exit(returnCode); } catch (Exception e) { logger.error(e.getMessage()); } } }
這裡還說說SemiJoin也是有一定的適用範圍的,其抽取出來進行join的key是要放到記憶體中的,所以不能夠太大,容易在Map端造成OOM。
三、總結
blog介紹了三種join方式。這三種join方式適用於不同的場景,其處理效率上的相差還是蠻大的,其中主要導致因素是網路傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分散式大數據處理程式的時最好要對整體要處理的數據分佈情況作一個瞭解,這可以提高我們代碼的效率,使數據的傾斜度降到最低,使我們的代碼傾向性更好。
本文寫作過程中參考了上海尚學堂相關技術文章,在此感謝上海尚學堂老師的幫助。