輔助排序和Mapreduce整體流程

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/03/28/10617937.html
-Advertisement-
Play Games

一、輔助排序 需求:先有一個訂單數據文件,包含了訂單id、商品id、商品價格,要求將訂單id正序,商品價格倒序,且生成結果文件個數為訂單id的數量,每個結果文件中只要一條該訂單最貴商品的數據。 思路:1.封裝訂單類OrderBean,實現WritableComparable介面; 2.自定義Mapp ...


一、輔助排序

  需求:先有一個訂單數據文件,包含了訂單id、商品id、商品價格,要求將訂單id正序,商品價格倒序,且生成結果文件個數為訂單id的數量,每個結果文件中只要一條該訂單最貴商品的數據。

  思路:1.封裝訂單類OrderBean,實現WritableComparable介面;

     2.自定義Mapper類,確定輸入輸出數據類型,寫業務邏輯;

     3.自定義分區,根據不同的訂單id返回不同的分區值;

     4.自定義Reducer類;

     5.輔助排序類OrderGroupingComparator繼承WritableComparator類,並定義無參構成方法、重寫compare方法;

     6.書寫Driver類;

  代碼如下:

/**
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        int rs ;
        if (this.orderId > o.getOrderId()){
            rs = 1;
        }else if (this.orderId < o.getOrderId()){
            rs = -1;
        }else {
            rs = (this.orderPrice > o.getOrderPrice()) ? -1:1;
        }
        return rs;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}

public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取數據
        String line = value.toString();

        //切割數據
        String[] fields = line.split("\t");

        //封裝數據
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);

        //發送數據
        context.write(orderBean,NullWritable.get());
    }
}

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        //構造參數中i的值為reducetask的個數
        return (orderBean.getOrderId() & Integer.MAX_VALUE ) % i;
    }
}

public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

public class OrderGrouptingComparator extends WritableComparator {
    //必須使用super調用父類的構造方法來定義對比的類為OrderBean
    protected OrderGrouptingComparator(){
        super(OrderBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean)a;
        OrderBean bBean = (OrderBean)b;

        int rs ;
        if (aBean.getOrderId() > bBean.getOrderId()){
            rs = 1;
        }else if (aBean.getOrderId() < bBean.getOrderId()){
            rs = -1;
        }else {
            rs = 0;
        }
        return rs;
    }
}

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //配置信息,Job對象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //執行類
        job.setJarByClass(OrderBean.class);

        //設置Mapper、Reducer類
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        //設置Mapper輸出數據類型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //設置Reducer輸出數據類型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        //設置輔助排序
        job.setGroupingComparatorClass(OrderGrouptingComparator.class);

        //設置分區類
        job.setPartitionerClass(OrderPartitioner.class);

        //設置reducetask數量
        job.setNumReduceTasks(3);

        //設置文件輸入輸出流
        FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\order\\out"));

        //提交任務
        if (job.waitForCompletion(true)){
            System.out.println("運行完成!");
        }else {
            System.out.println("運行失敗!");
        }
    }
}

  由於這是敲了很多次的代碼,沒有加太多註釋,請諒解!

 

二、Mapreduce整體的流程

  1.有一塊200M的文本文件,首先將待處理的數據提交客戶端;

  2.客戶端會向Yarn平臺提交切片信息,然後Yarn計算出所需要的maptask的數量為2;

  3.程式預設使用FileInputFormat的TextInputFormat方法將文件數據讀到maptask;

  4.maptask運行業務邏輯,然後將數據通過InputOutputContext寫入到環形緩衝區;

  5.環形緩衝區其實是記憶體開闢的一塊空間,就是記憶體,當環形緩衝區內數據達到預設大小100M的80%時,發生溢寫;

  6.溢寫出的數據會進行多次的分區排序(shuffle機制,下一個隨筆詳細解釋)

  7.分區排序後的數據塊可以選擇進行Combiner合併,然後寫入本地磁碟;

  8.reducetask等maptask完全運行完畢後,開始從磁碟中讀取maptask產出寫出的數據,然後進行合併文件,歸併排序(這時就是進行上面輔助排序的時候);

  9.Reducer一次讀取一組數據,然後使用預設的TextOutputFormat方法將數據寫出到結果文件。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、vstapd配置 vsftpd 服務(a、匿名公開 b、系統本地賬戶驗證c、虛擬專用用戶驗證) iptables -F (清空防火牆) service iptables save (保存防火牆配置) yum install vsftpd systemctl restart vsftpd syst ...
  • FTP服務安裝與埠說明 FTP埠修改安裝部署windowswindows 2012文件服務 1. FTP服務介紹 1.1 什麼是FTP FTP(File Transfer Protocol)是文件傳送協議的英文縮寫,是用於Internet上的控制文件的雙向傳輸的協議。同時,它也是一個應用程式。用 ...
  • 下載redis_exporter插件 代理插件不一定非要安裝在redis端 解壓 啟動redis_exporter登陸redis redis預設埠是6379 查看redis_exporte是否開啟 修改prometheus配置文件 重啟prometheus grafana配置 配置promethe ...
  • Linux 發展史 Linux 是一種完全免費並對全世界開放源碼的操作系統,其火熱程度源於其理念--open、share、free,人們可以自由的安裝,並可以修改和完善軟體的源程式。這一切要歸功於Linux最初的設計者——Linus Torvalds,是他將Linux這個偉大的作品無償的獻給了世界, ...
  • 停止或者重新啟動Apache有兩種發送信號的方法 第一種方法: 直接使用linux的kill命令向運行中的進程發送信號。你也許你會註意到你的系統里運行著很多httpd進程。但你不應該直接對它們中的任何一個發送信號,而只要對已經在PidFile中記載下了自身PID的父進程發送信號。也就是說,你不必對父 ...
  • 一、shuffle機制概述 shuffle機制就是發生在MR程式中,Mapper之後,Reducer之前的一系列分區排序的操作。shuffle的作用是為了保證Reducer收到的數據都是按鍵排序的。 二、shuffle機制的流程 還是按照上個隨筆MR整體流程的需求來做參考: 1.Mapper中con ...
  • Solr的時區為什麼總是UTC? 從資料庫同步數據到Solr, 為什麼時間總是少了8小時? 要怎樣修改Solr的預設時區?這篇文章來做個實踐和探討. ...
  • 下載鏈接: ※ 如果沒有 直接複製url 到瀏覽器下載 1:下載完成 2:解壓 3: 複製 放到use/local目錄下 4:進入 redis 目錄 ( 確認 /url/local/redis/ 目錄下的文件是否存在, INSTALL Makefile redis.conf ) 5: 生成 6: 測 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...