原帖地址:http://www.ptbird.cn/mapreduce-tempreture.html 「 Hadoop」mapreduce對溫度數據進行自定義排序、分組、分區等 一、需求說明 1、數據文件說明 hdfs中有一些存儲溫度的數據文件,以文本形式存儲,示例如下: 日期和時間中間是空格,為 ...
原帖地址:http://www.ptbird.cn/mapreduce-tempreture.html
「 Hadoop」mapreduce對溫度數據進行自定義排序、分組、分區等
一、需求說明
1、數據文件說明
hdfs中有一些存儲溫度的數據文件,以文本形式存儲,示例如下:
日期和時間中間是空格,為整體,表示檢測站點監測的時間,後面是檢測的溫度,中間通過製表符 t 相隔。
2、需求
- 計算在1949-1955年中,每年的溫度降序排序且每年單獨一個文件輸出存儲
需要進行自定義分區、自定義分組、自定義排序。
二、解決
1、思路
- 按照年份升序排序再按照每年的溫度降序排序
- 按照年份進行分組,每一年份對應一個reduce task
2、自定義mapper輸出類型KeyPair
可以看出,每一行溫度姑且稱為一個數據,每個數據中有兩部分,一部分是時間,另一部分是溫度。
因此map輸出必須使用自定義的格式輸出,並且輸出之後需要自定義進行排序和分組等操作,預設的那些都不管用了。
定義KeyPair
自定義的輸出類型因為要將map的輸出放到reduce中去運行,因此需要實現hadoop的WritableComparable的介面,並且該介面的模板變數也得是KeyPair,就像是LongWritable一個意思(查看LongWritable的定義就可以知道)
實現WritableComparable 的介面,就必須重寫write/readFileds/compareTo三個方法,依次作用於序列化/反序列化/比較
同時需要重寫toString和hashCode避免equals的問題。
KeyPair定義如下
值得註意的是:在進行序列化輸出的時候也就是write,裡面用了將標準格式的時間(文件中顯示的格式時間)進行的時間的轉換,用了DataInput和DataOutput
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Project : hadooptest2
* Package : com.mapreducetest.temp
* User : Postbird @ http://www.ptbird.cn
* TIME : 2017-01-19 21:53
*/
/**
* 為溫度和年份封裝成對象
* year表示年份 而temp為溫度
*/
public class KeyPair implements WritableComparable<KeyPair>{
//年份
private int year;
//溫度
private int temp;
public void setYear(int year) {
this.year = year;
}
public void setTemp(int temp) {
this.temp = temp;
}
public int getYear() {
return year;
}
public int getTemp() {
return temp;
}
@Override
public int compareTo(KeyPair o) {
//傳過來的對象和當前的year比較 相等為0 不相等為1
int result=Integer.compare(year,o.getYear());
if(result != 0){
//兩個year不相等
return 0;
}
//如果年份相等 比較溫度
return Integer.compare(temp,o.getTemp());
}
@Override
//序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(year);
dataOutput.writeInt(temp);
}
@Override
//反序列化
public void readFields(DataInput dataInput) throws IOException {
this.year=dataInput.readInt();
this.temp=dataInput.readInt();
}
@Override
public String toString() {
return year+"\t"+temp;
}
@Override
public int hashCode() {
return new Integer(year+temp).hashCode();
}
}
3、自定義分組
將同一年監測的溫度放到一起,因此需要對年份進行比較。
因此比較輸入的數據中的年份即可,註意此時比較的都是KeyPair的類型,Map出來的輸出也是這個類型。
因為繼承了WritableComparator,因此需要重寫compare方法,比較的是KeyPair(KeyPair實現了WritableComparable介面),實際比較的使他們的年份,年份相同則得到0
/**
* Project : hadooptest2
* Package : com.mapreducetest.temp
* User : Postbird @ http://www.ptbird.cn
* TIME : 2017-01-19 22:08
*/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 為溫度分組 比較年份即可
*/
public class GroupTemp extends WritableComparator{
public GroupTemp() {
super(KeyPair.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
//年份相同返回的是0
KeyPair o1=(KeyPair)a;
KeyPair o2=(KeyPair)b;
return Integer.compare(o1.getYear(),o2.getYear());
}
}
4、自定義分區
自定義分區的目的是在根據年份分好了組之後,將不同的年份創建不同的reduce task任務,因此需要對年份處理。
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* Project : hadooptest2
* Package : com.mapreducetest.temp
* User : Postbird @ http://www.ptbird.cn
* TIME : 2017-01-19 22:17
*/
//自定義分區
//每一個年份生成一個reduce任務
public class FirstPartition extends Partitioner<KeyPair,Text>{
@Override
public int getPartition(KeyPair key, Text value, int num) {
//按照年份進行分區 年份相同,返回的是同一個值
return (key.getYear()*127)%num;
}
}
5、自定義排序
最終還是比較的是溫度的排序,因此這部分也是非常重要的。
根據上面的需求,需要對年份進行生序排序,而對溫度進行降序排序,首選比較條件是年份.
/**
* Project : hadooptest2
* Package : com.mapreducetest.temp
* User : Postbird @ http://www.ptbird.cn
* TIME : 2017-01-19 22:08
*/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 為溫度排序的封裝類
*/
public class SortTemp extends WritableComparator{
public SortTemp() {
super(KeyPair.class,true);
}
//自定義排序
@Override
public int compare(WritableComparable a, WritableComparable b) {
//按照年份升序排序 按照溫度降序排序
KeyPair o1=(KeyPair)a;
KeyPair o2=(KeyPair)b;
int result=Integer.compare(o1.getYear(),o2.getYear());
//比較年份 如果年份不相等
if(result != 0){
return result;
}
//兩個年份相等 對溫度進行降序排序,註意 - 號
return -Integer.compare(o1.getTemp(),o2.getTemp());
}
}
6、MapReduce程式的編寫
幾個值得註意的點:
- 數據文件中前面的時間是字元串,但是我們的KeyPair的set卻不是字元串,因此需要進行字元串轉日期的format操作,使用的是SimpleDateFormat,格式自然是"yyyy-MM-dd HH:mm:ss"了。
- 輸入每行數據之後,通過正則匹配"t"的製表符,然後將溫度和時間分開,將時間format並得到年份,將第二部分字元串去掉“℃”的符號得到數字,然後創建KeyPair類型的數據,在輸出即可。
- 每個年份都生成一個reduce task依據就是自定義分區中對年份進行了比較處理,為了簡單就把map的輸出結果在reduce中再輸出一次,三個reduce task,就會生成三個輸出文件。
- 因為使用了自定義的排序,分組,分區,因此就需要進行指定相關的class,同時也需要執行reduce task的數量。
- 其實最後客戶端還是八股文的固定形式而已,只不過多了自定義的指定,沒有別的。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* Project : hadooptest2
* Package : com.mapreducetest.temp
* User : Postbird @ http://www.ptbird.cn
* TIME : 2017-01-19 22:28
*/
public class RunTempJob {
//字元串轉日期format
public static SimpleDateFormat SDF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* Mapper
* 輸出的Key是自定義的KeyPair
*/
static class TempMapper extends Mapper<LongWritable,Text,KeyPair,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line=value.toString();
//1949-10-01 14:21:02 34℃
// 前面是空格 時間和溫度通過\t分割
String[] ss=line.split("\t");
// System.err.println(ss.length);
if(ss.length==2){
try{
//獲得日期
Date date=SDF.parse(ss[0]);
Calendar c=Calendar.getInstance();
c.setTime(date);
int year=c.get(1);//得到年份
//字元串截取得到溫度,去掉℃
String temp = ss[1].substring(0,ss[1].indexOf("℃"));
//創建輸出key 類型為KeyPair
KeyPair kp=new KeyPair();
kp.setYear(year);
kp.setTemp(Integer.parseInt(temp));
//輸出
context.write(kp,value);
}catch(Exception ex){
ex.printStackTrace();
}
}
}
}
/**
* Reduce 區域
* Map的輸出是Reduce的輸出
*/
static class TempReducer extends Reducer<KeyPair,Text,KeyPair,Text> {
@Override
protected void reduce(KeyPair kp, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value:values){
context.write(kp,value);
}
}
}
//client
public static void main(String args[]) throws IOException, InterruptedException{
//獲取配置
Configuration conf=new Configuration();
//修改命令行的配置
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: temp <in> <out>");
System.exit(2);
}
//創建Job
Job job=new Job(conf,"temp");
//1.設置job運行的類
job.setJarByClass(RunTempJob.class);
//2.設置map和reduce的類
job.setMapperClass(RunTempJob.TempMapper.class);
job.setReducerClass(RunTempJob.TempReducer.class);
//3.設置map的輸出的key和value 的類型
job.setMapOutputKeyClass(KeyPair.class);
job.setMapOutputValueClass(Text.class);
//4.設置輸入文件的目錄和輸出文件的目錄
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
//5.設置Reduce task的數量 每個年份對應一個reduce task
job.setNumReduceTasks(3);//3個年份
//5.設置partition sort Group的class
job.setPartitionerClass(FirstPartition.class);
job.setSortComparatorClass(SortTemp.class);
job.setGroupingComparatorClass(GroupTemp.class);
//6.提交job 等待運行結束併在客戶端顯示運行信息
boolean isSuccess= false;
try {
isSuccess = job.waitForCompletion(true);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
//7.結束程式
System.exit(isSuccess ?0:1);
}
}
三、生成效果:
HDFS中三個reduce task會生成三個輸出。
每個輸出文件都是每年中的溫度的排序結果:
可以看出,1951是map(也可以說是KeyPair)輸出的年份,46是溫度,而後面是將text又輸出了一次,每一年都是根據需求降序排序的。)
標簽:hadoopmapreduce文章版權:Postbird-There I am , in the world more exciting!
本文鏈接:http://www.ptbird.cn/mapreduce-tempreture.html
轉載請註明文章原始出處 !