一、MR排序的分類 1.部分排序:MR會根據自己輸出記錄的KV對數據進行排序,保證輸出到每一個文件記憶體都是經過排序的; 2.全局排序; 3.輔助排序:再第一次排序後經過分區再排序一次; 4.二次排序:經過一次排序後又根據業務邏輯再次進行排序。 二、MR排序的介面——WritableComparabl ...
一、MR排序的分類
1.部分排序:MR會根據自己輸出記錄的KV對數據進行排序,保證輸出到每一個文件記憶體都是經過排序的;
2.全局排序;
3.輔助排序:再第一次排序後經過分區再排序一次;
4.二次排序:經過一次排序後又根據業務邏輯再次進行排序。
二、MR排序的介面——WritableComparable
該介面繼承了Hadoop的Writable介面和Java的Comparable介面,實現該介面要重寫write、readFields、compareTo三個方法。
三、流量統計案例的排序與分區
/** * @author: PrincessHug * @date: 2019/3/24, 15:36 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class FlowSortBean implements WritableComparable<FlowSortBean> { private long upFlow; private long dwFlow; private long flowSum; public FlowSortBean() { } public FlowSortBean(long upFlow, long dwFlow) { this.upFlow = upFlow; this.dwFlow = dwFlow; this.flowSum = upFlow + dwFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDwFlow() { return dwFlow; } public void setDwFlow(long dwFlow) { this.dwFlow = dwFlow; } public long getFlowSum() { return flowSum; } public void setFlowSum(long flowSum) { this.flowSum = flowSum; } @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dwFlow); out.writeLong(flowSum); } @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dwFlow = in.readLong(); flowSum = in.readLong(); } @Override public String toString() { return upFlow + "\t" + dwFlow + "\t" + flowSum; } @Override public int compareTo(FlowSortBean o) { return this.flowSum > o.getFlowSum() ? -1:1; } } public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //獲取數據 String line = value.toString(); //切分數據 String[] fields = line.split("\t"); //封裝數據 long upFlow = Long.parseLong(fields[1]); long dwFlow = Long.parseLong(fields[2]); //傳輸數據 context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0])); } } public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> { @Override protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(),key); } } public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> { @Override public int getPartition(FlowSortBean key, Text value, int i) { String phoneNum = value.toString().substring(0, 3); int partition = 4; if ("135".equals(phoneNum)){ return 0; }else if ("137".equals(phoneNum)){ return 1; }else if ("138".equals(phoneNum)){ return 2; }else if ("139".equals(phoneNum)){ return 3; } return partition; } } public class FlowSortDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //設置配置,初始化Job類 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //設置執行類 job.setJarByClass(FlowSortDriver.class); //設置Mapper、Reducer類 job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); //設置Mapper輸出數據類型 job.setMapOutputKeyClass(FlowSortBean.class); job.setMapOutputValueClass(Text.class); //設置Reducer輸出數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowSortBean.class); //設置自定義分區 job.setPartitionerClass(FlowSortPartitioner.class); job.setNumReduceTasks(5); //設置文件輸入輸出類型 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout")); //提交任務 if (job.waitForCompletion(true)){ System.out.println("運行完成!"); }else { System.out.println("運行失敗!"); } } }
註意:再寫Mapper類的時候,要註意KV對輸出的數據類型,Key的類型一定要為FlowSortBean,因為在Mapper和Reducer之間進行的排序(只是排序)是通過Mapper輸出的Key來進行排序的,而分區可以指定是通過Key或者Value。
四、Combiner合併
Combiner是在MR之外的一個組件,可以用來在maptask輸出到環形緩衝區溢寫之後,分區排序完成時進行局部的彙總,可以減少網路傳輸量,進而優化MR程式。
Combiner是用在當數據量到達一定規模之後的,小的數據量並不是很明顯。
例如WordCount程式,當單詞文件的大小到達一定程度,可以使用自定義Combiner進行優化:
public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{ protected void reduce(Text key,Iterable<IntWritable> values,Context context){ //計數 int count = 0; //累加求和 for(IntWritable v:values){ count += v.get(); } //輸出 context.write(key,new IntWritable(count)); } }
然後再Driver類中設置使用Combiner類
job.setCombinerClass(WordCountCombiner.class);
如果仔細觀察,WordCount的自定義Combiner類與Reducer類是完全相同的,因為他們的邏輯是相同的,即在maptask之後的分區內先進行一次累加求和,然後到reducer後再進行總的累加求和,所以在設置Combiner時也可以這樣:
job.setCombinerClass(WordCountReducer.class);
註意:Combiner的應用一定要註意不能影響最終業務邏輯的情況下使用,比如在求平均值的時候:
mapper輸出兩個分區:3,5,7 =>avg=5
2,6 =>avg=4
reducer合併輸出: 5,4 =>avg=4.5 但是實際應該為4.6,錯誤!
所以在使用Combiner時要註意其不會影響最中的結果!!!