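I'm writing this post because I recently ran into the secondary sort problem in MapReduce and realized that my earlier understanding was not entirely correct. First, let's look at the MapReduce process.
Anyone familiar with MR has probably seen this diagram. Here is another one:
I won't go into wordcount in detail either; it is the "hello, world" of Hadoop.
My earlier understanding was that the <k,v> pairs coming out of the map phase are turned into the form (k, <v1,v2,v3...>). I wrote quite a few MapReduce programs with that mental model, and they all worked.
Later I defined custom Writable objects that wrap several values into one key, and that worked fine too. I assumed that overriding compareTo on the key was all I needed, and that keys for which compareTo returns 0 would be treated as the same key. This is where the misconception lies: I had always believed that the values of identical keys get merged into one "list". That statement is wrong. Keys are keys and values are values; the values belonging to a key are never merged together. What actually happens is that, by default, records whose keys compare as equal (compareTo returns 0) are placed in one group, and the reducer iterates over the values within that group. With a custom key and a grouping rule that looks at only part of it, you can print the key while iterating and you will find that the keys are not actually identical.
Here is the code: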
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Entry implements WritableComparable<Entry> {
    private String yearMonth;
    private int count;

    // Hadoop needs a no-arg constructor to create the key when deserializing.
    public Entry() {
    }

    // Sort order: yearMonth first, then count.
    @Override
    public int compareTo(Entry entry) {
        int result = this.yearMonth.compareTo(entry.getYearMonth());
        if (result == 0) {
            result = Integer.compare(count, entry.getCount());
        }
        return result;
    }

    // write and readFields must handle the fields in the same order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(yearMonth);
        dataOutput.writeInt(count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // Only yearMonth is printed; call getCount() as well to see the key change within a group.
    @Override
    public String toString() {
        return yearMonth;
    }
}
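Custom partitioner, EntryPartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class EntryPartitioner extends Partitioner<Entry, Text> {

    // Partition on yearMonth only, so every record of one yearMonth reaches the same reducer.
    @Override
    public int getPartition(Entry entry, Text paramVALUE, int numberPartitions) {
        // Take the remainder first, then abs, so the result stays in [0, numberPartitions).
        return Math.abs(entry.getYearMonth().hashCode() % numberPartitions);
    }
}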
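The order of the remainder and the abs call in the partitioner matters, so here is a small plain-Java sketch of it. It is not Hadoop code; the class and method names are made up for illustration, and Integer.MIN_VALUE stands in for an unlucky hashCode. The first method mirrors the expression used in EntryPartitioner, the second clears the sign bit before taking the remainder (as far as I know, this is the form Hadoop's default HashPartitioner uses), and the third shows the ordering to avoid.

```java
// Plain-Java sketch; class and method names are hypothetical.
class PartitionSketch {

    // Form used in EntryPartitioner: remainder first, then abs.
    // The remainder is strictly between -n and n, so abs puts it in [0, n).
    static int remainderThenAbs(int hash, int numPartitions) {
        return Math.abs(hash % numPartitions);
    }

    // Alternative: clear the sign bit, then take the remainder.
    static int maskThenRemainder(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // Pitfall: abs first. Math.abs(Integer.MIN_VALUE) is still negative,
    // so the partition number can come out negative.
    static int absThenRemainder(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int n = 3;
        int hash = Integer.MIN_VALUE; // stand-in for a worst-case hashCode
        System.out.println(remainderThenAbs(hash, n));  // prints 2
        System.out.println(maskThenRemainder(hash, n)); // prints 0
        System.out.println(absThenRemainder(hash, n));  // prints -2
    }
}
```

The first two forms can pick different partitions for the same hash, which is fine as long as one job uses one form consistently. What matters for this example is that only yearMonth feeds the hash, so records that belong to the same group never end up on different reducers.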
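Custom grouping comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EntryGroupingComparator extends WritableComparator {

    public EntryGroupingComparator() {
        // true: let WritableComparator create Entry instances for deserialization.
        super(Entry.class, true);
    }

    // Two keys fall into the same reduce group when their yearMonth matches; count is ignored.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Entry a1 = (Entry) a;
        Entry b1 = (Entry) b;
        return a1.getYearMonth().compareTo(b1.getYearMonth());
    }
}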
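Mapper class
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
    // Reused for every record to avoid allocating new objects per line.
    private Entry entry = new Entry();
    private Text value = new Text();

    @Override
    protected void map(LongWritable key, Text lines, Context context)
            throws IOException, InterruptedException {
        // Each line is split on commas into three fields: year, month, count.
        String line = lines.toString();
        String[] tokens = line.split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        int count = Integer.parseInt(tokens[2]);

        // count goes into the key (for sorting) and also into the value (for output).
        entry.setYearMonth(yearMonth);
        entry.setCount(count);
        value.set(tokens[2]);
        context.write(entry, value);
    }
}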
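Reducer class
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {

    @Override
    protected void reduce(Entry key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // One reduce() call per group; the divider marks where a new group starts.
        System.out.println("----------------- divider -----------------");
        StringBuilder builder = new StringBuilder();
        for (Text value : values) {
            // key is printed on every iteration to inspect it alongside each value.
            System.out.println(key + "==>" + value);
            builder.append(value.toString());
            builder.append(",");
        }
        context.write(key, new Text(builder.toString()));
    }
}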
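What the reducer prints is not what I originally expected: the records in one group share the attribute used by the custom grouping comparator, but their other attributes differ. Clearly my understanding was not deep enough before, so I am writing this down as a reminder to myself.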
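To make the sort-then-group behaviour concrete, here is a minimal plain-Java sketch of it. It is not Hadoop code and does not reproduce how the framework implements the shuffle; Rec is a hypothetical stand-in for Entry, the input records are invented, and the two comparators simply copy the rules of Entry.compareTo and EntryGroupingComparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java stand-in for sort + grouping; names and data are hypothetical.
class GroupingSketch {

    // Mirrors the two fields of Entry.
    static final class Rec {
        final String yearMonth;
        final int count;

        Rec(String yearMonth, int count) {
            this.yearMonth = yearMonth;
            this.count = count;
        }

        @Override
        public String toString() {
            return yearMonth + "/" + count;
        }
    }

    // Same ordering as Entry.compareTo: yearMonth first, then count.
    static final Comparator<Rec> SORT =
            Comparator.comparing((Rec r) -> r.yearMonth).thenComparingInt(r -> r.count);

    // Same rule as EntryGroupingComparator: only yearMonth decides the group.
    static final Comparator<Rec> GROUP = Comparator.comparing((Rec r) -> r.yearMonth);

    // Sort all records with SORT, then open a new group whenever GROUP reports
    // a difference from the first record of the current group.
    static List<List<Rec>> sortAndGroup(List<Rec> mapOutput) {
        List<Rec> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT);
        List<List<Rec>> groups = new ArrayList<>();
        for (Rec r : sorted) {
            if (groups.isEmpty()
                    || GROUP.compare(groups.get(groups.size() - 1).get(0), r) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> mapOutput = Arrays.asList(
                new Rec("2012-01", 35), new Rec("2012-02", 10),
                new Rec("2012-01", 5), new Rec("2012-01", 20));
        for (List<Rec> group : sortAndGroup(mapOutput)) {
            System.out.println(group);
        }
        // prints:
        // [2012-01/5, 2012-01/20, 2012-01/35]
        // [2012-02/10]
    }
}
```

Each inner list corresponds to what a single reduce() call would walk through. The records inside a group agree on yearMonth but carry different counts, already in ascending order, which is the effect the secondary sort is after.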