Mapreduce的序列化和流量統計程式開發

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/03/24/10590705.html
-Advertisement-
Play Games

一、Hadoop數據序列化的數據類型 Java數據類型 => Hadoop數據類型 int IntWritable float FloatWritable long LongWritable double DoubleWritable String Text boolean BooleanWrita ...


一、Hadoop數據序列化的數據類型

  Java數據類型 => Hadoop數據類型

  int         IntWritable

  float        FloatWritable

  long        LongWritable

  double         DoubleWritable

  String       Text

  boolean      BooleanWritable

  byte        ByteWritable

  map          MapWritable

  array        ArrayWritable

二、Hadoop的序列化

  1.什麼是序列化?

   在java中,序列化介面是Serializable,它下麵又實現了很多的序列化介面,所以java的序列化是一個重量級的序列化框架,一個對象被java序列化之後會附帶很多額外的信息(校驗信息、header、繼承體系等),不便於在網路中進行高效的傳輸,所以Hadoop開發了一套自己的序列化框架——Writable。

      序列化就是把記憶體當中的對象,轉化為位元組序列以便於存儲和網路傳輸;

   反序列化是將收到的位元組序列或硬碟當中的持續化數據,轉換成記憶體中的對象。

  2.序列化的理解方法(自己悟的,不對勿噴~~)

    比如下麵流量統計案例中,流量的封裝類FlowBean實現了Writable介面,其中定義了變數upFlow、dwFlow、flowSum;

    在Mapper和Reducer類中初始化封裝類FlowBean時,記憶體會分配空間載入這些對象,而這些對象不便於在網路中高效的傳輸,這是封裝類FlowBean中的序列化方法將這些對象轉換為位元組序列,方便了存儲和傳輸;

    當Mapper或Reducer需要將這些對象的位元組序列寫出到磁碟時,封裝類FlowBean中的反序列化方法將位元組序列轉換為對象,然後寫道磁碟中。

  3.序列化特點

   序列化與反序列化時分散式數據處理當中經常會出現的,比如hadoop通信是通過遠程調用(rpc)實現的,這個過程就需要序列化。

  特點:1)緊湊;

     2)快速

     3)可擴展

     4)可互操作

三、Mapreduce的流量統計程式案例

  1.代碼

/**
 * @author: PrincessHug
 * @date: 2019/3/23, 23:38
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    public FlowBean() {
    }

    public FlowBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    /**
     * 序列化
     * @param out 輸出流
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /**
     * 反序列化
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }
}

public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取數據
        String line = value.toString();

        //切分數據
        String[] fields = line.split("\t");

        //封裝數據
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);

        //發送數據
        context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow));
    }
}

public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        //聚合數據
        long upFlow_sum = 0;
        long dwFlow_sum = 0;
        for (FlowBean f:values){
            upFlow_sum += f.getUpFlow();
            dwFlow_sum += f.getDwFlow();
        }
        //發送數據
        context.write(key,new FlowBean(upFlow_sum,dwFlow_sum));
    }
}


public class FlowPartitioner extends Partitioner<Text,FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int i) {
        //獲取用來分區的電話號碼前三位
        String phoneNum = key.toString().substring(0, 3);
        //設置分區邏輯
        int partitionNum = 4;
        if ("135".equals(phoneNum)){
            return 0;
        }else if ("137".equals(phoneNum)){
            return 1;
        }else if ("138".equals(phoneNum)){
            return 2;
        }else if ("139".equals(phoneNum)){
            return 3;
        }
        return partitionNum;
    }
}
public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //獲取配置,定義工具
        Configuration conf = new Configuration();
        Job job = Job.getInstance();

        //設置運行類
        job.setJarByClass(FlowCountDriver.class);

        //設置Mapper類及Mapper輸出數據類型
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //設置Reducer類及其輸出數據類型
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //設置自定義分區
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(5);

        //設置文件輸入輸出流
        FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\in"));
        FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\inpartitionout"));

        //返回運行完成
        if (job.waitForCompletion(true)){
            System.out.println("運行完畢!");
        }else {
            System.out.println("運行出錯!");
        }
    }
}

 

 

 

  


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、虛擬主機 A、基於IP地址 B、基於功能變數名稱 C、基於埠號 複習yum倉庫掛載 mkdir /media/cdrom mount /dev/cdrom /media/cdrom/ vim /etc/fstab /dev/cdrom /media/cdrom iso9960 defaults 0 0 ...
  • 周期性任務丶find 文件查找:find命令 locate :在資料庫中查找,非實時查找,精確度不高,查找速度快,模糊查找 /tmp/passwad/a.textfind:實時查找:速度慢 ,精確匹配find [option] [查找路徑][查找標準][執行動作](預設為顯示)查找路徑:預設為當前路 ...
  • 在Win7、Win8或者Win10系統中,如果要實現電腦的自動定時關機,不需要借用任何的外部程式,直接系統自帶的任務計劃程式即可實現電腦的定時自動關機,支持設定電腦關機時間以及執行頻率次數,如固定每天都執行電腦自動定時關機操作。 (1)首先在開始菜單處輸入任務計劃程式,然後進入任務計劃程式頁面。 ( ...
  • 主、從功能變數名稱伺服器配置 一、實驗環境 主功能變數名稱伺服器:ns1.topsec.com,192.168.120.119 從功能變數名稱伺服器:ns2.topsec.com,192.168.120.120 二、實驗步驟 1.安裝bind yum install bind -y 2.伺服器主DNS IP配置 3.從DN ...
  • mysql是世界上最流行的關係型資料庫管理系統,由瑞典MySQL AB公司開發,目前屬於Oracle公司所有。今天我將記錄一下如何在Linux centos7系統上安裝和配置MySQL。 目錄 + 環境準備 + 安裝包 + 基本設置 + 語法操作 環境準備 + linux centos7操作系統 + ...
  • 什麼是ZooKeeper的腦裂? 為什麼會發生腦裂現象? ZooKeeper如何解決腦裂? 本篇文章告訴你答案~ ...
  • MySQL 初始化 mysqld --initialize 的時候會有密碼,就這個樣子, 可是畢竟總有人跟我一樣,不熟悉安裝過程,沒有註意這一密碼這一項,導致你現在不知道密碼的尷尬處境,或者說你是正常用了某一天腦子短路忘了密碼, 那麼網上查到很多方法會告訴你,往 ini 里配置 skip-grant ...
  • 一、MR排序的分類 1.部分排序:MR會根據自己輸出記錄的KV對數據進行排序,保證輸出到每一個文件記憶體都是經過排序的; 2.全局排序; 3.輔助排序:再第一次排序後經過分區再排序一次; 4.二次排序:經過一次排序後又根據業務邏輯再次進行排序。 二、MR排序的介面——WritableComparabl ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...