大數據-Hadoop生態(16)-MapReduce框架原理-自定義FileInputFormat

来源:https://www.cnblogs.com/duoduotouhenying/archive/2018/12/11/10101817.html
-Advertisement-
Play Games

1. 需求 將多個小文件合併成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進位形式的key-value對的文件格式),SequenceFile裡面存儲著多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value 三個小文件 one.txt two.t ...


 

1. 需求

將多個小文件合併成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進位形式的key-value對的文件格式),SequenceFile裡面存儲著多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value

三個小文件

one.txt

yongpeng weidong weinan
sanfeng luozong xiaoming

two.txt

shuaige changmo zhenqiang 
dongli lingu xuanxuan

three.txt

longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin

 

2. 需求分析

 

3.案例代碼

1) 自定義RecordReader

package com.nty.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomRecordReader extends RecordReader<Text, BytesWritable> {

    /**
     * 由於採用了FileInputFormat的輸入方式,所以輸入源3個文件,會分成三個切片,所以一個RecordReader只處理一個文件,一次讀完
     */

    //標記文件是否被讀過,true表示沒被讀過
    private boolean flag = true;

    private Text key = new Text();
    private BytesWritable value = new BytesWritable();

    //輸入流
    FSDataInputStream fis;

    private FileSplit fs;

    /**
     * 初始化方法,只調用一次
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //FileSplit是InputSplit的子類
        fs = (FileSplit) split;

        //獲取文件路徑
        Path path = fs.getPath();

        //獲取文件系統
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        //FileSystem fileSystem = path.getFileSystem(context.getConfiguration());

        //開流
        fis = fileSystem.open(path);
    }

    /**
     * 讀取下一組KV
     * @return 讀到了返回true,反之返回false
     * @throws IOException
     * @throws InterruptedException
     */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(flag){
            //讀取文件進入key和value
            String path = fs.getPath().toString();
            key.set(path);

            //文件是一次性讀完,bytes的長度不能為普遍的1024,當然這麼寫會涉及到大文件的問題,不做討論.
            byte[] bytes = new byte[(int) fs.getLength()];
            fis.read(bytes);
            value.set(bytes,0,bytes.length);

            //重新標記
            flag = false;

            return  true;
        }
        return false;
    }

    /**
     * 獲取當前讀到的key
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /**
     * 獲取當前讀到的value
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    /**
     * 獲取當前讀取的進度
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public float getProgress() throws IOException, InterruptedException {
        //文件一次讀完,只有0和1的進度,根據flag來判斷
        return flag ? 0f : 1f;
    }

    /**
     * 關閉資源
     * @throws IOException
     */
    public void close() throws IOException {
        IOUtils.closeStream(fis);
    }
}

2) 自定義Inputformat

package com.nty.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:09
 */
//需求中,key為文件路徑+名稱,所以key類型為Text,value為文件內容,用BytesWritable
public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {

    //最後輸出的value為一個文件,所讓文件不能被切分,返回false
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    //返回自定義的 RecordReader
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new CustomRecordReader();
    }
}

3) 編寫Mapper類

package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key,value);
    }
}

4) 編寫Reducer類

package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}

5) 編寫Driver類

package com.nty.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomDriver {

    public static void main(String[] args) throws  Exception{
        //獲取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //設置類
        job.setJarByClass(CustomDriver.class);
        //設置input和output
        job.setInputFormatClass(CustomInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        //設置Mapper和Reducer
        job.setMapperClass(CustomMapper.class);
        job.setReducerClass(CustomReducer.class);

        //設置Mapper和Reducer的輸入輸出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //設置文件路徑
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));
        //提交
        boolean b = job.waitForCompletion(true);

        System.exit(b ? 0 : 1);

    }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 今天出去面試,碰見一個面試題,說來也巧,又是劃分子網的問題,曾經花了一整個上午研究子網,算是研究透徹了,今天正好碰上順便分享下怎麼處理這個問題 面試官說好的給我一個鐘答題,實際給了我十五分鐘,原定的和HR視頻面試時間到了,還有兩個需要用英文簡述的題沒做,有點尷尬,英語太渣,廢話不多說,對於網路精英來 ...
  • Vagrant 是一個簡單易用的部署工具,用英文說應該是 Orchestration Tool 。它能幫助開發人員迅速的構建一個開發環境,幫助測試人員構建測試環境, Vagrant 基於 Ruby 開發,使用開源 VirtualBox 作為虛擬化支持,可以輕鬆的跨平臺部署。 通俗的來說, 就是在本地 ...
  • 1、概述 i.MX 6ULL系列晶元的MMDC是一個多模式DDR控制器,支持DDR3/DDR3Lx16和LPDDR2x16的存儲類型,MMDC是可配置,高性能,優化的記憶體控制器。 註:DDR3/DDR3Lx16、LPDDR2x16 ,此處的x16表示晶元位寬,每個傳輸周期能夠提供的數據量(bit)。 ...
  • 由於某些需求,需要在蘋果OS x系統下展示一組點雲,準備使用蘋果官方的三維顯示控制項來完成這一功能。場景點雲作為離散的點, 如果每個點以SCNnode的形式加入場景中,則回造成過大的記憶體消耗,筆者電腦下,單個場景展示到1w點時記憶體就崩潰了。所以準備通過 修改單個node的shader屬性,實現自定義的 ...
  • 最近導師讓學習golang, 然後我就找了些有關golang的學習視頻和網站。 昨天在電腦上下載了go tools, 之後在sublime上配置了golang的運行環境。By the way, 我的電腦是windows的操作系統。 Golang學習資料: 學習視頻:https://www.cours ...
  • mysql 3306 主庫配置文件 [client]port = 3306default-character-set=utf8mb4socket = /ssd/mysql/3306/tmp/mysql.sock# Here follows entries for some specific prog ...
  • 硝煙剛剛散去,馬上又將迎來雙十二了。自從雙十一火了之後,逐漸的雙十二也演變成為了一個全民狂歡的購物節日。我們都知道阿裡雲在雙十一推出了拼團的優惠活動,那麼在接下來的2018年雙十二又會給到我們一些什麼樣的優惠呢?下麵阿裡雲官方雲大使伺服器吧小編就帶大家來看看。 阿裡雲在12月7日正式上線了2018年 ...
  • 一. 概述 使用和配置主從複製非常簡單,每次當 slave 和 master 之間的連接斷開時, slave 會自動重連到 master 上,並且無論這期間 master 發生了什麼, slave 都將嘗試讓自身成為 master 的精確副本。這個系統的運行依靠三個主要的機制: (1) 當一個 ma ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...