03初識MapReduce

来源:https://www.cnblogs.com/touch-fish/archive/2023/01/25/17067245.html
-Advertisement-
Play Games

一、std::string 的底層實現 1、深拷貝 1 class String{ 2 public: 3 String(const String &rhs):m_pstr(new char[strlen(rhs) + 1]()){ 4 } 5 private: 6 char* m_pstr; 7 ...


初識MapReduce

一、什麼是MapReduce

MapReduce是一種編程範式,它藉助Map將一個大任務分解成多個小任務,再藉助Reduce歸併Map的結果。MapReduce雖然原理很簡單,但是使用MapReduce設計出一個解決問題的應用卻不是一件簡單的事情。下麵通過一個簡單的小例子來介紹MapReduce。

二、使用MapReduce尋找銷售人員業績最大值

《Hadoop權威指南》的例子是尋找天氣最大值,需要去下載數據。但是我們並不需要完全復刻他的場景,所以這裡用了另外一個例子。假設有一批銷售日誌數據文件,它的一部分是這樣的。

66$2021-01-01$5555
67$2021-01-01$5635

每一行代表某一位銷售人員某個日期的銷售數量,具體格式為

銷售用戶id$統計日期$銷售數量

我們需要尋找每一個銷售用戶的銷售最大值是多少。需要說明的是,這裡僅僅是舉一個很簡單的示例,便於學習MapReduce。

1、數據解析器

我首先寫了一個解析器來識別每一行的文本,它的作用是將每一行文本轉換為數據實體,數據實體這裡偷了個懶,欄位全部設置成了public。代碼片段如下:

/**
 * 銷售數據解釋器
 * 銷售數據格式為
 * userId$countDate(yyyy-MM-dd)$saleCount
 */
public class SaleDataParse implements TextParse<SaleDataEntity> {

    @Override
    public SaleDataEntity parse(String text) {
        if (text == null) {
            return null;
        }
        text = text.trim();
        if (text.isEmpty()) {
            return null;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String[] split = text.split("\\$");
        SaleDataEntity data = new SaleDataEntity();
        data.userId = Long.valueOf(split[0]);
        data.countDate = sdf.parse(split[1], new ParsePosition(0));
        data.saleCount = Integer.valueOf(split[2]);
        return data;
    }
}

/**
 * 銷售數據實體
 */
public class SaleDataEntity {
    /**
     * 銷售用戶id
     */
    public Long userId;
    /**
     * 銷售日期
     */
    public Date countDate;
    /**
     * 銷售總數
     */
    public Integer saleCount;
}

2、Map函數

Mapper是一個泛型類,它需要4個泛型參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Mapper<輸入鍵, 輸入值, 輸出鍵, 輸出值>

其中輸入鍵和輸入值的格式是由InputFormatClass決定的,關於輸入格式的討論之後會展開討論。MapReduce預設會把文件按行拆分,然後偏移量(輸入鍵)->行文本(輸入值)的映射傳遞給Mapper的map方法。輸出鍵和輸出值則由用戶進行指定。

這裡由於是找每一個用戶的最大銷售數量,Mapper的功能是接收並解析每行數據。所以輸出鍵我設成了銷售人員id->銷售數量的映射。所以實際的Mapper實現看起來像這樣:

/**
 * 解析輸入的文本數據
 */
public class MaxSaleMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    protected TextParse<SaleDataEntity> saleDataParse = new SaleDataParse();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        SaleDataEntity data = saleDataParse.parse(s);
        if (data != null) {
            //寫入輸出給Reducer
            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));
        }
    }
}

其中LongWritable相當於java里的long,Text相當於java里的String,IntWritable相當於java里的int。

這裡你可能會想到,既然已經解析成了數據實體,為什麼不直接把實體設置成輸出值?因為map函數和reduce函數不一定運行在同一個進程里,所以會涉及到序列化和反序列化,這裡先不展開。

3、Reduce函數

Reducer也是一個泛型類,它也需要4個參數,從左到右分別是輸入鍵、輸入值、輸出鍵和輸出值。也就是這樣

Reducer<輸入鍵, 輸入值, 輸出鍵, 輸出值>

與Mapper不同的是,輸入鍵和輸入值來源於Mapper的輸出,也就是Mapper實現中的context.write()。

輸出鍵和輸出值也是由用戶指定,預設的輸出會寫到文件中,關於Reducer的輸出以後會討論。

Reducer的功能是尋找每個用戶的最大值,所以Reducer的實現看起來像這樣:

/**
 * 查找每一個用戶的最大銷售值
 */
public class MaxSaleReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

你可能會奇怪,為什麼reduce方法的第二個參數是一個迭代器。簡單來說,Mapper會把映射的值進行歸併,然後再傳遞給Reducer。

4、驅動程式

我們已經完成了map和reduce函數的實現,現在我們需要把它們組裝起來。我們需要寫一個Main類,它看起來像這樣

public class MaxSale {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MaxSale.class);
        job.setJobName("MaxSale");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxSaleMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MaxSaleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置Reduce任務數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

這裡解釋一下

  • 首先我們創建了一個Job
  • 然後設置輸入目錄和輸出目錄,它們分別是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath
  • 使用setMapperClass設置了map函數,setMapOutputKeyClass設置了map函數的輸入鍵類型,setMapOutputValueClass設置了輸出鍵類型
  • 使用setReducerClass設置了reduce函數,setOutputKeyClass設置了輸出鍵類型,setOutputValueClass設置了輸出值類型
  • 然後使用setNumReduceTasks設置reduce任務個數為1,每個reduce任務都會輸出一個文件,這裡是為了方便查看
  • 最後job.waitForCompletion(true)啟動並等待任務結束

5、運行結果

使用maven package打包,會生成一個jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依賴,需要設置一下HADOOP_CLASSPATH,然後把依賴放到HADOOP_CLASSPATH目錄中。

最後輸入啟動命令,格式為:hadoop jar 生成的jar.jar 輸入數據目錄 輸出數據目錄。這裡給出我使用的命令示例:

Windows:
set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output

然後你會看到程式有如下輸出,這裡截取的部分:

23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
        File System Counters
                FILE: Number of bytes read=33569210
                FILE: Number of bytes written=21132276
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1473110
                Map output records=1473110
                Map output bytes=17677320
                Map output materialized bytes=20623546
                Input split bytes=122
                Combine input records=0
                Spilled Records=1473110
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=36
                Total committed heap usage (bytes)=268435456
        File Input Format Counters
                Bytes Read=33558528
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

等待程式執行結束,output文件夾會有輸出part-r-00000,文件里每一行是每一個用戶的id和他銷售最大值。

0	9994
1	9975
2	9987
3	9985
4	9978
5	9998

三、MapReduce執行流程

簡單分析一下這個示常式度的執行流程:

  1. 首先輸入文件被按行切分,輸入到各個maper
  2. maper的輸出按輸出鍵進行分類,經過shuffle操作後輸入到reducer
  3. reducer收到maper的輸出後,執行尋找最大值操作,然後輸出
  4. 輸出會被預設的輸出格式格式化後輸出到文件part-r-00000

四、示例代碼說明

本文所有的代碼放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop

下麵是項目目錄說明:

  • maxSaleMapReduce模塊是Map函數和Reduce的實現,這個模塊依賴common模塊。所以運行的時候需要把common模塊生成的jar添加到HADOOP_CLASSPATH中
  • common模塊是公共模塊,裡面有一個SaleDataGenerator的數據生成器,可以生成本次示例代碼使用的生成數據

最後需要說明的是,項目代碼主要用於學習,代碼風格並非代表本人實際風格,不完善之處請輕噴。

五、常見問題

  1. java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: xxxxxxxxxxxx\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

    這個是因為沒有下載winutils.exe和hadoop.dll,具體可以參考《安裝一個最小化的Hadoop》中windows額外說明

  2. 運行出現異常java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)

    ​ ......

    這個和問題1類似,Hadoop在Windows需要winutils.exe和hadoop.dll訪問文件,這兩個文件通過org.apache.hadoop.util.Shell#getQualifiedBinPath這個方法獲取,而這個方法又依賴Hadoop的安裝目錄。

    設置HADOOP_HOME環境變數,或者傳入系統參數hadoop.home.dir為Hadoop程式目錄,具體參見《安裝一個最小化的Hadoop》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2023-01-24 一、NoSQL資料庫 1、NoSQL資料庫的簡介 NoSQL(NoSQL=Not Only SQL),即“不僅僅是SQL”,泛指非關係型的資料庫。NosQL不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了資料庫的擴展能力。 (1)不遵循SQL標準 ...
  • 1、首先是adb版本需要新的,老的adb不支持無線連接,如:“adb pair 192.168.3.x:xxxxx”不支持pair這個參數,這個參數表示驗證配對碼!!!新的adb才支持。 無法識別pair參數 2、支持無線調試的手機需要和用於調試的電腦在同一個區域網下。 3、通過adb連接: 【1】 ...
  • JavaScript 中有兩種類型轉換:隱式類型轉換和顯式類型轉換。 隱式類型轉換指 JavaScript 在運行時自動將一種類型轉換為另一種類型。例如,在數學運算中,JavaScript 會將字元串轉換為數字。 顯式類型轉換指在代碼中使用內置函數或全局對象將一種類型顯式地轉換為另一種類型。例如,使 ...
  • JavaScript 中有多種方法可以實現數組去重,下麵是幾種常用的方法: 1、使用 Set 去重:Set 數據結構中不能有重覆元素,可以將數組轉成 Set 類型,再轉回數組。 let arr = [1,2,3,4,5,6,2,3,4]; let uniqueArr = [...new Set(ar ...
  • 前言 為何要學習代碼?為何要學習這個代碼?怎麼學習這個代碼?可不可以學習這個代碼? 能做什麼,有什麼目標? ​ 基本的手機應用、簡單的PC游戲應用、大數據平臺;目標是以愛好為基準做一些游戲和程式; 與Python優缺幾何? ​ 入門難度有區別;一個更面向程式員群體一個則面向費程式員群體;各有優缺; ...
  • 本文作者通過分析微服務的常見優點能解決的問題,提出如何使用單體應用來緩解這些問題,最終指出採用微服務還是單體架構要根據團隊實際情況,而不是為了微服務而微服務。作者最後給出建議,中小團隊和新型團隊,建議採用單體架構,大中型團隊,可以採用微服務架構,但要充分權衡。 在 Web 軟體架構方面,微服務... ...
  • 這篇文章主要關註健康檢測相關的內容,包括服務提供方可能得狀態以及撞見間轉化、什麼是可用率、檢測程式的部署方式等。 ...
  • 題目來源 343. 整數拆分 題目詳情 給定一個正整數 n ,將其拆分為 k 個 正整數 的和( k >= 2 ),並使這些整數的乘積最大化。 返回 你可以獲得的最大乘積 。 示例 1: 輸入: n = 2 輸出: 1 解釋: 2 = 1 + 1, 1 × 1 = 1。 示例 2: 輸入: n = ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...