Mapreduce的排序(全局排序、分區加排序、Combiner優化)

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/03/24/10591267.html
-Advertisement-
Play Games

一、MR排序的分類 1.部分排序:MR會根據自己輸出記錄的KV對數據進行排序,保證輸出到每一個文件記憶體都是經過排序的; 2.全局排序; 3.輔助排序:再第一次排序後經過分區再排序一次; 4.二次排序:經過一次排序後又根據業務邏輯再次進行排序。 二、MR排序的介面——WritableComparabl ...


一、MR排序的分類

  1.部分排序:MR會根據自己輸出記錄的KV對數據進行排序,保證輸出到每一個文件記憶體都是經過排序的;

  2.全局排序;

  3.輔助排序:再第一次排序後經過分區再排序一次;

  4.二次排序:經過一次排序後又根據業務邏輯再次進行排序。

 

二、MR排序的介面——WritableComparable

  該介面繼承了Hadoop的Writable介面和Java的Comparable介面,實現該介面要重寫write、readFields、compareTo三個方法。

 

三、流量統計案例的排序與分區

/**
 * @author: PrincessHug
 * @date: 2019/3/24, 15:36
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowSortBean implements WritableComparable<FlowSortBean> {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public FlowSortBean() {
    }

    public FlowSortBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }

    @Override
    public int compareTo(FlowSortBean o) {
        return this.flowSum > o.getFlowSum() ? -1:1;
    }
}

public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取數據
        String line = value.toString();

        //切分數據
        String[] fields = line.split("\t");

        //封裝數據
        long upFlow = Long.parseLong(fields[1]);
        long dwFlow = Long.parseLong(fields[2]);

        //傳輸數據
        context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));
    }
}

public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {
    @Override
    protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(values.iterator().next(),key);
    }
}

public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
    @Override
    public int getPartition(FlowSortBean key, Text value, int i) {
        String phoneNum = value.toString().substring(0, 3);

        int partition = 4;
        if ("135".equals(phoneNum)){
            return 0;
        }else if ("137".equals(phoneNum)){
            return 1;
        }else if ("138".equals(phoneNum)){
            return 2;
        }else if ("139".equals(phoneNum)){
            return 3;
        }
        return partition;
    }
}

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //設置配置,初始化Job類
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //設置執行類
        job.setJarByClass(FlowSortDriver.class);

        //設置Mapper、Reducer類
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        //設置Mapper輸出數據類型
        job.setMapOutputKeyClass(FlowSortBean.class);
        job.setMapOutputValueClass(Text.class);

        //設置Reducer輸出數據類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowSortBean.class);

        //設置自定義分區
        job.setPartitionerClass(FlowSortPartitioner.class);
        job.setNumReduceTasks(5);

        //設置文件輸入輸出類型
        FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));
        FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));

        //提交任務
        if (job.waitForCompletion(true)){
            System.out.println("運行完成!");
        }else {
            System.out.println("運行失敗!");
        }

    }
}

  註意:再寫Mapper類的時候,要註意KV對輸出的數據類型,Key的類型一定要為FlowSortBean,因為在Mapper和Reducer之間進行的排序(只是排序)是通過Mapper輸出的Key來進行排序的,而分區可以指定是通過Key或者Value。

 

四、Combiner合併

  Combiner是在MR之外的一個組件,可以用來在maptask輸出到環形緩衝區溢寫之後,分區排序完成時進行局部的彙總,可以減少網路傳輸量,進而優化MR程式。

  Combiner是用在當數據量到達一定規模之後的,小的數據量並不是很明顯。

  例如WordCount程式,當單詞文件的大小到達一定程度,可以使用自定義Combiner進行優化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
	protected void reduce(Text key,Iterable<IntWritable> values,Context context){
		//計數
		int count = 0;
		
		//累加求和
		for(IntWritable v:values){
			count += v.get();
		}
		//輸出
		context.write(key,new IntWritable(count));
	}
}

  然後再Driver類中設置使用Combiner類

job.setCombinerClass(WordCountCombiner.class);

  如果仔細觀察,WordCount的自定義Combiner類與Reducer類是完全相同的,因為他們的邏輯是相同的,即在maptask之後的分區內先進行一次累加求和,然後到reducer後再進行總的累加求和,所以在設置Combiner時也可以這樣:

job.setCombinerClass(WordCountReducer.class);

 

  註意:Combiner的應用一定要註意不能影響最終業務邏輯的情況下使用,比如在求平均值的時候:

  mapper輸出兩個分區:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合併輸出:  5,4     =>avg=4.5  但是實際應該為4.6,錯誤!

  所以在使用Combiner時要註意其不會影響最中的結果!!!

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 前言 本文只講解實戰應用,不會涉及原理講解。如果想要瞭解iptables的工作流程或原理可參考如下博文。 具體操作是在PC機的VMware虛擬機上進行的,因此涉及的地址都是內網IP。在實際工作中也是一樣的操作流程,只需要把涉及外網的地址改為公網IP即可。 文章參考:iptables nat及端 ...
  • 一、虛擬主機 A、基於IP地址 B、基於功能變數名稱 C、基於埠號 複習yum倉庫掛載 mkdir /media/cdrom mount /dev/cdrom /media/cdrom/ vim /etc/fstab /dev/cdrom /media/cdrom iso9960 defaults 0 0 ...
  • 周期性任務丶find 文件查找:find命令 locate :在資料庫中查找,非實時查找,精確度不高,查找速度快,模糊查找 /tmp/passwad/a.textfind:實時查找:速度慢 ,精確匹配find [option] [查找路徑][查找標準][執行動作](預設為顯示)查找路徑:預設為當前路 ...
  • 在Win7、Win8或者Win10系統中,如果要實現電腦的自動定時關機,不需要借用任何的外部程式,直接系統自帶的任務計劃程式即可實現電腦的定時自動關機,支持設定電腦關機時間以及執行頻率次數,如固定每天都執行電腦自動定時關機操作。 (1)首先在開始菜單處輸入任務計劃程式,然後進入任務計劃程式頁面。 ( ...
  • 主、從功能變數名稱伺服器配置 一、實驗環境 主功能變數名稱伺服器:ns1.topsec.com,192.168.120.119 從功能變數名稱伺服器:ns2.topsec.com,192.168.120.120 二、實驗步驟 1.安裝bind yum install bind -y 2.伺服器主DNS IP配置 3.從DN ...
  • mysql是世界上最流行的關係型資料庫管理系統,由瑞典MySQL AB公司開發,目前屬於Oracle公司所有。今天我將記錄一下如何在Linux centos7系統上安裝和配置MySQL。 目錄 + 環境準備 + 安裝包 + 基本設置 + 語法操作 環境準備 + linux centos7操作系統 + ...
  • 什麼是ZooKeeper的腦裂? 為什麼會發生腦裂現象? ZooKeeper如何解決腦裂? 本篇文章告訴你答案~ ...
  • MySQL 初始化 mysqld --initialize 的時候會有密碼,就這個樣子, 可是畢竟總有人跟我一樣,不熟悉安裝過程,沒有註意這一密碼這一項,導致你現在不知道密碼的尷尬處境,或者說你是正常用了某一天腦子短路忘了密碼, 那麼網上查到很多方法會告訴你,往 ini 里配置 skip-grant ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...