03初識MapReduce

来源:https://www.cnblogs.com/touch-fish/archive/2023/01/25/17067245.html
-Advertisement-
Play Games

一、std::string 的底層實現 1、深拷貝 1 class String{ 2 public: 3 String(const String &rhs):m_pstr(new char[strlen(rhs) + 1]()){ 4 } 5 private: 6 char* m_pstr; 7 ...


初識MapReduce

一、什麼是MapReduce

MapReduce是一種編程範式,它藉助Map將一個大任務分解成多個小任務,再藉助Reduce歸併Map的結果。MapReduce雖然原理很簡單,但是使用MapReduce設計出一個解決問題的應用卻不是一件簡單的事情。下麵通過一個簡單的小例子來介紹MapReduce。

二、使用MapReduce尋找銷售人員業績最大值

《Hadoop權威指南》的例子是尋找天氣最大值,需要去下載數據。但是我們並不需要完全復刻他的場景,所以這裡用了另外一個例子。假設有一批銷售日誌數據文件,它的一部分是這樣的。

66$2021-01-01$5555
67$2021-01-01$5635

每一行代表某一位銷售人員某個日期的銷售數量,具體格式為

銷售用戶id$統計日期$銷售數量

我們需要尋找每一個銷售用戶的銷售最大值是多少。需要說明的是,這裡僅僅是舉一個很簡單的示例,便於學習MapReduce。

1、數據解析器

我首先寫了一個解析器來識別每一行的文本,它的作用是將每一行文本轉換為數據實體,數據實體這裡偷了個懶,欄位全部設置成了public。代碼片段如下:

/**
 * 銷售數據解釋器
 * 銷售數據格式為
 * userId$countDate(yyyy-MM-dd)$saleCount
 */
public class SaleDataParse implements TextParse<SaleDataEntity> {

    @Override
    public SaleDataEntity parse(String text) {
        if (text == null) {
            return null;
        }
        text = text.trim();
        if (text.isEmpty()) {
            return null;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String[] split = text.split("\\$");
        SaleDataEntity data = new SaleDataEntity();
        data.userId = Long.valueOf(split[0]);
        data.countDate = sdf.parse(split[1], new ParsePosition(0));
        data.saleCount = Integer.valueOf(split[2]);
        return data;
    }
}

/**
 * 銷售數據實體
 */
public class SaleDataEntity {
    /**
     * 銷售用戶id
     */
    public Long userId;
    /**
     * 銷售日期
     */
    public Date countDate;
    /**
     * 銷售總數
     */
    public Integer saleCount;
}

2、Map函數

Mapper是一個泛型類,它需要4個泛型參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Mapper<輸入鍵, 輸入值, 輸出鍵, 輸出值>

其中輸入鍵和輸入值的格式是由InputFormatClass決定的,關於輸入格式的討論之後會展開討論。MapReduce預設會把文件按行拆分,然後偏移量(輸入鍵)->行文本(輸入值)的映射傳遞給Mapper的map方法。輸出鍵和輸出值則由用戶進行指定。

這裡由於是找每一個用戶的最大銷售數量,Mapper的功能是接收並解析每行數據。所以輸出鍵我設成了銷售人員id->銷售數量的映射。所以實際的Mapper實現看起來像這樣:

/**
 * 解析輸入的文本數據
 */
public class MaxSaleMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    protected TextParse<SaleDataEntity> saleDataParse = new SaleDataParse();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        SaleDataEntity data = saleDataParse.parse(s);
        if (data != null) {
            //寫入輸出給Reducer
            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));
        }
    }
}

其中LongWritable相當於java里的long,Text相當於java里的String,IntWritable相當於java里的int。

這裡你可能會想到,既然已經解析成了數據實體,為什麼不直接把實體設置成輸出值?因為map函數和reduce函數不一定運行在同一個進程里,所以會涉及到序列化和反序列化,這裡先不展開。

3、Reduce函數

Reducer也是一個泛型類,它也需要4個參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Reducer<輸入鍵, 輸入值, 輸出鍵, 輸出值>

與Mapper不同的是,輸入鍵和輸入值來源於Mapper的輸出,也就是Mapper實現中的context.write()。

輸出鍵和輸出值也是由用戶指定,預設的輸出會寫到文件中,關於Reducer的輸出以後會討論。

Reducer的功能是尋找每個用戶的最大值,所以Reducer的實現看起來像這樣:

/**
 * 查找每一個用戶的最大銷售值
 */
public class MaxSaleReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

你可能會奇怪,為什麼reduce方法的第二個參數是一個迭代器。簡單來說,Mapper會把映射的值進行歸併,然後再傳遞給Reducer。

4、驅動程式

我們已經完成了map和reduce函數的實現,現在我們需要把它們組裝起來。我們需要寫一個Main類,它看起來像這樣

public class MaxSale {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MaxSale.class);
        job.setJobName("MaxSale");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxSaleMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MaxSaleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置Reduce任務數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

這裡解釋一下

  • 首先我們創建了一個Job
  • 然後設置輸入目錄和輸出目錄,它們分別是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath
  • 使用setMapperClass設置了map函數,setMapOutputKeyClass設置了map函數的輸入鍵類型,setMapOutputValueClass設置了輸出鍵類型
  • 使用setReducerClass設置了reduce函數,setOutputKeyClass設置了輸出鍵類型,setOutputValueClass設置了輸出值類型
  • 然後使用setNumReduceTasks設置reduce任務個數為1,每個reduce任務都會輸出一個文件,這裡是為了方便查看
  • 最後job.waitForCompletion(true)啟動並等待任務結束

5、運行結果

使用maven package打包,會生成一個jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依賴,需要設置一下HADOOP_CLASSPATH,然後把依賴放到HADOOP_CLASSPATH目錄中。

最後輸入啟動命令,格式為:hadoop jar 生成的jar.jar 輸入數據目錄 輸出數據目錄。這裡給出我使用的命令示例:

Windows:
set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output

然後你會看到程式有如下輸出,這裡截取的部分:

23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
        File System Counters
                FILE: Number of bytes read=33569210
                FILE: Number of bytes written=21132276
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1473110
                Map output records=1473110
                Map output bytes=17677320
                Map output materialized bytes=20623546
                Input split bytes=122
                Combine input records=0
                Spilled Records=1473110
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=36
                Total committed heap usage (bytes)=268435456
        File Input Format Counters
                Bytes Read=33558528
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

等待程式執行結束,output文件夾會有輸出part-r-00000,文件里每一行是每一個用戶的id和他銷售最大值。

0	9994
1	9975
2	9987
3	9985
4	9978
5	9998

三、MapReduce執行流程

簡單分析一下這個示常式度的執行流程:

  1. 首先輸入文件被按行切分,輸入到各個maper
  2. maper的輸出按輸出鍵進行分類,經過shuffle操作後輸入到reducer
  3. reducer收到maper的輸出後,執行尋找最大值操作,然後輸出
  4. 輸出會被預設的輸出格式格式化後輸出到文件part-r-00000

四、示例代碼說明

本文所有的代碼放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop

下麵是項目目錄說明:

  • maxSaleMapReduce模塊是Map函數和Reduce的實現,這個模塊依賴common模塊。所以運行的時候需要把common模塊生成的jar添加到HADOOP_CLASSPATH中
  • common模塊是公共模塊,裡面有一個SaleDataGenerator的數據生成器,可以生成本次示例代碼使用的生成數據

最後需要說明的是,項目代碼主要用於學習,代碼風格並非代表本人實際風格,不完善之處請輕噴。

五、常見問題

  1. java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: xxxxxxxxxxxx\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

    這個是因為沒有下載winutils.exe和hadoop.dll,具體可以參考《安裝一個最小化的Hadoop》中windows額外說明

  2. 運行出現異常java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)

    ​ ......

    這個和問題1類似,Hadoop在Windows需要winutils.exe和hadoop.dll訪問文件,這兩個文件通過org.apache.hadoop.util.Shell#getQualifiedBinPath這個方法獲取,而這個方法又依賴Hadoop的安裝目錄。

    設置HADOOP_HOME環境變數,或者傳入系統參數hadoop.home.dir為Hadoop程式目錄,具體參見《安裝一個最小化的Hadoop》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2023-01-24 一、NoSQL資料庫 1、NoSQL資料庫的簡介 NoSQL(NoSQL=Not Only SQL),即“不僅僅是SQL”,泛指非關係型的資料庫。NosQL不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了資料庫的擴展能力。 (1)不遵循SQL標準 ...
  • 1、首先是adb版本需要新的,老的adb不支持無線連接,如:“adb pair 192.168.3.x:xxxxx”不支持pair這個參數,這個參數表示驗證配對碼!!!新的adb才支持。 無法識別pair參數 2、支持無線調試的手機需要和用於調試的電腦在同一個區域網下。 3、通過adb連接: 【1】 ...
  • JavaScript 中有兩種類型轉換:隱式類型轉換和顯式類型轉換。 隱式類型轉換指 JavaScript 在運行時自動將一種類型轉換為另一種類型。例如,在數學運算中,JavaScript 會將字元串轉換為數字。 顯式類型轉換指在代碼中使用內置函數或全局對象將一種類型顯式地轉換為另一種類型。例如,使 ...
  • JavaScript 中有多種方法可以實現數組去重,下麵是幾種常用的方法: 1、使用 Set 去重:Set 數據結構中不能有重覆元素,可以將數組轉成 Set 類型,再轉回數組。 let arr = [1,2,3,4,5,6,2,3,4]; let uniqueArr = [...new Set(ar ...
  • 前言 為何要學習代碼?為何要學習這個代碼?怎麼學習這個代碼?可不可以學習這個代碼? 能做什麼,有什麼目標? ​ 基本的手機應用、簡單的PC游戲應用、大數據平臺;目標是以愛好為基準做一些游戲和程式; 與Python優缺幾何? ​ 入門難度有區別;一個更面向程式員群體一個則面向費程式員群體;各有優缺; ...
  • 本文作者通過分析微服務的常見優點能解決的問題,提出如何使用單體應用來緩解這些問題,最終指出採用微服務還是單體架構要根據團隊實際情況,而不是為了微服務而微服務。作者最後給出建議,中小團隊和新型團隊,建議採用單體架構,大中型團隊,可以採用微服務架構,但要充分權衡。 在 Web 軟體架構方面,微服務... ...
  • 這篇文章主要關註健康檢測相關的內容,包括服務提供方可能得狀態以及撞見間轉化、什麼是可用率、檢測程式的部署方式等。 ...
  • 題目來源 343. 整數拆分 題目詳情 給定一個正整數 n ,將其拆分為 k 個 正整數 的和( k >= 2 ),並使這些整數的乘積最大化。 返回 你可以獲得的最大乘積 。 示例 1: 輸入: n = 2 輸出: 1 解釋: 2 = 1 + 1, 1 × 1 = 1。 示例 2: 輸入: n = ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...