03初識MapReduce

来源:https://www.cnblogs.com/touch-fish/archive/2023/01/25/17067245.html
-Advertisement-
Play Games

一、std::string 的底層實現 1、深拷貝 1 class String{ 2 public: 3 String(const String &rhs):m_pstr(new char[strlen(rhs) + 1]()){ 4 } 5 private: 6 char* m_pstr; 7 ...


初識MapReduce

一、什麼是MapReduce

MapReduce是一種編程範式,它藉助Map將一個大任務分解成多個小任務,再藉助Reduce歸併Map的結果。MapReduce雖然原理很簡單,但是使用MapReduce設計出一個解決問題的應用卻不是一件簡單的事情。下麵通過一個簡單的小例子來介紹MapReduce。

二、使用MapReduce尋找銷售人員業績最大值

《Hadoop權威指南》的例子是尋找天氣最大值,需要去下載數據。但是我們並不需要完全復刻他的場景,所以這裡用了另外一個例子。假設有一批銷售日誌數據文件,它的一部分是這樣的。

66$2021-01-01$5555
67$2021-01-01$5635

每一行代表某一位銷售人員某個日期的銷售數量,具體格式為

銷售用戶id$統計日期$銷售數量

我們需要尋找每一個銷售用戶的銷售最大值是多少。需要說明的是,這裡僅僅是舉一個很簡單的示例,便於學習MapReduce。

1、數據解析器

我首先寫了一個解析器來識別每一行的文本,它的作用是將每一行文本轉換為數據實體,數據實體這裡偷了個懶,欄位全部設置成了public。代碼片段如下:

/**
 * 銷售數據解釋器
 * 銷售數據格式為
 * userId$countDate(yyyy-MM-dd)$saleCount
 */
public class SaleDataParse implements TextParse<SaleDataEntity> {

    @Override
    public SaleDataEntity parse(String text) {
        if (text == null) {
            return null;
        }
        text = text.trim();
        if (text.isEmpty()) {
            return null;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String[] split = text.split("\\$");
        SaleDataEntity data = new SaleDataEntity();
        data.userId = Long.valueOf(split[0]);
        data.countDate = sdf.parse(split[1], new ParsePosition(0));
        data.saleCount = Integer.valueOf(split[2]);
        return data;
    }
}

/**
 * 銷售數據實體
 */
public class SaleDataEntity {
    /**
     * 銷售用戶id
     */
    public Long userId;
    /**
     * 銷售日期
     */
    public Date countDate;
    /**
     * 銷售總數
     */
    public Integer saleCount;
}

2、Map函數

Mapper是一個泛型類,它需要4個泛型參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Mapper<輸入鍵, 輸入值, 輸出鍵, 輸出值>

其中輸入鍵和輸入值的格式是由InputFormatClass決定的,關於輸入格式的討論之後會展開討論。MapReduce預設會把文件按行拆分,然後偏移量(輸入鍵)->行文本(輸入值)的映射傳遞給Mapper的map方法。輸出鍵和輸出值則由用戶進行指定。

這裡由於是找每一個用戶的最大銷售數量,Mapper的功能是接收並解析每行數據。所以輸出鍵我設成了銷售人員id->銷售數量的映射。所以實際的Mapper實現看起來像這樣:

/**
 * 解析輸入的文本數據
 */
public class MaxSaleMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    protected TextParse<SaleDataEntity> saleDataParse = new SaleDataParse();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        SaleDataEntity data = saleDataParse.parse(s);
        if (data != null) {
            //寫入輸出給Reducer
            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));
        }
    }
}

其中LongWritable相當於java里的long,Text相當於java里的String,IntWritable相當於java里的int。

這裡你可能會想到,既然已經解析成了數據實體,為什麼不直接把實體設置成輸出值?因為map函數和reduce函數不一定運行在同一個進程里,所以會涉及到序列化和反序列化,這裡先不展開。

3、Reduce函數

Reducer也是一個泛型類,它也需要4個參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Reducer<輸入鍵, 輸入值, 輸出鍵, 輸出值>

與Mapper不同的是,輸入鍵和輸入值來源於Mapper的輸出,也就是Mapper實現中的context.write()。

輸出鍵和輸出值也是由用戶指定,預設的輸出會寫到文件中,關於Reducer的輸出以後會討論。

Reducer的功能是尋找每個用戶的最大值,所以Reducer的實現看起來像這樣:

/**
 * 查找每一個用戶的最大銷售值
 */
public class MaxSaleReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

你可能會奇怪,為什麼reduce方法的第二個參數是一個迭代器。簡單來說,Mapper會把映射的值進行歸併,然後再傳遞給Reducer。

4、驅動程式

我們已經完成了map和reduce函數的實現,現在我們需要把它們組裝起來。我們需要寫一個Main類,它看起來像這樣

public class MaxSale {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MaxSale.class);
        job.setJobName("MaxSale");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxSaleMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MaxSaleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置Reduce任務數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

這裡解釋一下

  • 首先我們創建了一個Job
  • 然後設置輸入目錄和輸出目錄,它們分別是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath
  • 使用setMapperClass設置了map函數,setMapOutputKeyClass設置了map函數的輸入鍵類型,setMapOutputValueClass設置了輸出鍵類型
  • 使用setReducerClass設置了reduce函數,setOutputKeyClass設置了輸出鍵類型,setOutputValueClass設置了輸出值類型
  • 然後使用setNumReduceTasks設置reduce任務個數為1,每個reduce任務都會輸出一個文件,這裡是為了方便查看
  • 最後job.waitForCompletion(true)啟動並等待任務結束

5、運行結果

使用maven package打包,會生成一個jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依賴,需要設置一下HADOOP_CLASSPATH,然後把依賴放到HADOOP_CLASSPATH目錄中。

最後輸入啟動命令,格式為:hadoop jar 生成的jar.jar 輸入數據目錄 輸出數據目錄。這裡給出我使用的命令示例:

Windows:
set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output

然後你會看到程式有如下輸出,這裡截取的部分:

23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
        File System Counters
                FILE: Number of bytes read=33569210
                FILE: Number of bytes written=21132276
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1473110
                Map output records=1473110
                Map output bytes=17677320
                Map output materialized bytes=20623546
                Input split bytes=122
                Combine input records=0
                Spilled Records=1473110
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=36
                Total committed heap usage (bytes)=268435456
        File Input Format Counters
                Bytes Read=33558528
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

等待程式執行結束,output文件夾會有輸出part-r-00000,文件里每一行是每一個用戶的id和他銷售最大值。

0	9994
1	9975
2	9987
3	9985
4	9978
5	9998

三、MapReduce執行流程

簡單分析一下這個示常式度的執行流程:

  1. 首先輸入文件被按行切分,輸入到各個maper
  2. maper的輸出按輸出鍵進行分類,經過shuffle操作後輸入到reducer
  3. reducer收到maper的輸出後,執行尋找最大值操作,然後輸出
  4. 輸出會被預設的輸出格式格式化後輸出到文件part-r-00000

四、示例代碼說明

本文所有的代碼放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop

下麵是項目目錄說明:

  • maxSaleMapReduce模塊是Map函數和Reduce的實現,這個模塊依賴common模塊。所以運行的時候需要把common模塊生成的jar添加到HADOOP_CLASSPATH中
  • common模塊是公共模塊,裡面有一個SaleDataGenerator的數據生成器,可以生成本次示例代碼使用的生成數據

最後需要說明的是,項目代碼主要用於學習,代碼風格並非代表本人實際風格,不完善之處請輕噴。

五、常見問題

  1. java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: xxxxxxxxxxxx\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

    這個是因為沒有下載winutils.exe和hadoop.dll,具體可以參考《安裝一個最小化的Hadoop》中windows額外說明

  2. 運行出現異常java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)

    ​ ......

    這個和問題1類似,Hadoop在Windows需要winutils.exe和hadoop.dll訪問文件,這兩個文件通過org.apache.hadoop.util.Shell#getQualifiedBinPath這個方法獲取,而這個方法又依賴Hadoop的安裝目錄。

    設置HADOOP_HOME環境變數,或者傳入系統參數hadoop.home.dir為Hadoop程式目錄,具體參見《安裝一個最小化的Hadoop》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2023-01-24 一、NoSQL資料庫 1、NoSQL資料庫的簡介 NoSQL(NoSQL=Not Only SQL),即“不僅僅是SQL”,泛指非關係型的資料庫。NosQL不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了資料庫的擴展能力。 (1)不遵循SQL標準 ...
  • 1、首先是adb版本需要新的,老的adb不支持無線連接,如:“adb pair 192.168.3.x:xxxxx”不支持pair這個參數,這個參數表示驗證配對碼!!!新的adb才支持。 無法識別pair參數 2、支持無線調試的手機需要和用於調試的電腦在同一個區域網下。 3、通過adb連接: 【1】 ...
  • JavaScript 中有兩種類型轉換:隱式類型轉換和顯式類型轉換。 隱式類型轉換指 JavaScript 在運行時自動將一種類型轉換為另一種類型。例如,在數學運算中,JavaScript 會將字元串轉換為數字。 顯式類型轉換指在代碼中使用內置函數或全局對象將一種類型顯式地轉換為另一種類型。例如,使 ...
  • JavaScript 中有多種方法可以實現數組去重,下麵是幾種常用的方法: 1、使用 Set 去重:Set 數據結構中不能有重覆元素,可以將數組轉成 Set 類型,再轉回數組。 let arr = [1,2,3,4,5,6,2,3,4]; let uniqueArr = [...new Set(ar ...
  • 前言 為何要學習代碼?為何要學習這個代碼?怎麼學習這個代碼?可不可以學習這個代碼? 能做什麼,有什麼目標? ​ 基本的手機應用、簡單的PC游戲應用、大數據平臺;目標是以愛好為基準做一些游戲和程式; 與Python優缺幾何? ​ 入門難度有區別;一個更面向程式員群體一個則面向費程式員群體;各有優缺; ...
  • 本文作者通過分析微服務的常見優點能解決的問題,提出如何使用單體應用來緩解這些問題,最終指出採用微服務還是單體架構要根據團隊實際情況,而不是為了微服務而微服務。作者最後給出建議,中小團隊和新型團隊,建議採用單體架構,大中型團隊,可以採用微服務架構,但要充分權衡。 在 Web 軟體架構方面,微服務... ...
  • 這篇文章主要關註健康檢測相關的內容,包括服務提供方可能得狀態以及撞見間轉化、什麼是可用率、檢測程式的部署方式等。 ...
  • 題目來源 343. 整數拆分 題目詳情 給定一個正整數 n ,將其拆分為 k 個 正整數 的和( k >= 2 ),並使這些整數的乘積最大化。 返回 你可以獲得的最大乘積 。 示例 1: 輸入: n = 2 輸出: 1 解釋: 2 = 1 + 1, 1 × 1 = 1。 示例 2: 輸入: n = ...
一周排行
    -Advertisement-
    Play Games
  • 1、預覽地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 隨著網路的發展,企業對於信息系統數據的保密工作愈發重視,不同身份、角色對於數據的訪問許可權都應該大相徑庭。 列如 1、不同登錄人員對一個數據列表的可見度是不一樣的,如數據列、數據行、數據按鈕 ...
  • 前言 上一篇文章寫瞭如何使用RabbitMQ做個簡單的發送郵件項目,然後評論也是比較多,也是準備去學習一下如何確保RabbitMQ的消息可靠性,但是由於時間原因,先來說說設計模式中的簡單工廠模式吧! 在瞭解簡單工廠模式之前,我們要知道C#是一款面向對象的高級程式語言。它有3大特性,封裝、繼承、多態。 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 介紹 Nodify是一個WPF基於節點的編輯器控制項,其中包含一系列節點、連接和連接器組件,旨在簡化構建基於節點的工具的過程 ...
  • 創建一個webapi項目做測試使用。 創建新控制器,搭建一個基礎框架,包括獲取當天日期、wiki的請求地址等 創建一個Http請求幫助類以及方法,用於獲取指定URL的信息 使用http請求訪問指定url,先運行一下,看看返回的內容。內容如圖右邊所示,實際上是一個Json數據。我們主要解析 大事記 部 ...
  • 最近在不少自媒體上看到有關.NET與C#的資訊與評價,感覺大家對.NET與C#還是不太瞭解,尤其是對2016年6月發佈的跨平臺.NET Core 1.0,更是知之甚少。在考慮一番之後,還是決定寫點東西總結一下,也回顧一下.NET的發展歷史。 首先,你沒看錯,.NET是跨平臺的,可以在Windows、 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 添加節點(nodes) 通過上一篇我們已經創建好了編輯器實例現在我們為編輯器添加一個節點 添加model和viewmode ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...
  • 類型檢查和轉換:當你需要檢查對象是否為特定類型,並且希望在同一時間內將其轉換為那個類型時,模式匹配提供了一種更簡潔的方式來完成這一任務,避免了使用傳統的as和is操作符後還需要進行額外的null檢查。 複雜條件邏輯:在處理複雜的條件邏輯時,特別是涉及到多個條件和類型的情況下,使用模式匹配可以使代碼更 ...
  • 在日常開發中,我們經常需要和文件打交道,特別是桌面開發,有時候就會需要載入大批量的文件,而且可能還會存在部分文件缺失的情況,那麼如何才能快速的判斷文件是否存在呢?如果處理不當的,且文件數量比較多的時候,可能會造成卡頓等情況,進而影響程式的使用體驗。今天就以一個簡單的小例子,簡述兩種不同的判斷文件是否... ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...