03初識MapReduce

来源:https://www.cnblogs.com/touch-fish/archive/2023/01/25/17067245.html
-Advertisement-
Play Games

一、std::string 的底層實現 1、深拷貝 1 class String{ 2 public: 3 String(const String &rhs):m_pstr(new char[strlen(rhs) + 1]()){ 4 } 5 private: 6 char* m_pstr; 7 ...


初識MapReduce

一、什麼是MapReduce

MapReduce是一種編程範式,它藉助Map將一個大任務分解成多個小任務,再藉助Reduce歸併Map的結果。MapReduce雖然原理很簡單,但是使用MapReduce設計出一個解決問題的應用卻不是一件簡單的事情。下麵通過一個簡單的小例子來介紹MapReduce。

二、使用MapReduce尋找銷售人員業績最大值

《Hadoop權威指南》的例子是尋找天氣最大值,需要去下載數據。但是我們並不需要完全復刻他的場景,所以這裡用了另外一個例子。假設有一批銷售日誌數據文件,它的一部分是這樣的。

66$2021-01-01$5555
67$2021-01-01$5635

每一行代表某一位銷售人員某個日期的銷售數量,具體格式為

銷售用戶id$統計日期$銷售數量

我們需要尋找每一個銷售用戶的銷售最大值是多少。需要說明的是,這裡僅僅是舉一個很簡單的示例,便於學習MapReduce。

1、數據解析器

我首先寫了一個解析器來識別每一行的文本,它的作用是將每一行文本轉換為數據實體,數據實體這裡偷了個懶,欄位全部設置成了public。代碼片段如下:

/**
 * 銷售數據解釋器
 * 銷售數據格式為
 * userId$countDate(yyyy-MM-dd)$saleCount
 */
public class SaleDataParse implements TextParse<SaleDataEntity> {

    @Override
    public SaleDataEntity parse(String text) {
        if (text == null) {
            return null;
        }
        text = text.trim();
        if (text.isEmpty()) {
            return null;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String[] split = text.split("\\$");
        SaleDataEntity data = new SaleDataEntity();
        data.userId = Long.valueOf(split[0]);
        data.countDate = sdf.parse(split[1], new ParsePosition(0));
        data.saleCount = Integer.valueOf(split[2]);
        return data;
    }
}

/**
 * 銷售數據實體
 */
public class SaleDataEntity {
    /**
     * 銷售用戶id
     */
    public Long userId;
    /**
     * 銷售日期
     */
    public Date countDate;
    /**
     * 銷售總數
     */
    public Integer saleCount;
}

2、Map函數

Mapper是一個泛型類,它需要4個泛型參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Mapper<輸入鍵, 輸入值, 輸出鍵, 輸出值>

其中輸入鍵和輸入值的格式是由InputFormatClass決定的,關於輸入格式的討論之後會展開討論。MapReduce預設會把文件按行拆分,然後偏移量(輸入鍵)->行文本(輸入值)的映射傳遞給Mapper的map方法。輸出鍵和輸出值則由用戶進行指定。

這裡由於是找每一個用戶的最大銷售數量,Mapper的功能是接收並解析每行數據。所以輸出鍵我設成了銷售人員id->銷售數量的映射。所以實際的Mapper實現看起來像這樣:

/**
 * 解析輸入的文本數據
 */
public class MaxSaleMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    protected TextParse<SaleDataEntity> saleDataParse = new SaleDataParse();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        SaleDataEntity data = saleDataParse.parse(s);
        if (data != null) {
            //寫入輸出給Reducer
            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));
        }
    }
}

其中LongWritable相當於java里的long,Text相當於java里的String,IntWritable相當於java里的int。

這裡你可能會想到,既然已經解析成了數據實體,為什麼不直接把實體設置成輸出值?因為map函數和reduce函數不一定運行在同一個進程里,所以會涉及到序列化和反序列化,這裡先不展開。

3、Reduce函數

Reducer也是一個泛型類,它也需要4個參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Reducer<輸入鍵, 輸入值, 輸出鍵, 輸出值>

與Mapper不同的是,輸入鍵和輸入值來源於Mapper的輸出,也就是Mapper實現中的context.write()。

輸出鍵和輸出值也是由用戶指定,預設的輸出會寫到文件中,關於Reducer的輸出以後會討論。

Reducer的功能是尋找每個用戶的最大值,所以Reducer的實現看起來像這樣:

/**
 * 查找每一個用戶的最大銷售值
 */
public class MaxSaleReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

你可能會奇怪,為什麼reduce方法的第二個參數是一個迭代器。簡單來說,Mapper會把映射的值進行歸併,然後再傳遞給Reducer。

4、驅動程式

我們已經完成了map和reduce函數的實現,現在我們需要把它們組裝起來。我們需要寫一個Main類,它看起來像這樣

public class MaxSale {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MaxSale.class);
        job.setJobName("MaxSale");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxSaleMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MaxSaleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置Reduce任務數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

這裡解釋一下

  • 首先我們創建了一個Job
  • 然後設置輸入目錄和輸出目錄,它們分別是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath
  • 使用setMapperClass設置了map函數,setMapOutputKeyClass設置了map函數的輸入鍵類型,setMapOutputValueClass設置了輸出鍵類型
  • 使用setReducerClass設置了reduce函數,setOutputKeyClass設置了輸出鍵類型,setOutputValueClass設置了輸出值類型
  • 然後使用setNumReduceTasks設置reduce任務個數為1,每個reduce任務都會輸出一個文件,這裡是為了方便查看
  • 最後job.waitForCompletion(true)啟動並等待任務結束

5、運行結果

使用maven package打包,會生成一個jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依賴,需要設置一下HADOOP_CLASSPATH,然後把依賴放到HADOOP_CLASSPATH目錄中。

最後輸入啟動命令,格式為:hadoop jar 生成的jar.jar 輸入數據目錄 輸出數據目錄。這裡給出我使用的命令示例:

Windows:
set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output

然後你會看到程式有如下輸出,這裡截取的部分:

23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
        File System Counters
                FILE: Number of bytes read=33569210
                FILE: Number of bytes written=21132276
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1473110
                Map output records=1473110
                Map output bytes=17677320
                Map output materialized bytes=20623546
                Input split bytes=122
                Combine input records=0
                Spilled Records=1473110
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=36
                Total committed heap usage (bytes)=268435456
        File Input Format Counters
                Bytes Read=33558528
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

等待程式執行結束,output文件夾會有輸出part-r-00000,文件里每一行是每一個用戶的id和他銷售最大值。

0	9994
1	9975
2	9987
3	9985
4	9978
5	9998

三、MapReduce執行流程

簡單分析一下這個示常式度的執行流程:

  1. 首先輸入文件被按行切分,輸入到各個maper
  2. maper的輸出按輸出鍵進行分類,經過shuffle操作後輸入到reducer
  3. reducer收到maper的輸出後,執行尋找最大值操作,然後輸出
  4. 輸出會被預設的輸出格式格式化後輸出到文件part-r-00000

四、示例代碼說明

本文所有的代碼放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop

下麵是項目目錄說明:

  • maxSaleMapReduce模塊是Map函數和Reduce的實現,這個模塊依賴common模塊。所以運行的時候需要把common模塊生成的jar添加到HADOOP_CLASSPATH中
  • common模塊是公共模塊,裡面有一個SaleDataGenerator的數據生成器,可以生成本次示例代碼使用的生成數據

最後需要說明的是,項目代碼主要用於學習,代碼風格並非代表本人實際風格,不完善之處請輕噴。

五、常見問題

  1. java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: xxxxxxxxxxxx\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

    這個是因為沒有下載winutils.exe和hadoop.dll,具體可以參考《安裝一個最小化的Hadoop》中windows額外說明

  2. 運行出現異常java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)

    ​ ......

    這個和問題1類似,Hadoop在Windows需要winutils.exe和hadoop.dll訪問文件,這兩個文件通過org.apache.hadoop.util.Shell#getQualifiedBinPath這個方法獲取,而這個方法又依賴Hadoop的安裝目錄。

    設置HADOOP_HOME環境變數,或者傳入系統參數hadoop.home.dir為Hadoop程式目錄,具體參見《安裝一個最小化的Hadoop》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2023-01-24 一、NoSQL資料庫 1、NoSQL資料庫的簡介 NoSQL(NoSQL=Not Only SQL),即“不僅僅是SQL”,泛指非關係型的資料庫。NosQL不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了資料庫的擴展能力。 (1)不遵循SQL標準 ...
  • 1、首先是adb版本需要新的,老的adb不支持無線連接,如:“adb pair 192.168.3.x:xxxxx”不支持pair這個參數,這個參數表示驗證配對碼!!!新的adb才支持。 無法識別pair參數 2、支持無線調試的手機需要和用於調試的電腦在同一個區域網下。 3、通過adb連接: 【1】 ...
  • JavaScript 中有兩種類型轉換:隱式類型轉換和顯式類型轉換。 隱式類型轉換指 JavaScript 在運行時自動將一種類型轉換為另一種類型。例如,在數學運算中,JavaScript 會將字元串轉換為數字。 顯式類型轉換指在代碼中使用內置函數或全局對象將一種類型顯式地轉換為另一種類型。例如,使 ...
  • JavaScript 中有多種方法可以實現數組去重,下麵是幾種常用的方法: 1、使用 Set 去重:Set 數據結構中不能有重覆元素,可以將數組轉成 Set 類型,再轉回數組。 let arr = [1,2,3,4,5,6,2,3,4]; let uniqueArr = [...new Set(ar ...
  • 前言 為何要學習代碼?為何要學習這個代碼?怎麼學習這個代碼?可不可以學習這個代碼? 能做什麼,有什麼目標? ​ 基本的手機應用、簡單的PC游戲應用、大數據平臺;目標是以愛好為基準做一些游戲和程式; 與Python優缺幾何? ​ 入門難度有區別;一個更面向程式員群體一個則面向費程式員群體;各有優缺; ...
  • 本文作者通過分析微服務的常見優點能解決的問題,提出如何使用單體應用來緩解這些問題,最終指出採用微服務還是單體架構要根據團隊實際情況,而不是為了微服務而微服務。作者最後給出建議,中小團隊和新型團隊,建議採用單體架構,大中型團隊,可以採用微服務架構,但要充分權衡。 在 Web 軟體架構方面,微服務... ...
  • 這篇文章主要關註健康檢測相關的內容,包括服務提供方可能得狀態以及撞見間轉化、什麼是可用率、檢測程式的部署方式等。 ...
  • 題目來源 343. 整數拆分 題目詳情 給定一個正整數 n ,將其拆分為 k 個 正整數 的和( k >= 2 ),並使這些整數的乘積最大化。 返回 你可以獲得的最大乘積 。 示例 1: 輸入: n = 2 輸出: 1 解釋: 2 = 1 + 1, 1 × 1 = 1。 示例 2: 輸入: n = ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...