一、概述 對於RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分註意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由於hadoop的分散式設計理念的特殊性,因此對於這種join操作同樣也具備了一定的特殊性。本 ...
一、概述
對於RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分註意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由於hadoop的分散式設計理念的特殊性,因此對於這種join操作同樣也具備了一定的特殊性。本文主要對MapReduce框架對錶之間的join操作的幾種實現方式進行詳細分析,並且根據我在實際開發過程中遇到的實際例子來進行進一步的說明。
二、實現原理
1、在Reudce端進行連接。
在Reudce端進行連接是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然後用連接欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
reduce端的主要工作:在reduce端以連接欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。原理非常簡單,下麵來看一個實例:
(1)自定義一個value返回類型:
- package com.mr.reduceSizeJoin;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- public class CombineValues implements WritableComparable<CombineValues>{
- //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
- private Text joinKey;//鏈接關鍵字
- private Text flag;//文件來源標誌
- private Text secondPart;//除了鏈接鍵外的其他部分
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- public Text getFlag() {
- return flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public CombineValues() {
- this.joinKey = new Text();
- this.flag = new Text();
- this.secondPart = new Text();
- }
- @Override
- public void write(DataOutput out) throws IOException {
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- public int compareTo(CombineValues o) {
- return this.joinKey.compareTo(o.getJoinKey());
- }
- @Override
- public String toString() {
- // TODO Auto-generated method stub
- return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
- }
- }
(2) map、reduce主體代碼:
- package com.mr.reduceSizeJoin;
- import java.io.IOException;
- import java.util.ArrayList;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- * 用途說明:
- * reudce side join中的left outer join
- * 左連接,兩個文件分別代表2個表,連接欄位table1的id欄位和table2的cityID欄位
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
- * tb_dim_city.dat文件內容,分隔符為"|":
- * id name orderid city_code is_show
- * 0 其他 9999 9999 0
- * 1 長春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 遼源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------風騷的分割線-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件內容,分隔符為"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- *
- * -------------------------風騷的分割線-------------------------------
- * 結果:
- * 1 長春 1 901 1 1 2G 123
- * 1 長春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
- public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
- private CombineValues combineValues = new CombineValues();
- private Text flag = new Text();
- private Text joinKey = new Text();
- private Text secondPart = new Text();
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //獲得文件輸入路徑
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- //數據來自tb_dim_city.dat文件,標誌即為"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 5){
- return;
- }
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }//數據來自於tb_user_profiles.dat,標誌即為"1"
- else if(pathName.endsWith("tb_user_profiles.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 4){
- return;
- }
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }
- }
- }
- public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
- //存儲一個分組中的左表信息
- private ArrayList<Text> leftTable = new ArrayList<Text>();
- //存儲一個分組中的右表信息
- private ArrayList<Text> rightTable = new ArrayList<Text>();
- private Text secondPar = null;
- private Text output = new Text();
- /**
- * 一個分組調用一次reduce函數
- */
- @Override
- protected void reduce(Text key, Iterable<CombineValues> value, Context context)
- throws IOException, InterruptedException {
- leftTable.clear();
- rightTable.clear();
- /**
- * 將分組中的元素按照文件分別進行存放
- * 這種方法要註意的問題:
- * 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM,
- * 在處理分散式問題之前最好先瞭解數據的分佈情況,根據不同的分佈採取最
- * 適當的處理方法,這樣可以有效的防止導致OOM和數據過度傾斜問題。
- */
- for(CombineValues cv : value){
- secondPar = new Text(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles
- else if("1".equals(cv.getFlag().toString().trim())){
- rightTable.add(secondPar);
- }
- }
- logger.info("tb_dim_city:"+leftTable.toString());
- logger.info("tb_user_profiles:"+rightTable.toString());
- for(Text leftPart : leftTable){
- for(Text rightPart : rightTable){
- output.set(leftPart+ "\t" + rightPart);
- context.write(key, output);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //獲得配置文件對象
- Job job=new Job(conf,"LeftOutJoinMR");
- job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
- FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
- job.setMapperClass(LeftOutJoinMapper.class);
- job.setReducerClass(LeftOutJoinReducer.class);
- job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格格式
- //設置map的輸出key和value類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineValues.class);
- //設置reduce的輸出key和value類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
相關閱讀:
《Hadoop偽分散式搭建操作步驟指南》;
《HADOOP的本地庫(NATIVE LIBRARIES)簡介》;
《基於Hadoop大數據分析應用場景與項目實戰演練》