示例 數據: 要求: 將每年每月中的氣溫排名前三的數據找出來 實現: 1.每一年用一個reduce任務處理; 2.創建自定義數據類型,存儲 [年-月-日-溫度]; 3.自己實現排序函數,根據 [年-月-溫度] 排序(溫度降序),也可以在自定義數據類型中進行排序; 4.自己實現分組函數,對 [年-月] 分組,reduce ...
示例
數據(每行一條記錄,日期時間與溫度之間以 Tab 分隔):
1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1951-12-02 12:21:02	45c
1951-12-03 12:21:02	50c
1951-12-23 12:21:02	33c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
要求:
將每年每月中的氣溫排名前三的數據找出來
實現:
1.每一年用一個reduce任務處理;
2.創建自定義數據類型,存儲 [年-月-日-溫度];
3.自己實現排序函數,按 [年-月-溫度] 排序(年、月升序,溫度降序),也可以直接在自定義數據類型的 compareTo 中實現排序;
4.自己實現分組函數,對 [年-月] 分組:reduce 的 key 是分組結果,同一組的所有 value 在一次 reduce 調用中處理;由於已經實現了自定義排序(溫度降序),每組只需取前三個值即可。
注意點:
1.自定義數據類型的使用;
2.自定義排序類的使用;
3.自定義分組類的使用,分組類對哪些數據進行分組;
4.自定義分區類的使用,分區類與 reduce task 個數的關係;
示例代碼:
RunJob.java
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileSystem; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.LongWritable; 5 import org.apache.hadoop.io.NullWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 import java.io.IOException; 14 import java.text.ParseException; 15 import java.text.SimpleDateFormat; 16 import java.util.Calendar; 17 import java.util.Date; 18 19 /** 20 * weather 統計天氣信息 21 * 22 * 數據: 23 * 1999-10-01 14:21:02 34c 24 * 1999-11-02 13:01:02 30c 25 * 26 * 要求: 27 * 將每年的每月中氣溫排名前三的數據找出來 28 * 29 * 實現: 30 * 1.每一年用一個reduce任務處理; 31 * 2.創建自定義數據類型,存儲 [年-月-日-溫度]; 32 * 2.自己實現排序函數 根據 [年-月-溫度] 降序排列,也可以在定義數據類型中進行排序; 33 * 3.自己實現分組函數,對 [年-月] 分組,reduce的key是分組結果,根據相同的分組值,統計reduce的value值,只統計三個值就可以,因為已經實現了自排序函數。 34 * 35 * Created by Edward on 2016/7/11. 
36 */ 37 public class RunJob { 38 39 public static void main(String[] args) 40 { 41 //access hdfs's user 42 System.setProperty("HADOOP_USER_NAME","root"); 43 44 Configuration conf = new Configuration(); 45 conf.set("fs.defaultFS", "hdfs://node1:8020"); 46 47 48 try { 49 FileSystem fs = FileSystem.get(conf); 50 51 Job job = Job.getInstance(conf); 52 job.setJarByClass(RunJob.class); 53 job.setMapperClass(MyMapper.class); 54 job.setReducerClass(MyReducer.class); 55 56 //需要指定 map out 的 key 和 value 57 job.setOutputKeyClass(InfoWritable.class); 58 job.setOutputValueClass(Text.class); 59 60 //設置分區 繼承 HashPartitioner 61 job.setPartitionerClass(YearPartition.class); 62 //根據年份創建指定數量的reduce task 63 job.setNumReduceTasks(3); 64 65 //設置排序 繼承 WritableComparator 66 //job.setSortComparatorClass(SortComparator.class); 67 68 //設置分組 繼承 WritableComparator 對reduce中的key進行分組 69 job.setGroupingComparatorClass(GroupComparator.class); 70 71 FileInputFormat.addInputPath(job, new Path("/test/weather/input")); 72 73 Path path = new Path("/test/weather/output"); 74 if(fs.exists(path))//如果目錄存在,則刪除目錄 75 { 76 fs.delete(path,true); 77 } 78 FileOutputFormat.setOutputPath(job, path); 79 80 boolean b = job.waitForCompletion(true); 81 if(b) 82 { 83 System.out.println("OK"); 84 } 85 86 } catch (Exception e) { 87 e.printStackTrace(); 88 } 89 } 90 91 92 public static class MyMapper extends Mapper<LongWritable, Text, InfoWritable, Text > { 93 94 private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 95 96 @Override 97 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 98 String[] str = value.toString().split("\t"); 99 100 try { 101 Date date = sdf.parse(str[0]); 102 Calendar c = Calendar.getInstance(); 103 c.setTime(date); 104 int year = c.get(Calendar.YEAR); 105 int month = c.get(Calendar.MONTH)+1; 106 int day = c.get(Calendar.DAY_OF_MONTH); 107 108 double temperature = 
Double.parseDouble(str[1].substring(0,str[1].length()-1)); 109 110 InfoWritable info = new InfoWritable(); 111 info.setYear(year); 112 info.setMonth(month); 113 info.setDay(day); 114 info.setTemperature(temperature); 115 116 context.write(info, value); 117 118 } catch (ParseException e) { 119 e.printStackTrace(); 120 } 121 } 122 } 123 124 public static class MyReducer extends Reducer<InfoWritable, Text, Text, NullWritable> { 125 @Override 126 protected void reduce(InfoWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 127 int i=0; 128 for(Text t: values) 129 { 130 i++; 131 if(i>3) 132 break; 133 context.write(t, NullWritable.get()); 134 } 135 } 136 } 137 }
InfoWritable.java
1 import org.apache.hadoop.io.WritableComparable; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 /** 8 * 自定義數據類型 繼承 WritableComparable 9 * 【年-月-日-溫度】 10 * Created by Edward on 2016/7/11. 11 */ 12 public class InfoWritable implements WritableComparable<InfoWritable> { 13 14 private int year; 15 private int month; 16 private int day; 17 private double temperature; 18 19 public void setYear(int year) { 20 this.year = year; 21 } 22 23 public void setMonth(int month) { 24 this.month = month; 25 } 26 27 public void setDay(int day) { 28 this.day = day; 29 } 30 31 public void setTemperature(double temperature) { 32 this.temperature = temperature; 33 } 34 35 public int getYear() { 36 return year; 37 } 38 39 public int getMonth() { 40 return month; 41 } 42 43 public int getDay() { 44 return day; 45 } 46 47 public double getTemperature() { 48 return temperature; 49 } 50 51 /** 52 * 53 * 對象比較,對溫度進行倒序排序 54 */ 55 @Override 56 public int compareTo(InfoWritable o) { 57 58 int result = Integer.compare(this.getYear(),o.getYear()); 59 if(result == 0) 60 { 61 result = Integer.compare(this.getMonth(),o.getMonth()); 62 if(result == 0) 63 { 64 return -Double.compare(this.getTemperature(), o.getTemperature()); 65 } 66 else 67 return result; 68 } 69 else 70 return result; 71 72 //return this==o?0:-1; 73 } 74 75 @Override 76 public void write(DataOutput dataOutput) throws IOException { 77 dataOutput.writeInt(this.year); 78 dataOutput.writeInt(this.month); 79 dataOutput.writeInt(this.day); 80 dataOutput.writeDouble(this.temperature); 81 } 82 83 @Override 84 public void readFields(DataInput dataInput) throws IOException { 85 this.year = dataInput.readInt(); 86 this.month = dataInput.readInt(); 87 this.day = dataInput.readInt(); 88 this.temperature = dataInput.readDouble(); 89 } 90 }
YearPartition.java
1 import org.apache.hadoop.io.Text; 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 3 4 /** 5 * 6 * 創建分區,通過key中的year來創建分區 7 * 8 * Created by Edward on 2016/7/11. 9 */ 10 public class YearPartition extends HashPartitioner <InfoWritable, Text>{ 11 @Override 12 public int getPartition(InfoWritable key, Text value, int numReduceTasks) { 13 return key.getYear()%numReduceTasks; 14 } 15 }
GroupComparator.java
1 import org.apache.hadoop.io.WritableComparable; 2 import org.apache.hadoop.io.WritableComparator; 3 4 /** 5 * 創建分組類,繼承WritableComparator 6 * 【年-月】 7 * Created by Edward on 2016/7/11. 8 */ 9 public class GroupComparator extends WritableComparator { 10 11 GroupComparator() 12 { 13 super(InfoWritable.class, true); 14 } 15 16 @Override 17 public int compare(WritableComparable a, WritableComparable b) { 18 InfoWritable ia = (InfoWritable)a; 19 InfoWritable ib = (InfoWritable)b; 20 21 int result = Integer.compare(ia.getYear(),ib.getYear()); 22 if(result == 0) 23 { 24 return Integer.compare(ia.getMonth(),ib.getMonth()); 25 } 26 else 27 return result; 28 } 29 }
SortComparator.java
1 import org.apache.hadoop.io.WritableComparable; 2 import org.apache.hadoop.io.WritableComparator; 3 4 /** 5 * 排序類,繼承WritableComparator 6 * 排序規則【年-月-溫度】 溫度降序 7 * Created by Edward on 2016/7/11. 8 */ 9 public class SortComparator extends WritableComparator { 10 11 /** 12 * 調用父類的構造函數 13 */ 14 SortComparator() 15 { 16 super(InfoWritable.class, true); 17 } 18 19 20 /** 21 * 比較兩個對象的大小,使用降序排列 22 * @param a 23 * @param b 24 * @return 25 */ 26 @Override 27 public int compare(WritableComparable a, WritableComparable b) { 28 29 InfoWritable ia = (InfoWritable)a; 30 InfoWritable ib = (InfoWritable)b; 31 32 int result = Integer.compare(ia.getYear(),ib.getYear()); 33 if(result == 0) 34 { 35 result = Integer.compare(ia.getMonth(),ib.getMonth()); 36 if(result == 0) 37 { 38 return -Double.compare(ia.getTemperature(), ib.getTemperature()); 39 } 40 else 41 return result; 42 } 43 else 44 return result; 45 } 46 }