一、Secondary Sort (GroupingComparator)
Requirement: given an order data file containing order id, product id, and product price, sort by order id ascending and by product price descending, produce as many result files as there are distinct order ids, and have each result file contain only the single record for the most expensive product in that order.
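To make the requirement concrete, here is a minimal plain-Java sketch (no Hadoop) that computes the same result on a few made-up sample lines; the data values are illustrative only, not from the original post:

```java
import java.util.*;

// Plain-Java sketch of the requirement: for each order id, keep only
// the most expensive item, with order ids in ascending order.
public class MaxPricePerOrder {
    // Each input line: orderId \t productId \t price
    static List<String> topItemPerOrder(List<String> lines) {
        TreeMap<Integer, Double> best = new TreeMap<>();   // keys iterate ascending
        for (String line : lines) {
            String[] f = line.split("\t");
            int orderId = Integer.parseInt(f[0]);
            double price = Double.parseDouble(f[2]);
            best.merge(orderId, price, Math::max);         // keep the max price per order
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, Double> e : best.entrySet()) {
            out.add(e.getKey() + "\t" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1\tPdt_01\t222.8", "2\tPdt_05\t722.4",
                "2\tPdt_03\t522.8", "1\tPdt_06\t25.8",
                "3\tPdt_02\t33.8");
        System.out.println(topItemPerOrder(lines));
        // → [1	222.8, 2	722.4, 3	33.8]
    }
}
```

The MapReduce version below reaches the same answer at scale by sorting composite keys and grouping them, rather than holding a map in memory.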
Approach:
1. Wrap each order record in an OrderBean class that implements the WritableComparable interface;
2. Write a custom Mapper class: decide the input/output types and implement the business logic;
3. Write a custom Partitioner that returns a partition number based on the order id;
4. Write a custom Reducer class;
5. Write the grouping-comparator class OrderGroupingComparator, which extends WritableComparator, defines a no-arg constructor, and overrides the compare method;
6. Write the Driver class.
The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        int rs;
        if (this.orderId > o.getOrderId()) {
            rs = 1;
        } else if (this.orderId < o.getOrderId()) {
            rs = -1;
        } else {
            // Same order id: sort by price, descending
            rs = Double.compare(o.getOrderPrice(), this.orderPrice);
        }
        return rs;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the line into fields
        String[] fields = line.split("\t");
        // Wrap the fields in an OrderBean
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);
        // Emit the bean as the key
        context.write(orderBean, NullWritable.get());
    }
}

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        // The parameter i is the number of reduce tasks
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % i;
    }
}

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

public class OrderGroupingComparator extends WritableComparator {
    // Must call the superclass constructor via super to register OrderBean as the compared type
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int rs;
        if (aBean.getOrderId() > bBean.getOrderId()) {
            rs = 1;
        } else if (aBean.getOrderId() < bBean.getOrderId()) {
            rs = -1;
        } else {
            rs = 0;
        }
        return rs;
    }
}

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Class used to locate the job jar
        job.setJarByClass(OrderBean.class);
        // Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Grouping comparator for the secondary sort
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // Partitioner class
        job.setPartitionerClass(OrderPartitioner.class);
        // Number of reduce tasks
        job.setNumReduceTasks(3);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\order\\out"));
        // Submit the job
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
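The masking expression in OrderPartitioner deserves a note: `& Integer.MAX_VALUE` clears the sign bit so the partition index stays non-negative even if the key (or a hash) is negative. A small standalone check of just that expression, outside Hadoop:

```java
// Standalone check of the partitioning expression used in OrderPartitioner:
// masking with Integer.MAX_VALUE clears the sign bit, so the result is
// always in [0, numReduceTasks), even for a negative key.
public class PartitionCheck {
    static int partition(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(partition(1, 3));   // → 1
        System.out.println(partition(-7, 3));  // non-negative even for a negative key
    }
}
```

Without the mask, a plain `orderId % numReduceTasks` could return a negative number for negative ids, and the framework would reject the job with an illegal-partition error.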
Since this is code I have typed many times, I did not add many comments; my apologies!
二、The Overall MapReduce Flow
1. Suppose there is a 200 MB text file; the data to be processed is first submitted to the client.
2. The client submits the split information to the Yarn platform, which computes that 2 map tasks are needed.
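The split count in step 2 can be sketched as follows. This assumes the default 128 MB split size and FileInputFormat's slop factor of 1.1: a new full split is cut only while the remaining bytes exceed 1.1× the split size, and whatever is left becomes the last split.

```java
// Sketch of FileInputFormat's split counting (assuming the default
// 128 MB split size and the 1.1 SPLIT_SLOP factor): a 200 MB file
// yields 2 splits, hence 2 map tasks.
public class SplitCount {
    static int countSplits(long fileBytes, long splitBytes) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitBytes > 1.1) {
            splits++;
            remaining -= splitBytes;
        }
        if (remaining > 0) splits++;   // the leftover tail becomes one more split
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(countSplits(200 * mb, 128 * mb)); // → 2
    }
}
```

Note the slop factor means a 130 MB file produces only one split, since 130/128 is below 1.1; this avoids spawning a map task for a tiny tail.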
3. By default the job uses TextInputFormat (a subclass of FileInputFormat) to read the file data into the map tasks.
4. Each map task runs its business logic and writes its output through the Context object into the circular buffer.
5. The circular buffer is simply a region of memory; when the data in it reaches 80% of its default 100 MB capacity, a spill to disk is triggered.
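The spill threshold in step 5 comes from two configuration defaults, `mapreduce.task.io.sort.mb` (100) and `mapreduce.map.sort.spill.percent` (0.80); the arithmetic is just:

```java
// Spill threshold arithmetic for the map-side circular buffer,
// assuming the defaults mapreduce.task.io.sort.mb = 100 and
// mapreduce.map.sort.spill.percent = 0.80: the buffer spills to disk
// once it holds bufferMb * spillPercent megabytes of data.
public class SpillThreshold {
    static long spillBytes(int bufferMb, double spillPercent) {
        return (long) (bufferMb * 1024L * 1024L * spillPercent);
    }

    public static void main(String[] args) {
        System.out.println(spillBytes(100, 0.80)); // → 83886080 (i.e. 80 MB)
    }
}
```

Both properties can be raised in the job Configuration when maps produce a lot of intermediate data, trading memory for fewer spill files.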
6. The spilled data is partitioned and sorted, possibly over several spills (this is the shuffle mechanism, which the next post explains in detail).
7. The partitioned, sorted spill files can optionally be merged by a Combiner before being written to local disk.
8. Once the map tasks have fully finished, the reduce tasks begin reading the map output from disk, merging the files and merge-sorting them (this is where the secondary sort described above takes place).
9. The Reducer reads one group of records at a time, then writes the results to the output file using the default TextOutputFormat.