大數據-Hadoop生態(18)-MapReduce框架原理-WritableComparable排序和GroupingComparator分組

来源:https://www.cnblogs.com/duoduotouhenying/archive/2018/12/12/10110459.html
-Advertisement-
Play Games

1.排序概述 2.排序分類 3.WritableComparable案例 這個文件,是大數據-Hadoop生態(12)-Hadoop序列化和源碼追蹤的輸出文件,可以看到,文件根據key,也就是手機號進行了字典排序 欄位含義分別為手機號,上行流量,下行流量,總流量 需求是根據總流量進行排序 Bean對 ...


1.排序概述

2.排序分類

 

3.WritableComparable案例

這個文件,是大數據-Hadoop生態(12)-Hadoop序列化和源碼追蹤的輸出文件,可以看到,文件根據key,也就是手機號進行了字典排序

13470253144    180    180    360
13509468723    7335    110349    117684
13560439638    918    4938    5856
13568436656    3597    25635    29232
13590439668    1116    954    2070
13630577991    6960    690    7650
13682846555    1938    2910    4848
13729199489    240    0    240
13736230513    2481    24681    27162
13768778790    120    120    240
13846544121    264    0    264
13956435636    132    1512    1644
13966251146    240    0    240
13975057813    11058    48243    59301
13992314666    3008    3720    6728
15043685818    3659    3538    7197
15910133277    3156    2936    6092
15959002129    1938    180    2118
18271575951    1527    2106    3633
18390173782    9531    2412    11943
84188413    4116    1432    5548

欄位含義分別為手機號,上行流量,下行流量,總流量

需求是根據總流量進行排序

 

Bean對象,需要實現序列化,反序列化和Comparable介面

package com.nty.writableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:33
 */

/**
 * 實現WritableComparable介面
 * 原先將bean序列化時,需要實現Writable介面,現在再實現Comparable介面
 * 
 * public interface WritableComparable<T> extends Writable, Comparable<T>
 * 
 * 所以我們可以實現Writable和Comparable兩個介面,也可以實現WritableComparable介面
 */
public class Flow implements WritableComparable<Flow> {

  private long upflow;
  private long downflow;
  private long total;

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getTotal() {
        return total;
    }

    public void setTotal(long total) {
        this.total = total;
    }

    //快速賦值
    public void setFlow(long upflow, long downflow){
        this.upflow = upflow;
        this.downflow = downflow;
        this.total = upflow + downflow;
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + total;
    }

    //重寫compareTo方法
    @Override
    public int compareTo(Flow o) {
        return Long.compare(o.total, this.total);
    }

    //序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(total);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        total = in.readLong();
    }
}

Mapper類

package com.nty.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowMapper extends Mapper<LongWritable, Text, Flow, Text> {

    private Text phone = new Text();

    private Flow flow = new Flow();


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //13470253144    180    180    360
        //分割行數據
        String[] flieds = value.toString().split("\t");

        //賦值
        phone.set(flieds[0]);

        flow.setFlow(Long.parseLong(flieds[1]), Long.parseLong(flieds[2]));

        //寫出
        context.write(flow, phone);
    }
}

Reducer類

package com.nty.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
//註意一下輸出類型
public class FlowReducer extends Reducer<Flow, Text, Text, Flow> {

    @Override
    protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            //輸出
            context.write(value,key);
        }
    }
}

Driver類

package com.nty.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowDriver {

    public static void main(String[] args) throws  Exception {
        //1. 獲取Job實例
        Configuration configuration = new Configuration();
        Job instance = Job.getInstance(configuration);

        //2. 設置類路徑
        instance.setJarByClass(FlowDriver.class);


        //3. 設置Mapper和Reducer
        instance.setMapperClass(FlowMapper.class);
        instance.setReducerClass(FlowReducer.class);

        //4. 設置輸出類型
        instance.setMapOutputKeyClass(Flow.class);
        instance.setMapOutputValueClass(Text.class);

        instance.setOutputKeyClass(Text.class);
        instance.setOutputValueClass(Flow.class);

        //5. 設置輸入輸出路徑
        FileInputFormat.setInputPaths(instance, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(instance, new Path("d:\\Hadoop_test_out"));

        //6. 提交
        boolean b = instance.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

 

結果

 

 4.GroupingComparator案例

     訂單id           商品id          商品金額        

0000001    Pdt_01    222.8
0000002    Pdt_05    722.4
0000001    Pdt_02    33.8
0000003    Pdt_06    232.8
0000003    Pdt_02    33.8
0000002    Pdt_03    522.8
0000002    Pdt_04    122.4

求出每一個訂單中最貴的商品

需求分析:

1) 將訂單id和商品金額作為key,在Map階段先用訂單id升序排序,如果訂單id相同,再用商品金額降序排序

2) 在Reduce階段,用groupingComparator按照訂單分組,每一組的第一個即是最貴的商品

 

先定義bean對象,重寫序列化反序列話排序方法

package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class Order implements WritableComparable<Order> {

    private String orderId;

    private String productId;

    private double price;

    public String getOrderId() {
        return orderId;
    }

    public Order setOrderId(String orderId) {
        this.orderId = orderId;
        return this;
    }

    public String getProductId() {
        return productId;
    }

    public Order setProductId(String productId) {
        this.productId = productId;
        return this;
    }

    public double getPrice() {
        return price;
    }

    public Order setPrice(double price) {
        this.price = price;
        return this;
    }

    @Override
    public String toString() {
        return orderId + "\t" + productId + "\t" + price;
    }


    @Override
    public int compareTo(Order o) {
        //先按照訂單排序,正序
        int compare = this.orderId.compareTo(o.getOrderId());
        if(0 == compare){
            //訂單相同,再比較價格,倒序
            return Double.compare( o.getPrice(),this.price);
        }
        return compare;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}

Mapper類

package com.nty.groupingComparator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {

    private Order order = new Order();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //0000001    Pdt_01    222.8
        //分割行數據
        String[] fields = value.toString().split("\t");

        //為order賦值
        order.setOrderId(fields[0]).setProductId(fields[1]).setPrice(Double.parseDouble(fields[2]));

        //寫出
        context.write(order,NullWritable.get());
    }
}

GroupingComparator類

package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * author nty
 * date time 2018-12-12 18:08
 */
public class OrderGroupingComparator extends WritableComparator {

    //用作比較的對象的具體類型
    public OrderGroupingComparator() {
        super(Order.class,true);
    }

    //重寫的方法要選對哦,一共有三個,選擇參數為WritableComparable的方法
    //預設的compare方法調用的是a,b對象的compare方法,但是現在我們排序和分組的規則不一致,所以要重寫分組規則
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order oa = (Order) a;
        Order ob = (Order) b;
        //按照訂單id分組
        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}

Reducer類

package com.nty.groupingComparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderReducer extends Reducer<Order, NullWritable,Order, NullWritable> {

    @Override
    protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //每一組的第一個即是最高價商品,不需要遍歷
        context.write(key, NullWritable.get());
    }
}

Driver類

package com.nty.groupingComparator;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1獲取實例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2設置類路徑
        job.setJarByClass(OrderDriver.class);

        //3.設置Mapper和Reducer
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        //4.設置自定義分組類
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        //5. 設置輸出類型
        job.setMapOutputKeyClass(Order.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Order.class);
        job.setOutputValueClass(NullWritable.class);

        //6. 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        //7. 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

輸出結果

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • php7連接mysql測試代碼 ...
  • 簡介 從命令的名字上來看,會讓人誤以為這是一個和安裝相關的命令。 其實不然,install命令用於複製文件(cp)或創建空目錄(mkdir)並設置相關的屬性(chown、chmod)。 這裡的屬性包含了ownership、許可權以及時間戳(保留時間戳,而不是修改)。 語法格式 單源複製。一般省略掉-T ...
  • 在上章學習33.Linux-實現U盤自動掛載(詳解)後,只是講解了普通U盤掛載,並沒有涉及到多分區U盤,接下來本章來繼續學習 1.多分區U盤和普通U盤區別 1)U盤插上只會創建一個/dev/sda文件,這種一般表示該U盤沒有分區,這個sda文件便代表該U盤總大小,我們只需要掛載/dev/sda即可 ...
  • ubuntu16.4系統查看自啟服務: 需要自行安裝一個sysv-rc-conf的工具來查看: 查看自啟命令: Gentos6.8系統查看自啟服務:(註意這個只是查看用RPM包安裝的預設的獨立服務) 查看基於Xinetd的服務: 需要安裝Xinetd: 查詢系統中開啟的服務: ...
  • SQL Server系統表sysobjects 介紹 sysobjects 表結構: 列名 數據類型 描述 name sysname 對象名,常用列 id int 對象標識號 xtype char(2) 對象類型。常用列。xtype可以是下列對象類型中的一種: C = CHECK 約束 D = 預設 ...
  • 一. 概述 上一篇我們介紹瞭如何將數據從 mysql 拋到 kafka,這次我們就專註於利用 storm 將數據寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 數據充當數據源,下章再進行整合。這裡預設你是擁有 ...
  • 刪除數據分為兩種:一種是刪除索引(數據和表結構同時刪除,作用同SQLSERVER 中 DROP TABLE "表格名" ),另一種是刪除數據(不刪除表結構,作用同SQLSERVER中Delete 語句)。 一:刪除索引: 刪除單個索引可以使用命令 【DELETE /索引名稱】 刪除多個索引可以使用命 ...
  • mysql是使用最廣泛的開源關係資料庫之一,大多數開發人員只會簡單的用sql語句操作數據,而不理解服務端架構和sql查詢語句在服務端的執行過程,本文會從sql語句執行路徑來介紹mysql服務端架構,包括連接器,查詢緩存,分析器,優化器等。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...