03初識MapReduce

来源:https://www.cnblogs.com/touch-fish/archive/2023/01/25/17067245.html
-Advertisement-
Play Games

一、std::string 的底層實現 1、深拷貝 1 class String{ 2 public: 3 String(const String &rhs):m_pstr(new char[strlen(rhs) + 1]()){ 4 } 5 private: 6 char* m_pstr; 7 ...


初識MapReduce

一、什麼是MapReduce

MapReduce是一種編程範式,它藉助Map將一個大任務分解成多個小任務,再藉助Reduce歸併Map的結果。MapReduce雖然原理很簡單,但是使用MapReduce設計出一個解決問題的應用卻不是一件簡單的事情。下麵通過一個簡單的小例子來介紹MapReduce。

二、使用MapReduce尋找銷售人員業績最大值

《Hadoop權威指南》的例子是尋找天氣最大值,需要去下載數據。但是我們並不需要完全復刻他的場景,所以這裡用了另外一個例子。假設有一批銷售日誌數據文件,它的一部分是這樣的。

66$2021-01-01$5555
67$2021-01-01$5635

每一行代表某一位銷售人員某個日期的銷售數量,具體格式為

銷售用戶id$統計日期$銷售數量

我們需要尋找每一個銷售用戶的銷售最大值是多少。需要說明的是,這裡僅僅是舉一個很簡單的示例,便於學習MapReduce。

1、數據解析器

我首先寫了一個解析器來識別每一行的文本,它的作用是將每一行文本轉換為數據實體,數據實體這裡偷了個懶,欄位全部設置成了public。代碼片段如下:

/**
 * 銷售數據解釋器
 * 銷售數據格式為
 * userId$countDate(yyyy-MM-dd)$saleCount
 */
public class SaleDataParse implements TextParse<SaleDataEntity> {

    @Override
    public SaleDataEntity parse(String text) {
        if (text == null) {
            return null;
        }
        text = text.trim();
        if (text.isEmpty()) {
            return null;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String[] split = text.split("\\$");
        SaleDataEntity data = new SaleDataEntity();
        data.userId = Long.valueOf(split[0]);
        data.countDate = sdf.parse(split[1], new ParsePosition(0));
        data.saleCount = Integer.valueOf(split[2]);
        return data;
    }
}

/**
 * 銷售數據實體
 */
public class SaleDataEntity {
    /**
     * 銷售用戶id
     */
    public Long userId;
    /**
     * 銷售日期
     */
    public Date countDate;
    /**
     * 銷售總數
     */
    public Integer saleCount;
}

2、Map函數

Mapper是一個泛型類,它需要4個泛型參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Mapper<輸入鍵, 輸入值, 輸出鍵, 輸出值>

其中輸入鍵和輸入值的格式是由InputFormatClass決定的,關於輸入格式的討論之後會展開討論。MapReduce預設會把文件按行拆分,然後偏移量(輸入鍵)->行文本(輸入值)的映射傳遞給Mapper的map方法。輸出鍵和輸出值則由用戶進行指定。

這裡由於是找每一個用戶的最大銷售數量,Mapper的功能是接收並解析每行數據。所以輸出鍵我設成了銷售人員id->銷售數量的映射。所以實際的Mapper實現看起來像這樣:

/**
 * 解析輸入的文本數據
 */
public class MaxSaleMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    protected TextParse<SaleDataEntity> saleDataParse = new SaleDataParse();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        SaleDataEntity data = saleDataParse.parse(s);
        if (data != null) {
            //寫入輸出給Reducer
            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));
        }
    }
}

其中LongWritable相當於java里的long,Text相當於java里的String,IntWritable相當於java里的int。

這裡你可能會想到,既然已經解析成了數據實體,為什麼不直接把實體設置成輸出值?因為map函數和reduce函數不一定運行在同一個進程里,所以會涉及到序列化和反序列化,這裡先不展開。

3、Reduce函數

Reducer也是一個泛型類,它也需要4個參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Reducer<輸入鍵, 輸入值, 輸出鍵, 輸出值>

與Mapper不同的是,輸入鍵和輸入值來源於Mapper的輸出,也就是Mapper實現中的context.write()。

輸出鍵和輸出值也是由用戶指定,預設的輸出會寫到文件中,關於Reducer的輸出以後會討論。

Reducer的功能是尋找每個用戶的最大值,所以Reducer的實現看起來像這樣:

/**
 * 查找每一個用戶的最大銷售值
 */
public class MaxSaleReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

你可能會奇怪,為什麼reduce方法的第二個參數是一個迭代器。簡單來說,Mapper會把映射的值進行歸併,然後再傳遞給Reducer。

4、驅動程式

我們已經完成了map和reduce函數的實現,現在我們需要把它們組裝起來。我們需要寫一個Main類,它看起來像這樣

public class MaxSale {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MaxSale.class);
        job.setJobName("MaxSale");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxSaleMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MaxSaleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置Reduce任務數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

這裡解釋一下

  • 首先我們創建了一個Job
  • 然後設置輸入目錄和輸出目錄,它們分別是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath
  • 使用setMapperClass設置了map函數,setMapOutputKeyClass設置了map函數的輸入鍵類型,setMapOutputValueClass設置了輸出鍵類型
  • 使用setReducerClass設置了reduce函數,setOutputKeyClass設置了輸出鍵類型,setOutputValueClass設置了輸出值類型
  • 然後使用setNumReduceTasks設置reduce任務個數為1,每個reduce任務都會輸出一個文件,這裡是為了方便查看
  • 最後job.waitForCompletion(true)啟動並等待任務結束

5、運行結果

使用maven package打包,會生成一個jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依賴,需要設置一下HADOOP_CLASSPATH,然後把依賴放到HADOOP_CLASSPATH目錄中。

最後輸入啟動命令,格式為:hadoop jar 生成的jar.jar 輸入數據目錄 輸出數據目錄。這裡給出我使用的命令示例:

Windows:
set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output

然後你會看到程式有如下輸出,這裡截取的部分:

23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
        File System Counters
                FILE: Number of bytes read=33569210
                FILE: Number of bytes written=21132276
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1473110
                Map output records=1473110
                Map output bytes=17677320
                Map output materialized bytes=20623546
                Input split bytes=122
                Combine input records=0
                Spilled Records=1473110
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=36
                Total committed heap usage (bytes)=268435456
        File Input Format Counters
                Bytes Read=33558528
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

等待程式執行結束,output文件夾會有輸出part-r-00000,文件里每一行是每一個用戶的id和他銷售最大值。

0	9994
1	9975
2	9987
3	9985
4	9978
5	9998

三、MapReduce執行流程

簡單分析一下這個示常式度的執行流程:

  1. 首先輸入文件被按行切分,輸入到各個maper
  2. maper的輸出按輸出鍵進行分類,經過shuffle操作後輸入到reducer
  3. reducer收到maper的輸出後,執行尋找最大值操作,然後輸出
  4. 輸出會被預設的輸出格式格式化後輸出到文件part-r-00000

四、示例代碼說明

本文所有的代碼放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop

下麵是項目目錄說明:

  • maxSaleMapReduce模塊是Map函數和Reduce的實現,這個模塊依賴common模塊。所以運行的時候需要把common模塊生成的jar添加到HADOOP_CLASSPATH中
  • common模塊是公共模塊,裡面有一個SaleDataGenerator的數據生成器,可以生成本次示例代碼使用的生成數據

最後需要說明的是,項目代碼主要用於學習,代碼風格並非代表本人實際風格,不完善之處請輕噴。

五、常見問題

  1. java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: xxxxxxxxxxxx\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

    這個是因為沒有下載winutils.exe和hadoop.dll,具體可以參考《安裝一個最小化的Hadoop》中windows額外說明

  2. 運行出現異常java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)

    ​ ......

    這個和問題1類似,Hadoop在Windows需要winutils.exe和hadoop.dll訪問文件,這兩個文件通過org.apache.hadoop.util.Shell#getQualifiedBinPath這個方法獲取,而這個方法又依賴Hadoop的安裝目錄。

    設置HADOOP_HOME環境變數,或者傳入系統參數hadoop.home.dir為Hadoop程式目錄,具體參見《安裝一個最小化的Hadoop》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2023-01-24 一、NoSQL資料庫 1、NoSQL資料庫的簡介 NoSQL(NoSQL=Not Only SQL),即“不僅僅是SQL”,泛指非關係型的資料庫。NosQL不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了資料庫的擴展能力。 (1)不遵循SQL標準 ...
  • 1、首先是adb版本需要新的,老的adb不支持無線連接,如:“adb pair 192.168.3.x:xxxxx”不支持pair這個參數,這個參數表示驗證配對碼!!!新的adb才支持。 無法識別pair參數 2、支持無線調試的手機需要和用於調試的電腦在同一個區域網下。 3、通過adb連接: 【1】 ...
  • JavaScript 中有兩種類型轉換:隱式類型轉換和顯式類型轉換。 隱式類型轉換指 JavaScript 在運行時自動將一種類型轉換為另一種類型。例如,在數學運算中,JavaScript 會將字元串轉換為數字。 顯式類型轉換指在代碼中使用內置函數或全局對象將一種類型顯式地轉換為另一種類型。例如,使 ...
  • JavaScript 中有多種方法可以實現數組去重,下麵是幾種常用的方法: 1、使用 Set 去重:Set 數據結構中不能有重覆元素,可以將數組轉成 Set 類型,再轉回數組。 let arr = [1,2,3,4,5,6,2,3,4]; let uniqueArr = [...new Set(ar ...
  • 前言 為何要學習代碼?為何要學習這個代碼?怎麼學習這個代碼?可不可以學習這個代碼? 能做什麼,有什麼目標? ​ 基本的手機應用、簡單的PC游戲應用、大數據平臺;目標是以愛好為基準做一些游戲和程式; 與Python優缺幾何? ​ 入門難度有區別;一個更面向程式員群體一個則面向費程式員群體;各有優缺; ...
  • 本文作者通過分析微服務的常見優點能解決的問題,提出如何使用單體應用來緩解這些問題,最終指出採用微服務還是單體架構要根據團隊實際情況,而不是為了微服務而微服務。作者最後給出建議,中小團隊和新型團隊,建議採用單體架構,大中型團隊,可以採用微服務架構,但要充分權衡。 在 Web 軟體架構方面,微服務... ...
  • 這篇文章主要關註健康檢測相關的內容,包括服務提供方可能得狀態以及撞見間轉化、什麼是可用率、檢測程式的部署方式等。 ...
  • 題目來源 343. 整數拆分 題目詳情 給定一個正整數 n ,將其拆分為 k 個 正整數 的和( k >= 2 ),並使這些整數的乘積最大化。 返回 你可以獲得的最大乘積 。 示例 1: 輸入: n = 2 輸出: 1 解釋: 2 = 1 + 1, 1 × 1 = 1。 示例 2: 輸入: n = ...
一周排行
    -Advertisement-
    Play Games
  • 一:背景 1. 講故事 年前遇到了好幾例托管堆被損壞的案例,有些運氣好一些,從被破壞的托管堆記憶體現場能觀測出大概是什麼問題,但更多的情況下是無法做出準確判斷的,原因就在於生成的dump是第二現場,借用之前文章的一張圖,大家可以理解一下。 為了幫助更多受此問題困擾的朋友,這篇來整理一下如何 快狠準 的 ...
  • 前言 .NET6 開始,.NET Croe API 項目取消了 Startup.cs 文件,在 Program.cs 文件的 Main 函數中完成服務的註冊和中間件管道的管理。但當我們項目引入更多包的時候,Program.cs 文件也會看起來很臃腫。 而且,我們不只會有一個後端項目,為了方便快速創建 ...
  • 目錄 背景 get 與 post 的區別 所有介面都用 post 請求? 背景 最近在逛知乎的時候發現一個有趣的問題:公司規定所有介面都用 post 請求,這是為什麼? 看到這個問題的時候其實我也挺有感觸的,因為我也曾經這樣問過我自己。在上上一家公司的時候接到一個項目是從零開始搭建一個微服務,當時就 ...
  • *以下內容為本人的學習筆記,如需要轉載,請聲明原文鏈接 微信公眾號「englyf」https://mp.weixin.qq.com/s/2GFLTstDC7w6u3fTJxflNA 本文大概 1685 個字,閱讀需花 6 分鐘內容不多, 但也花了一些精力如要交流, 歡迎關註我然後評論區留言 謝謝你的 ...
  • 在新版本的pandas中,上述代碼會引起警告,建議改成SQLAlchemy connectable(engine/connection),後續代碼將引入這種升級的連接方式。 ...
  • 幾乎所有的高級編程語言都有自己的垃圾回收機制,開發者不需要關註記憶體的申請與釋放,Python 也不例外。Python 官方團隊的文章 https://devguide.python.org/internals/garbage-collector 詳細介紹了 Python 中的垃圾回收演算法,本文是這篇 ...
  • 如果您想查找高於或低於平均值的數字,可以不必計算該平均值,就能查看更高或更低的值。通過Java應用程式,可以自動突出顯示這些數字。除了快速突出顯示高於或低於平均值的值外,您還可以查看高於或低於的值的個數。現在讓我們看看如何在 Java應用程式中實現此操作。 引入jar包 導入方法1: 手動引入。將  ...
  • 第一種方式:使用{} firstDict = {"name": "wang yuan wai ", "age" : 25} 說明:{}為創建一個空的字典對象 第二種方式:使用fromkeys()方法 second_dict = dict.fromkeys(("name", "age")) #valu ...
  • 在golang中可以使用a := b這種方式將b賦值給a,只有當b能進行深拷貝時a與b才不會互相影響,否則就需要進行更為複雜的深拷貝。 下麵就是Go賦值操作的一個說明: Go語言中所有賦值操作都是值傳遞,如果結構中不含指針,則直接賦值就是深度拷貝;如果結構中含有指針(包括自定義指針,以及切片,map ...
  • 本文結合京東監控埋點場景,對解決樣板代碼的技術選型方案進行分析,給出最終解決方案後,結合理論和實踐進一步展開。通過關註文中的技術分析過程和技術場景,讀者可收穫一種樣板代碼思想過程和解決思路,並對Java編譯器底層有初步瞭解。 ...