Mapjoin和Reducejoin案例

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/04/08/10668306.html
-Advertisement-
Play Games

一、Mapjoin案例 1.需求:有兩個文件,分別是訂單表、商品表, 訂單表有三個屬性分別為訂單時間、商品id、訂單id(表示內容量大的表), 商品表有兩個屬性分別為商品id、商品名稱(表示內容量小的表,用於載入到記憶體), 要求結果文件為在訂單表中的每一行最後添加商品id對應的商品名稱。 2.解決思 ...


一、Mapjoin案例

  1.需求:有兩個文件,分別是訂單表、商品表,

  訂單表有三個屬性分別為訂單時間、商品id、訂單id(表示內容量大的表),

  商品表有兩個屬性分別為商品id、商品名稱(表示內容量小的表,用於載入到記憶體),

  要求結果文件為在訂單表中的每一行最後添加商品id對應的商品名稱。

  2.解決思路:

  將商品表載入到記憶體中,然後再map方法中將訂單表中的商品id對應的商品名稱添加到該行的最後,不需要Reducer,併在Driver執行類中設置setCacheFile和numReduceTask。

  3.代碼如下:

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	HashMap<String, String> pdMap = new HashMap<>();
	//1.商品表載入到記憶體
	protected void setup(Context context) throws IOException {
		
		//載入緩存文件
		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8"));
		
		String line;
		
		while(StringUtils.isNotEmpty(line = br.readLine()) ) {
			
			//切分
			String[] fields = line.split("\t");
			
			//緩存
			pdMap.put(fields[0], fields[1]);
			
		}
		
		br.close();
	
	}
		
		
		
	//2.map傳輸
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//獲取數據
		String line = value.toString();
		
		//切割
		String[] fields = line.split("\t");
		
		//獲取訂單中商品id
		String pid = fields[1];
		
		//根據訂單商品id獲取商品名
		String pName = pdMap.get(pid);
		
		//拼接數據
		line = line + "\t" + pName;
		
		//輸出
		context.write(new Text(line), NullWritable.get());
	}
}

public class CacheDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		// 1.獲取job信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2.獲取jar包
		job.setJarByClass(CacheDriver.class);

		// 3.獲取自定義的mapper與reducer類
		job.setMapperClass(CacheMapper.class);

		// 5.設置reduce輸出的數據類型(最終的數據類型)
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 6.設置輸入存在的路徑與處理後的結果路徑
		FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
		FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));
		
		//載入緩存商品數據
		job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));
		
		//設置一下reducetask的數量
		job.setNumReduceTasks(0);

		// 7.提交任務
		boolean rs = job.waitForCompletion(true);
		System.out.println(rs ? 0 : 1);
	}
}

  

二、Reducejoin案例

  1.需求:同上的兩個數據文件,要求將訂單表中的商品id替換成對應的商品名稱。

  2.解決思路:封裝TableBean類,包含屬性:時間、商品id、訂單id、商品名稱、flag(flag用來判斷是哪張表),

    使用Mapper讀兩張表,通過context對象獲取切片對象,然後通過切片獲取切片名稱和路徑的字元串來判斷是哪張表,再將切片的數據封裝到TableBean對象,最後以產品id為key、TableBean對象為value傳輸到Reducer端;

    Reducer接收數據後通過flag判斷是哪張表,因為一個reduce中的所有數據的key是相同的,將商品表的商品id和商品名稱讀入到一個TableBean對象中,然後將訂單表的中的數據讀入到TableBean類型的ArrayList對象中,然後將ArrayList中的每個TableBean的商品id替換為商品名稱,然後遍歷該數組以TableBean為key輸出。

  3.代碼如下:

/**
 * @author: PrincessHug
 * @date: 2019/3/30, 2:37
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TableBean implements Writable {
    private String timeStamp;
    private String productId;
    private String orderId;
    private String productName;
    private String flag;

    public TableBean() {
    }

    public String getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(timeStamp);
        out.writeUTF(productId);
        out.writeUTF(orderId);
        out.writeUTF(productName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timeStamp = in.readUTF();
        productId = in.readUTF();
        orderId = in.readUTF();
        productName = in.readUTF();
        flag = in.readUTF();
    }

    @Override
    public String toString() {
        return timeStamp + "\t" + productName + "\t" + orderId;
    }
}


public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //通過切片獲取文件信息
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        //獲取一行數據、定義TableBean對象
        String line = value.toString();
        TableBean tb = new TableBean();
        Text t = new Text();

        //判斷是哪一張表
        if (name.contains("order.txt")){
            String[] fields = line.split("\t");
            tb.setTimeStamp(fields[0]);
            tb.setProductId(fields[1]);
            tb.setOrderId(fields[2]);
            tb.setProductName("");
            tb.setFlag("0");
            t.set(fields[1]);
        }else {
            String[] fields = line.split("\t");
            tb.setTimeStamp("");
            tb.setProductId(fields[0]);
            tb.setOrderId("");
            tb.setProductName(fields[1]);
            tb.setFlag("1");
            t.set(fields[0]);
        }
        context.write(t,tb);
    }
}

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        //分別創建用來存儲訂單表和產品表的集合
        ArrayList<TableBean> orderBean = new ArrayList<>();
        TableBean productBean = new TableBean();

        //遍歷values,通過flag判斷是產品表還是訂單表
        for (TableBean v:values){
            if (v.getFlag().equals("0")){
                TableBean tableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tableBean,v);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBean.add(tableBean);
            }else {
                try {
                    BeanUtils.copyProperties(productBean,v);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //拼接表
        for (TableBean ob:orderBean) {
            ob.setProductName(productBean.getProductName());
            context.write(ob,NullWritable.get());
        }
    }
}

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //jar包
        job.setJarByClass(TableDriver.class);

        //Mapper、Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        //Mapper輸出數據類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        //Reducer輸出數據類型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        //輸入輸出路徑
        FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\reducejoin\\in"));
        FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\reducejoin\\out"));

        //提交任務
        if (job.waitForCompletion(true)){
            System.out.println("運行完成!");
        }else {
            System.out.println("運行失敗!");
        }
    }
}

  

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這個pxe-e55" 錯誤表示 pxe 客戶端已向埠4011上的代理 dhcp 伺服器發送請求, 但未收到答覆。似乎只有在dhcp 伺服器上設置了 dhcp 類標識符選項 #60, 但同一臺電腦上沒有在埠4011上運行的代理 dhcp 服務時,才會出現此問題。 如果在不同的電腦上操作 dhc ...
  • DNS分離解析技術 yum install bind-chroot systemctl restart named systemctl enable named vim /etc/named.conf vim /etc/named.rfc1912.zones 配置網卡 cd /var/named/ ...
  • Linux終端 進入編輯IP地址命令:vi /etc/sysconfig/network-scripts/ifcfg-eth0 按鍵“i”:進行編輯 按鍵“ESC”:退出編輯 按鍵“:”:輸入wq,退出 重啟網路服務命令:/etc/init.d/network restart 虛擬終端 minget ...
  • 一、NFS(網路文件系統,實現linux系統上文件共用) 伺服器配置 yum install nfs-utils (安裝NFS軟體包) iptables -F (清空防火牆) service iptables save (保存防火牆配置) mkdir /nfsfile (創建共用文件夾) chmod ...
  • 簡單的ss搭建腳本 #!/bin/bash #if [ "$(yum search all python-setuptools | grep python-setuptools| wc -l)" -lt "2" ]; then #if [ "${1}" == "log" ];thenlogger - ...
  • Ansible是一款優秀的自動化IT運維工具,具有遠程安裝、遠程部署應用、遠程管理能力,支持Windows、Linux、Unix、macOS和大型機等多種操作系統。 這篇隨筆以CentOS 7.6為主機操作系統,演示Ansible工具的安裝過程,將一個本地應用例子安裝到遠程主機,併在遠程主機上運行應... ...
  • 我們在ifconfig 查看網卡配置時或者嵌入式開發的時候,經常會看到rx/tx縮寫,其含義如下: RX==receive,接收,從開啟到現在接收封包的情況,是下行流量。 TX==Transmit,發送,從開啟到現在發送封包的情況,是上行流量。 保持更新,轉載請註明出處。 ...
  • 有些工具,值得學習學習: 網路 iftop IO iotop 系統 top htop iftop iotop top htop 保持更新,轉載請註明出處。 https://www.cnblogs.com/xuyaowen/p/linux-performance-tools.html ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...