1. Requirement
Merge several small files into a single SequenceFile (a SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds the multiple files, using each file's path + name as the key and its content as the value (a short sketch after the sample files below makes this layout concrete).
Three small input files:
one.txt
yongpeng weidong weinan
sanfeng luozong xiaoming
two.txt
shuaige changmo zhenqiang
dongli lingu xuanxuan
three.txt
longlong fanfan
mazong kailun yuhang yixin
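To make the target key-value layout concrete, here is a minimal sketch (not part of the case code) that writes a SequenceFile directly with Hadoop's SequenceFile.Writer; the class name SequenceFileWriteDemo and the path d:\seq_demo.seq are assumptions for illustration only.

package com.nty.inputformat;

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //Assumed demo path; any writable local or HDFS path works
        Path path = new Path("d:\\seq_demo.seq");

        SequenceFile.Writer writer = null;
        try {
            //Key = file path (Text), value = file content (BytesWritable), matching the requirement
            writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));

            byte[] content = "yongpeng weidong weinan\nsanfeng luozong xiaoming\n"
                    .getBytes(StandardCharsets.UTF_8);
            writer.append(new Text("d:/Hadoop_test/one.txt"), new BytesWritable(content));
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}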
2. Requirement Analysis
To merge the files, customize an InputFormat that extends FileInputFormat<Text, BytesWritable> and returns false from isSplitable(), so each small file becomes exactly one un-split split. Its RecordReader reads the whole file in a single pass, using the file path + name as the Text key and the full file content as the BytesWritable value. The Mapper and Reducer simply pass the pairs through, and the Driver uses SequenceFileOutputFormat so that all the small files end up in one SequenceFile.
3. Example Code
1) Custom RecordReader
package com.nty.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomRecordReader extends RecordReader<Text, BytesWritable> {

    /**
     * Because FileInputFormat is used as the input type, the three input files become three splits,
     * so each RecordReader handles exactly one file and reads it in one pass.
     */
    //Marks whether the file has been read; true means it has not been read yet
    private boolean flag = true;

    private Text key = new Text();

    private BytesWritable value = new BytesWritable();

    //Input stream
    private FSDataInputStream fis;

    private FileSplit fs;

    /**
     * Initialization method, called only once
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //FileSplit is a subclass of InputSplit
        fs = (FileSplit) split;

        //Get the file path
        Path path = fs.getPath();

        //Get the file system
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        //FileSystem fileSystem = path.getFileSystem(context.getConfiguration());

        //Open the stream
        fis = fileSystem.open(path);
    }

    /**
     * Read the next KV pair
     * @return true if a pair was read, false otherwise
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (flag) {
            //Read the file into key and value
            String path = fs.getPath().toString();
            key.set(path);

            //The file is read in one pass, so the buffer length is the split length rather than the usual 1024.
            //This approach does not handle very large files; that case is not discussed here.
            byte[] bytes = new byte[(int) fs.getLength()];
            IOUtils.readFully(fis, bytes, 0, bytes.length);
            value.set(bytes, 0, bytes.length);

            //Flip the flag
            flag = false;
            return true;
        }
        return false;
    }

    /**
     * Get the current key
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /**
     * Get the current value
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    /**
     * Get the current read progress
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        //The file is read in one pass, so progress is either 0 or 1, based on flag
        return flag ? 0f : 1f;
    }

    /**
     * Release resources
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(fis);
    }
}
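The framework, not user code, calls these methods: initialize() once, then nextKeyValue() in a loop, fetching each pair with getCurrentKey()/getCurrentValue(), and finally close(). The sketch below is a hedged local smoke test of that call order; the class name, the path d:/Hadoop_test/one.txt, and the use of TaskAttemptContextImpl/TaskAttemptID to fake a context outside a real job are assumptions for illustration only.

package com.nty.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CustomRecordReaderSmokeTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //Assumed local test file; point this at any existing small file
        Path path = new Path("d:/Hadoop_test/one.txt");
        long length = FileSystem.get(conf).getFileStatus(path).getLen();

        //Build a split and a fake task context the way the framework would
        FileSplit split = new FileSplit(path, 0, length, null);
        TaskAttemptContextImpl context = new TaskAttemptContextImpl(conf, new TaskAttemptID());

        CustomRecordReader reader = new CustomRecordReader();
        reader.initialize(split, context);      //called once, opens the stream
        while (reader.nextKeyValue()) {         //true exactly once per file here
            System.out.println(reader.getCurrentKey()   //file path + name
                    + " -> " + reader.getCurrentValue().getLength() + " bytes");
        }
        reader.close();                         //closes the stream
    }
}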
2) Custom InputFormat
package com.nty.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:09
 */
//Per the requirement, the key is the file path + name, so its type is Text; the value is the file content, so BytesWritable
public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {

    //The final output value is a whole file, so files must not be split; return false
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    //Return the custom RecordReader
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new CustomRecordReader();
    }
}
3) Write the Mapper class
package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    //Pass each (file path, file bytes) pair straight through
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
4) Write the Reducer class
package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    //Each key corresponds to exactly one file; iterate the values anyway and write them out unchanged
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
5) Write the Driver class
package com.nty.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomDriver {

    public static void main(String[] args) throws Exception {
        //Get the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //Set the jar class
        job.setJarByClass(CustomDriver.class);

        //Set the input and output formats
        job.setInputFormatClass(CustomInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        //Set the Mapper and Reducer
        job.setMapperClass(CustomMapper.class);
        job.setReducerClass(CustomReducer.class);

        //Set the Mapper and Reducer output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        //Submit
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
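After the job finishes, d:\Hadoop_test_out should contain a single SequenceFile; with one reducer the file is normally named part-r-00000. Below is a minimal sketch for inspecting that output with SequenceFile.Reader; the class name SequenceFileDumpDemo and the exact output file name are assumptions.

package com.nty.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDumpDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //Assumed output file name; a single reducer writes part-r-00000 by default
        Path path = new Path("d:\\Hadoop_test_out\\part-r-00000");

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            //Print each stored file path and the size of its content
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}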