寫了關於Hadoop的Map側join 和Reduce的join,今天我們就來在看另外一種比較中立的Join。 SemiJoin,一般稱為半鏈接,其原理是在Map側過濾掉了一些不需要join的數據,從而大大減少了reduce的shffule時間,因為我們知道,如果僅僅使用Reduce側連接,那麼如果 ...
寫了關於Hadoop的Map側join
和Reduce的join,今天我們就來在看另外一種比較中立的Join。
SemiJoin,一般稱為半鏈接,其原理是在Map側過濾掉了一些不需要join的數據,從而大大減少了reduce的shffule時間,因為我們知道,如果僅僅使用Reduce側連接,那麼如果一份數據中,存在大量的無效數據,而這些數據,在join中,並不需要,但是因為沒有做過預處理,所以這些數據,直到真正的執行reduce函數時,才被定義為無效數據,而這時候,前面已經執行過shuffle和merge和sort,所以這部分無效的數據,就浪費了大量的網路IO和磁碟IO,所以在整體來講,這是一種降低性能的表現,如果存在的無效數據越多,那麼這種趨勢,就越明顯。
之所以會出現半連接,這其實也是reduce側連接的一個變種,只不過我們在Map側,過濾掉了一些無效的數據,所以減少了reduce過程的shuffle時間,所以能獲取一個性能的提升。
具體的原理也是利用DistributedCache將小表的的分發到各個節點上,在Map過程的setup函數里,讀取緩存裡面的文件,只將小表的鏈接鍵存儲在hashset里,在map函數執行時,對每一條數據,進行判斷,如果這條數據的鏈接鍵為空或者在hashset裡面不存在,那麼則認為這條數據,是無效的數據,所以這條數據,並不會被partition分區後寫入磁碟,參與reduce階段的shuffle和sort,所以在一定程式上,提升了join性能。需要註意的是如果
小表的key依然非常巨大,可能會導致我們的程式出現OOM的情況,那麼這時候我們就需要考慮其他的鏈接方式了。
測試數據如下:
模擬小表數據:
1,三劫散仙,13575468248
2,鳳舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
模擬大表數據:
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10
代碼如下:
- package com.semijoin;
- import java.io.BufferedReader;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- /***
- *
- * Hadoop1.2的版本
- *
- * hadoop的半鏈接
- *
- * SemiJoin實現
- *
- * @author qindongliang
- *
- * 大數據交流群:376932160
- * 搜索技術交流群:324714439
- *
- *
- *
- * **/
- public class Semjoin {
- /**
- *
- *
- * 自定義一個輸出實體
- *
- * **/
- private static class CombineEntity implements WritableComparable<CombineEntity>{
- private Text joinKey;//連接key
- private Text flag;//文件來源標誌
- private Text secondPart;//除了鍵外的其他部分的數據
- public CombineEntity() {
- // TODO Auto-generated constructor stub
- this.joinKey=new Text();
- this.flag=new Text();
- this.secondPart=new Text();
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public Text getFlag() {
- return flag;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- public int compareTo(CombineEntity o) {
- // TODO Auto-generated method stub
- return this.joinKey.compareTo(o.joinKey);
- }
- }
- private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
- private CombineEntity combine=new CombineEntity();
- private Text flag=new Text();
- private Text joinKey=new Text();
- private Text secondPart=new Text();
- /**
- * 存儲小表的key
- *
- *
- * */
- private HashSet<String> joinKeySet=new HashSet<String>();
- @Override
- protected void setup(Context context)throws IOException, InterruptedException {
- //讀取文件流
- BufferedReader br=null;
- String temp;
- // 獲取DistributedCached裡面 的共用文件
- Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
- for(Path p:path){
- if(p.getName().endsWith("a.txt")){
- br=new BufferedReader(new FileReader(p.toString()));
- //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));
- while((temp=br.readLine())!=null){
- String ss[]=temp.split(",");
- //map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中
- joinKeySet.add(ss[0]);//加入小表的key
- }
- }
- }
- }
- @Override
- protected void map(LongWritable key, Text value,Context context)
- throws IOException, InterruptedException {
- //獲得文件輸入路徑
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- if(pathName.endsWith("a.txt")){
- String valueItems[]=value.toString().split(",");
- /**
- * 在這裡過濾必須要的連接字元
- *
- * */
- if(joinKeySet.contains(valueItems[0])){
- //設置標誌位
- flag.set("0");
- //設置鏈接鍵
- joinKey.set(valueItems[0]);
- //設置第二部分
- secondPart.set(valueItems[1]+"\t"+valueItems[2]);
- //封裝實體
- combine.setFlag(flag);//標誌位
- combine.setJoinKey(joinKey);//鏈接鍵
- combine.setSecondPart(secondPart);//其他部分
- //寫出
- context.write(combine.getJoinKey(), combine);
- }else{
- System.out.println("a.txt里");
- System.out.println("在小表中無此記錄,執行過濾掉!");
- for(String v:valueItems){
- System.out.print(v+" ");
- }
- return ;
- }
- }else if(pathName.endsWith("b.txt")){
- String valueItems[]=value.toString().split(",");
- /**
- *
- * 判斷是否在集合中
- *
- * */
- if(joinKeySet.contains(valueItems[0])){
- //設置標誌位
- flag.set("1");
- //設置鏈接鍵
- joinKey.set(valueItems[0]);
- //設置第二部分註意不同的文件的列數不一樣
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
- //封裝實體
- combine.setFlag(flag);//標誌位
- combine.setJoinKey(joinKey);//鏈接鍵
- combine.setSecondPart(secondPart);//其他部分
- //寫出
- context.write(combine.getJoinKey(), combine);
- }else{
- //執行過濾 ......
- System.out.println("b.txt里");
- System.out.println("在小表中無此記錄,執行過濾掉!");
- for(String v:valueItems){
- System.out.print(v+" ");
- }
- return ;
- }
- }
- }
- }
- private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
- //存儲一個分組中左表信息
- private List<Text> leftTable=new ArrayList<Text>();
- //存儲一個分組中右表信息
- private List<Text> rightTable=new ArrayList<Text>();
- private Text secondPart=null;
- private Text output=new Text();
- //一個分組調用一次
- @Override
- protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
- throws IOException, InterruptedException {
- leftTable.clear();//清空分組數據
- rightTable.clear();//清空分組數據
- /**
- * 將不同文件的數據,分別放在不同的集合
- * 中,註意數據量過大時,會出現
- * OOM的異常
- *
- * **/
- for(CombineEntity ce:values){
- this.secondPart=new Text(ce.getSecondPart().toString());
- //左表
- if(ce.getFlag().toString().trim().equals("0")){
- leftTable.add(secondPart);
- }else if(ce.getFlag().toString().trim().equals("1")){
- rightTable.add(secondPart);
- }
- }
- //=====================
- for(Text left:leftTable){
- for(Text right:rightTable){
- output.set(left+"\t"+right);//連接左右數據
- context.write(key, output);//輸出
- }
- }
- }
- }
- public static void main(String[] args)throws Exception {
- //Job job=new Job(conf,"myjoin");
- JobConf conf=new JobConf(Semjoin.class);
- conf.set("mapred.job.tracker","192.168.75.130:9001");
- conf.setJar("tt.jar");
- //小表共用
- String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
- //添加到共用cache里
- DistributedCache.addCacheFile(new URI(bpath), conf);
- Job job=new Job(conf, "aaaaa");
- job.setJarByClass(Semjoin.class);
- System.out.println("模式: "+conf.get("mapred.job.tracker"));;
- //設置Map和Reduce自定義類
- job.setMapperClass(JMapper.class);
- job.setReducerClass(JReduce.class);
- //設置Map端輸出
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineEntity.class);
- //設置Reduce端的輸出
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileSystem fs=FileSystem.get(conf);
- Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");
- if(fs.exists(op)){
- fs.delete(op, true);
- System.out.println("存在此輸出路徑,已刪除!!!");
- }
- FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
- FileOutputFormat.setOutputPath(job, op);
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
- package com.semijoin;
- import java.io.BufferedReader;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- /***
- *
- * Hadoop1.2的版本
- *
- * hadoop的半鏈接
- *
- * SemiJoin實現
- *
- * @author qindongliang
- *
- * 大數據交流群:376932160
- * 搜索技術交流群:324714439
- *
- *
- *
- * **/
- public class Semjoin {
- /**
- *
- *
- * 自定義一個輸出實體
- *
- * **/
- private static class CombineEntity implements WritableComparable<CombineEntity>{
- private Text joinKey;//連接key
- private Text flag;//文件來源標誌
- private Text secondPart;//除了鍵外的其他部分的數據
- public CombineEntity() {
- // TODO Auto-generated constructor stub
- this.joinKey=new Text();
- this.flag=new Text();
- this.secondPart=new Text();
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public Text getFlag() {
- return flag;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- &