MapReduce多種join實現實例分析(一)

来源:http://www.cnblogs.com/shsxt/archive/2017/12/15/8043904.html
-Advertisement-
Play Games

一、概述 對於RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分註意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由於hadoop的分散式設計理念的特殊性,因此對於這種join操作同樣也具備了一定的特殊性。本 ...


一、概述 

 
 
對於RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分註意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由於hadoop的分散式設計理念的特殊性,因此對於這種join操作同樣也具備了一定的特殊性。本文主要對MapReduce框架對錶之間的join操作的幾種實現方式進行詳細分析,並且根據我在實際開發過程中遇到的實際例子來進行進一步的說明。
 
 
二、實現原理



1、在Reudce端進行連接。
在Reudce端進行連接是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然後用連接欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
reduce端的主要工作:在reduce端以連接欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。原理非常簡單,下麵來看一個實例:
(1)自定義一個value返回類型:
  1. package com.mr.reduceSizeJoin;   
  2. import java.io.DataInput;   
  3. import java.io.DataOutput;   
  4. import java.io.IOException;   
  5. import org.apache.hadoop.io.Text;   
  6. import org.apache.hadoop.io.WritableComparable;   
  7. public class CombineValues implements WritableComparable<CombineValues>{   
  8.     //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class); 
  9.     private Text joinKey;//鏈接關鍵字 
  10.     private Text flag;//文件來源標誌 
  11.     private Text secondPart;//除了鏈接鍵外的其他部分 
  12.     public void setJoinKey(Text joinKey) {   
  13.         this.joinKey = joinKey;   
  14.     }   
  15.     public void setFlag(Text flag) {   
  16.         this.flag = flag;   
  17.     }   
  18.     public void setSecondPart(Text secondPart) {   
  19.         this.secondPart = secondPart;   
  20.     }   
  21.     public Text getFlag() {   
  22.         return flag;   
  23.     }   
  24.     public Text getSecondPart() {   
  25.         return secondPart;   
  26.     }   
  27.     public Text getJoinKey() {   
  28.         return joinKey;   
  29.     }   
  30.     public CombineValues() {   
  31.         this.joinKey =  new Text();   
  32.         this.flag = new Text();   
  33.         this.secondPart = new Text();   
  34.     }
  35.  
  36.     @Override
  37.     public void write(DataOutput out) throws IOException {   
  38.         this.joinKey.write(out);   
  39.         this.flag.write(out);   
  40.         this.secondPart.write(out);   
  41.     }   
  42.     @Override
  43.     public void readFields(DataInput in) throws IOException {   
  44.         this.joinKey.readFields(in);   
  45.         this.flag.readFields(in);   
  46.         this.secondPart.readFields(in);   
  47.     }   
  48.     @Override
  49.     public int compareTo(CombineValues o) {   
  50.         return this.joinKey.compareTo(o.getJoinKey());   
  51.     }   
  52.     @Override
  53.     public String toString() {   
  54.         // TODO Auto-generated method stub 
  55.         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
  56.     }   
 
(2) map、reduce主體代碼:
  1. package com.mr.reduceSizeJoin;   
  2. import java.io.IOException;   
  3. import java.util.ArrayList;   
  4. import org.apache.hadoop.conf.Configuration;   
  5. import org.apache.hadoop.conf.Configured;   
  6. import org.apache.hadoop.fs.Path;   
  7. import org.apache.hadoop.io.Text;   
  8. import org.apache.hadoop.mapreduce.Job;   
  9. import org.apache.hadoop.mapreduce.Mapper;   
  10. import org.apache.hadoop.mapreduce.Reducer;   
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  16. import org.apache.hadoop.util.Tool;   
  17. import org.apache.hadoop.util.ToolRunner;   
  18. import org.slf4j.Logger;   
  19. import org.slf4j.LoggerFactory;   
  20. /** 
  21.  * @author zengzhaozheng 
  22.  * 用途說明: 
  23.  * reudce side join中的left outer join 
  24.  * 左連接,兩個文件分別代表2個表,連接欄位table1的id欄位和table2的cityID欄位 
  25.  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show) 
  26.  * tb_dim_city.dat文件內容,分隔符為"|": 
  27.  * id     name  orderid  city_code  is_show 
  28.  * 0       其他        9999     9999         0 
  29.  * 1       長春        1        901          1 
  30.  * 2       吉林        2        902          1 
  31.  * 3       四平        3        903          1 
  32.  * 4       松原        4        904          1 
  33.  * 5       通化        5        905          1 
  34.  * 6       遼源        6        906          1 
  35.  * 7       白城        7        907          1 
  36.  * 8       白山        8        908          1 
  37.  * 9       延吉        9        909          1 
  38.  * -------------------------風騷的分割線------------------------------- 
  39.  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 
  40.  * tb_user_profiles.dat文件內容,分隔符為"|": 
  41.  * userID   network     flow    cityID 
  42.  * 1           2G       123      1 
  43.  * 2           3G       333      2 
  44.  * 3           3G       555      1 
  45.  * 4           2G       777      3 
  46.  * 5           3G       666      4 
  47.  * 
  48.  * -------------------------風騷的分割線------------------------------- 
  49.  *  結果: 
  50.  *  1   長春  1   901 1   1   2G  123 
  51.  *  1   長春  1   901 1   3   3G  555 
  52.  *  2   吉林  2   902 1   2   3G  333 
  53.  *  3   四平  3   903 1   4   2G  777 
  54.  *  4   松原  4   904 1   5   3G  666 
  55.  */
  56. public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
  57.     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
  58.     public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
  59.         private CombineValues combineValues = new CombineValues();   
  60.         private Text flag = new Text();   
  61.         private Text joinKey = new Text();   
  62.         private Text secondPart = new Text();   
  63.         @Override
  64.         protected void map(Object key, Text value, Context context)   
  65.                 throws IOException, InterruptedException {   
  66.             //獲得文件輸入路徑 
  67.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
  68.             //數據來自tb_dim_city.dat文件,標誌即為"0" 
  69.             if(pathName.endsWith("tb_dim_city.dat")){   
  70.                 String[] valueItems = value.toString().split("\\|");   
  71.                 //過濾格式錯誤的記錄 
  72.                 if(valueItems.length != 5){   
  73.                     return;   
  74.                 }   
  75.                 flag.set("0");   
  76.                 joinKey.set(valueItems[0]);   
  77.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
  78.                 combineValues.setFlag(flag);   
  79.                 combineValues.setJoinKey(joinKey);   
  80.                 combineValues.setSecondPart(secondPart);   
  81.                 context.write(combineValues.getJoinKey(), combineValues);
  82.  
  83.                 }//數據來自於tb_user_profiles.dat,標誌即為"1" 
  84.             else if(pathName.endsWith("tb_user_profiles.dat")){   
  85.                 String[] valueItems = value.toString().split("\\|");   
  86.                 //過濾格式錯誤的記錄 
  87.                 if(valueItems.length != 4){   
  88.                     return;   
  89.                 }   
  90.                 flag.set("1");   
  91.                 joinKey.set(valueItems[3]);   
  92.                 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
  93.                 combineValues.setFlag(flag);   
  94.                 combineValues.setJoinKey(joinKey);   
  95.                 combineValues.setSecondPart(secondPart);   
  96.                 context.write(combineValues.getJoinKey(), combineValues);   
  97.             }   
  98.         }   
  99.     }   
  100.     public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
  101.         //存儲一個分組中的左表信息 
  102.         private ArrayList<Text> leftTable = new ArrayList<Text>();   
  103.         //存儲一個分組中的右表信息 
  104.         private ArrayList<Text> rightTable = new ArrayList<Text>();   
  105.         private Text secondPar = null;   
  106.         private Text output = new Text();   
  107.         /** 
  108.          * 一個分組調用一次reduce函數 
  109.          */
  110.         @Override
  111.         protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
  112.                 throws IOException, InterruptedException {   
  113.             leftTable.clear();   
  114.             rightTable.clear();   
  115.             /** 
  116.              * 將分組中的元素按照文件分別進行存放 
  117.              * 這種方法要註意的問題: 
  118.              * 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM, 
  119.              * 在處理分散式問題之前最好先瞭解數據的分佈情況,根據不同的分佈採取最 
  120.              * 適當的處理方法,這樣可以有效的防止導致OOM和數據過度傾斜問題。 
  121.              */
  122.             for(CombineValues cv : value){   
  123.                 secondPar = new Text(cv.getSecondPart().toString());   
  124.                 //左表tb_dim_city 
  125.                 if("0".equals(cv.getFlag().toString().trim())){   
  126.                     leftTable.add(secondPar);   
  127.                 }   
  128.                 //右表tb_user_profiles 
  129.                 else if("1".equals(cv.getFlag().toString().trim())){   
  130.                     rightTable.add(secondPar);   
  131.                 }   
  132.             }   
  133.             logger.info("tb_dim_city:"+leftTable.toString());   
  134.             logger.info("tb_user_profiles:"+rightTable.toString());   
  135.             for(Text leftPart : leftTable){   
  136.                 for(Text rightPart : rightTable){   
  137.                     output.set(leftPart+ "\t" + rightPart);   
  138.                     context.write(key, output);   
  139.                 }   
  140.             }   
  141.         }   
  142.     }   
  143.     @Override
  144.     public int run(String[] args) throws Exception {   
  145.           Configuration conf=getConf(); //獲得配置文件對象 
  146.             Job job=new Job(conf,"LeftOutJoinMR");   
  147.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  148.             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 
  149.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
  150.             job.setMapperClass(LeftOutJoinMapper.class);   
  151.             job.setReducerClass(LeftOutJoinReducer.class);
  152.             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 
  153.             job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格格式
  154.  
  155.             //設置map的輸出key和value類型 
  156.             job.setMapOutputKeyClass(Text.class);   
  157.             job.setMapOutputValueClass(CombineValues.class);
  158.  
  159.             //設置reduce的輸出key和value類型 
  160.             job.setOutputKeyClass(Text.class);   
  161.             job.setOutputValueClass(Text.class);   
  162.             job.waitForCompletion(true);   
  163.             return job.isSuccessful()?0:1;   
  164.     }   
  165.     public static void main(String[] args) throws IOException,   
  166.             ClassNotFoundException, InterruptedException {   
  167.         try {   
  168.             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
  169.             System.exit(returnCode);   
  170.         } catch (Exception e) {   
  171.             // TODO Auto-generated catch block 
  172.             logger.error(e.getMessage());   
  173.         }   
  174.     }   
其中具體的分析以及數據的輸出輸入請看代碼中的註釋已經寫得比較清楚了,這裡主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,我們可以很明顯的看出原:因為整體數據被分割了,每個map task只處理一部分數據而不能夠獲取到所有需要的join欄位,因此我們需要在講join key作為reduce端的分組將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。

相關閱讀:
Hadoop偽分散式搭建操作步驟指南》;
HADOOP的本地庫(NATIVE LIBRARIES)簡介》;
基於Hadoop大數據分析應用場景與項目實戰演練》  
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、MVC簡介 MVC:Model-View-Controller(模型-視圖-控制器),MVC是一種軟體開發架構模式。 1、模型(Model) 模型對象是實現應用程式數據域邏輯的應用程式部件。 通常,模型對象會檢索模型狀態並將其存儲在資料庫中。 例如,Product 對象可能會從資料庫中檢索信息, ...
  • 背水一戰 Windows 10 之 控制項(自定義控制項): 自定義控制項的基礎知識,依賴屬性和附加屬性 ...
  • 1.在之前第36章里,我們學習了通過驅動的oops定位錯誤代碼行 第36章的oops代碼如下所示: 1.1那為什麼在上一章,我們用錯誤的應用程式,卻沒有列印oops,如下圖所示: 接下來,我們便來配置內核,從而列印應用程式的oops 2.首先來搜索oops里的:Unable to handle ke ...
  • 前言 使用VMware安裝虛擬機這個一般都知道,操作簡單。而本文主要講使用虛擬機的後續相關配置。並記錄使用過程中遇到的問題以及一些技巧。本篇文章以後回持續更新的。。。 安裝包准備 VM:12 Linux:CentOS 7.0 百度雲盤: 鏈接:https://pan.baidu.com/s/1geE ...
  • Win10筆記本如何禁用觸摸板呢?Win10筆記本如何設置“插入滑鼠自動禁止觸摸板功能”呢?雖然筆記本觸摸板在一定程度上可以方便我們的 操作,但是在以滑鼠和鍵盤做為重要的輸入設備的情況下,筆記本觸摸板有時由於觸摸、誤按等操作,導致造成一些不必要的後果。對此我們可以通過 以下方法實現筆記本觸摸板的禁用 ...
  • 最近突然想搭一個redis集群玩玩,因為公司的電腦同時開2個虛擬機就卡的不行,所以我就想到用Docker開啟多個redis-server來搭建。然後在網上找著找著發現,使用Docker,哪需要搭建啊,直接Docker pull一個鏡像就OK了。加上之前使用Docker,五分鐘搭建一個類似github ...
  • 會提到:“安裝程式無法與下載伺服器聯繫。請提供 Microsoft 機器學習伺服器安裝文件的位置,然後單擊“下一步”。可從以下位置下載安裝文件” 的解決方案 安裝過程和2016大體一致,機器學習這款更完善了。(其他錯誤看看往期的解決吧:http://www.cnblogs.com/dunitian/ ...
  • SQL Server 的全文搜索(Full-Text Search)是基於分詞的文本檢索功能,依賴於全文索引。全文索引不同於傳統的平衡樹(B-Tree)索引和列存儲索引,它是由數據表構成的,稱作倒轉索引(Invert Index),存儲分詞和行的唯一鍵的映射關係。倒轉索引是在創建全文索引或更新全文索 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...