Hadoop: 單詞計數(Word Count)的MapReduce實現

来源:https://www.cnblogs.com/orion-orion/archive/2022/05/24/16306899.html
-Advertisement-
Play Games

首先,Hadoop會把輸入數據劃分成等長的輸入分片(input split) 或分片發送到MapReduce。Hadoop為每個分片創建一個map任務,由它來運行用戶自定義的map函數以分析每個分片中的記錄。在我們的單詞計數例子中,輸入是多個文件,一般一個文件對應一個分片,如果文件太大則會劃分為多個... ...


1.Map與Reduce過程

1.1 Map過程

首先,Hadoop會把輸入數據劃分成等長的輸入分片(input split)分片發送到MapReduce。Hadoop為每個分片創建一個map任務,由它來運行用戶自定義的map函數以分析每個分片中的記錄。在我們的單詞計數例子中,輸入是多個文件,一般一個文件對應一個分片,如果文件太大則會劃分為多個分片。map函數的輸入以<key, value>形式做為輸入,value為文件的每一行,key為該行在文件中的偏移量(一般我們會忽視)。這裡map函數起到的作用為將每一行進行分詞為多個word,併在context中寫入<word, 1>以代表該單詞出現一次。

map過程的示意圖如下:

mapper代碼編寫如下:

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量
        StringTokenizer iter = new StringTokenizer(value.toString());
        while (iter.hasMoreTokens()) {
            word.set(iter.nextToken());
            // 向context中寫入<word, 1>
            context.write(word, one);
            System.out.println(word);
        }
    }
}

如果我們能夠並行處理分片(不一定是完全並行),且分片是小塊的數據,那麼處理過程將會有一個好的負載平衡。但是如果分片太小,那麼管理分片與map任務創建將會耗費太多時間。對於大多數作業,理想分片大小為一個HDFS塊的大小,預設是64MB。

map任務的執行節點和輸入數據的存儲節點相同時,Hadoop的性能能達到最佳,這就是電腦系統中所謂的data locality optimization(數據局部性優化)。而最佳分片大小與塊大小相同的原因就在於,它能夠保證一個分片存儲在單個節點上,再大就不能了。

1.2 Reduce過程

接下來我們看reducer的編寫。reduce任務的多少並不是由輸入大小來決定,而是需要人工單獨指定的(預設為1個)。和上面map不同的是,reduce任務不再具有本地讀取的優勢————一個reduce任務的輸入往往來自於所有mapper的輸出,因此map和reduce之間的數據流被稱為 shuffle(洗牌) 。Hadoop會先按照key-value對進行排序,然後將排序好的map的輸出通過網路傳輸到reduce任務運行的節點,併在那裡進行合併,然後傳遞到用戶定義的reduce函數中。

reduce 函數示意圖如下:

reducer代碼編寫如下:

 public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

2.完整代碼

2.1 項目架構

關於VSCode+Java+Maven+Hadoop開發環境搭建,可以參見我的博客《VSCode+Maven+Hadoop開發環境搭建》,此處不再贅述。這裡展示我們的項目架構圖:

Word-Count-Hadoop
├─ input
│  ├─ file1
│  ├─ file2
│  └─ file3
├─ output
├─ pom.xml
├─ src
│  └─ main
│     └─ java
│        └─ WordCount.java
└─ target

WordCount.java代碼如下:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount{
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量
            StringTokenizer iter = new StringTokenizer(value.toString());
            while (iter.hasMoreTokens()) {
                word.set(iter.nextToken());
                // 向context中寫入<word, 1>
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word_count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        //此處的Combine操作意為即第每個mapper工作完了先局部reduce一下,最後再全局reduce
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //第0個參數是輸入目錄,第1個參數是輸出目錄
        //先判斷output path是否存在,如果存在則刪除
        Path path = new Path(args[1]);// 
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        //設置輸入目錄和輸出目錄
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

pom.xml中記得配置Hadoop的依賴環境:

    ...
  <!-- 集中定義版本號 -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <hadoop.version>3.3.1</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- 導入hadoop依賴環境 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  ...
</project>

此外,因為我們的程式自帶輸入參數,我們還需要在VSCode的launch.json中配置輸入參數intput(代表輸入目錄)和output(代表輸出目錄):

...
"args": [
    "input",
    "output"
],
...

編譯運行完畢後,可以查看output文件夾下的part-r-00000文件:

David	1
Goodbye	1
Hello	3
Tom	1
World	2

可見我們的程式正確地完成了單詞計數的功能。

參考

數學是符號的藝術,音樂是上界的語言。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 群暉NAS設置IPV6公網訪問 最近入手了一個群暉nas,記錄下設置公網訪問的過程。 NAS:群暉NAS220+ 路由器:小米AX3600 1、打開路由器上的IPV6功能。 現在路由器預設的還是使用IPV4,IPV6還是需要手動打開的,再去nas ...
  • 解決 Win10 Wsl2 IP 變化問題(2021.2.10) Win10 Wsl2 的 IP 地址每次重啟後都會變化,如果經常需要在 Win10 訪問 Wsl2 內的服務的話會比較麻煩,因此筆者尋找一種解決方案併在此記錄。 1. 產生環境 WSL2; Ubuntu 20.04 focal(on ...
  • echo echo 命令是 Linux bash 和 C shell中最常用的內置命令之一,通常用於腳本語言和批處理文件,用於標準輸出以及顯示文本內容等。echo命令在生產環境腳本中還是使用的非常多的,很多時候都要查看腳本執行是否正常,以及腳本執行到哪裡,都是通過echo命令來列印來定位 。 在寫腳 ...
  • 1、作業控制技巧 Bash環境中通過命令運行一個進程的時候,使【&】 符可以使改進程進入後臺 (base) [root@localhost ~]# sh test.sh & [1] 46963 (base) [root@localhost ~]# 將該進程放入後臺並暫停執行 Ctrl+z (base ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 準備工作:兩個U盤,一個大的作為系統盤,一個小的作為引導盤。 U盤分區 為什麼分盤 我們將u盤作為啟動盤之後,u盤文件不易區分整理,萬一不小心刪除了啟動盤的文件就不好了,所以我們可以將u盤一分為二,一部分作為啟動盤,另一部分作為讀寫盤,這樣就很合 ...
  • chmod怎麼用,Linux文件許可權管理 本文翻譯自Linux官網的Linux入門文章《File Permissions - chmod》,其中一些部分自作主張做了些修改 原文鏈接:File Permissions - chmod 原文 導言 Linux從UNIX繼承了文件所有權和許可權的觀念。這是因 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 1. 問題描述 電腦上成功安裝VMware虛擬機後,安裝Ubuntu系統。Ubuntu系統無法聯網,多方檢查發現問題:宿主機的網路連接中沒有VMware Network Adapter VMnet1和VMware Network Adapter ...
  • 要實現這個示例,必須先安裝好hadoop和hive環境,環境部署可以參考我之前的文章: 大數據Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce) 大數據Hadoop之——數據倉庫Hive 【流程圖如下】 【示例代碼如下】 #!/usr/bin/env python # - ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...