什麼是二次排序 待排序的數據具有多個欄位,首先對第一個欄位排序,再對第一欄位相同的行按照第二欄位排序,第二次排序不破壞第一次排序的結果,這個過程就稱為二次排序 如何在mapreduce中實現二次排序 mapreduce的工作原理 MR的工作原理如下圖(如果看不清可右鍵新標簽頁查看): 圖片部分數據參 ...
什麼是二次排序
待排序的數據具有多個欄位,首先對第一個欄位排序,再對第一欄位相同的行按照第二欄位排序,第二次排序不破壞第一次排序的結果,這個過程就稱為二次排序。
如何在mapreduce中實現二次排序
mapreduce的工作原理
MR的工作原理如下圖(如果看不清可右鍵新標簽頁查看):
圖片部分數據參考自:https://www.bbsmax.com/A/KE5Qjg6qdL/
相關重點:
- 分區(partitioning):使得具有相同Key值的鍵值對可以被劃分到一起,並且保證對應單個Key值的所有鍵值對會被髮送到同一個Reducer上去處理(更準確的說,應該是在同一個Reducer上的一次reduce函數調用上被處理)
- 分組(grouping):合併對應單個Key值的鍵值對 (<key, value_1>, <key, value_2>, ..., <key, value_n>) 為 <key, (value_1, value_2, ..., value_n)> 並作為一次reduce函數調用的輸入參數
- 總結:分區和分組兩個應該結合起來看待,兩者的目標都是 使得對應單個key值的所有鍵值對會在同一個Reducer上的一次reduce函數調用上被處理,而分區提供前提,分組實現操作。
- Shuffle:MR的核心,經常被稱為MR中“magic”發生的地方
示例數據以及需求
為了方便分析整個過程,這裡以如下數據作為示例,現在假設給出如下的數據,數據有兩列,每列都是整形數據,中間以空格劃分。
7 5
-9999 1
3 95
-9999 5
2 7
1 2
4 62
4 13
2 99
1 8
7 8888
要求輸出結果為:
------------------------------------------------
-9999 1
-9999 5
------------------------------------------------
1 2
1 8
------------------------------------------------
2 7
2 99
------------------------------------------------
3 95
------------------------------------------------
4 13
4 62
------------------------------------------------
7 5
7 8888
可以看到這就是一個二次排序的過程。
思路一:簡單粗暴版
假設每一行以空格劃分的兩個Int型數據分別為Int1、Int2,那麼最簡單的思路是:Mapper以每一行數據作為輸入,輸出鍵值對為<Int1, Int2>,由於我們知道在reducer運行之前,數據會先按照Key也就是Int1排序,那麼Int1相同的數據就將合併到一起供同一個Reducer進行處理,那麼我們便可以在reduce函數中對輸入的<Int1, [Int2-list]>按照Int2升序操作即可。
現在來分析一下,在這個思路下,一個Reducer要接收一個給定Key的所有值並對其進行內部排序,如果數據量大的話,那顯然這會耗盡機器的記憶體,對於實際運用,這是不可取的思路。
思路二:進階版
仔細觀察MR的原理圖就可以發現,MR的分區、排序、分組等操作都是針對Key進行的,既然我們想要對兩個欄位都進行排序,那麼可以將Int1和Int2組合成新的Key,原來的Value保持不變(不過這時候Value其實都不重要了,因為Key就包含了原來鍵值對的所有信息了,所以Value其實也可以設置為Null,這裡就選擇保持Value不變的方式進行操作),這樣一來按照MR的原理圖來看,對於新Key,
- 其分區邏輯為:只對Int1進行分區(預設的分區操作是以整個Key進行哈希操作的,這就可能把有同樣Int1的組合Key發送給不同的Reducer,這顯然不是我們想要的);
- 其排序邏輯為:先對Int1排序,在Int1相同的基礎上對Int2排序(即是二次排序的邏輯);
- 其分組邏輯為:只對Int1進行分組(預設的分組操作是相同的Key才會被分到同一個組,這裡只對Int1分組就可保證與原邏輯一致,使得Int1相同的數據可以在一次reduce函數被調用時一同被處理)。
下麵開始講解要實現的代碼。
一、自定義組合Key
在MR中,所有的Key值類型都必須實現WritableComparable介面,使其支持可序列化(用於寫入磁碟)和可比較(用於排序)。
- 要是可序列化的就得實現readFiels()和write()這兩個序列化和反序列化函數
- 要是可比較的就得實現compareTo()函數,該函數即是排序規則的實現
代碼實現如下:
public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); }關於Key排序的規則,hadoop中依次判斷如下:
@Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public int compareTo(IntPair other) { if (first != other.first) { return first < other.first ? -1 : 1; } else if (second != other.second) { return second < other.second ? -1 : 1; } else { return 0; } } }
- 如果調用job的setSortComparatorClass()設置的mapred.output.key.comparator.class對應的comparator
- 否則,使用key已經登記的comparator
- 否則,實現介面WritableComparable的compareTo()函數來操作
- 在下麵第三部分講分組邏輯的實現時就有討論相關的問題,這裡筆者認為相比自定義Key類的compareTo函數,setSortComparator設置的comparator如果是用RawComparator實現的話,那麼後者是基於直接在位元組上進行比較的操作(不用反序列化),而前者還需要反序列化後再進行比較,顯然後者的實現效率要高一點。所以如果要追求Key值間的更高效率的比較的話,就有了後者存在的意義了。
這裡再思考一個問題,上面講到 “在MR中,所有的Key值類型都必須實現WritableComparable介面,使其支持可序列化(用於寫入磁碟)和可比較(用於排序)”,但是其實java本身就實現了序列化機制,那麼為什麼hadoop不直接用而是要引入一個Writable介面呢?
- 既然hadoop不用,那肯定是有它的原理的(廢話)。那就要來看看hadoop對序列化的需求是怎樣的以及java本身的序列化到底有什麼缺點。
- hadoop對序列化的需求:hadoop在集群之間在進行節點間的通訊(使用RPC調用)、存儲數據的時候,都需要序列化,而且要求序列化要快,且體積要小,占用帶寬要小。
- 計算量開銷大,且序列化的結果體積大太,有時能達到對象大小的數倍乃至十倍。它的引用機制也會導致大文件不能分割的問題。(這顯然就不適合hadoop了)
關於Writable和WritableComparator的區別可參考:Hadoop中Writable和WritableComparable區別
二、實現組合Key的分區邏輯
這裡有兩種實現方式,實現其一就可以實現目的。
實現方式一:自定義分區類
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() % numPartitions);
}
}
由於分區只針對Int1,所以這裡進行哈希時只使用到了Key.getFirst()。由於分區的標號只能是0到numPartitions-1的整數,所以getPartition()函數中就要個取模操作。同時為了保證分區的結果為正,這裡最後要取最絕對值。如果不在0到numPartitions-1範圍內就會報Illegal partition的錯誤。
這樣在通過添加 job.setPartitionerClass(FirstPartitioner.class); 就可以實現設置了。
實現方式二:重載組合Key的hashCode()函數以及equals()函數
以下代碼在組合Key——IntPair中實現。
@Override public int hashCode() { return first; } @Override public boolean equals(Object other) { if (other instanceof IntPair) { IntPair o = (IntPair) other; return o.first == first && o.second == second; } else { return false; } }
在Java中hashCode()函數和equals函數基本上是成對實現的,關於hashCode()函數的設計方式可參考:hashCode 方法及 equals 方法的規範,一般對於Int型數據,其哈希值就是其本來的值,所以這裡直接返回first而不需要進行什麼乘法或取模運算。
若選擇用這種方式實現,則預設使用HashPartitioner作為分區類,這裡組合鍵實現的hashCode()函數就在這裡被調用了。
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
關於為什麼要將key的哈希值和Int最大值相與可參考 what does that mean for Text.hashCode() & Interger.MAX_VALUE?
其實仔細觀察後就可以發現上面這兩種方式最後得到的結果都是相同的。
但是這裡有個小問題:感覺equals函數在整個過程中貌似沒有用上,就算去掉這個函數,程式還是能不報錯執行,不過看網上很多關於二次排序的博客文章都提到要實現這個函數,這裡我的理解是,除非我們確實需要用到Key的equals函數,否則在這篇範圍內是可以不用實現它的,而網上的許多關於二次排序的文章中其實也沒有用到這個函數。但是貌似hashcode和equals這兩個函數經常是成對出現的,為了保險起見,重載實現一下也無妨。
註意點:預設情況下Reducer的個數只有一個,即numReduceTasks=1,分區數為1,這時候也就沒有必要進行分區操作了(因為反正數據最終都是到同一個Reducer上去,分區的本意就是為了劃分數據到不同的Reducer上去的),所以當Reducer的個數為1時(或者預設時),實現方式一重載的getPartition函數就不會被執行,同理實現方式二重載的hashCode()函數也不會被執行。
三、實現分組邏輯
這裡也有兩種實現方式,實現其一就可以實現目的。
實現方式一:繼承(extends)writableComparator 類
實現方式很簡單,直接比較複合鍵的第一個值即可。
public static class FirstGroupingComparator extends WritableComparator{ protected FirstGroupingComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair key1 = (IntPair) w1; IntPair key2 = (IntPair) w2; int l = key1.getFirst(); int r = key2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } }
實現方式二:實現(implements)RawComparator 介面
public static class FirstGroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } }
這裡只要重載兩個輸入參數不同的compare函數即可。
- 對於第一個compare函數,其直接進行的是位元組流上的比較,省去了反序列化的操作,比較效率會高一點,但是也相對難懂了一點,首先查看其在父類的參數的定義說明:
/** * Compare two objects in binary. * b1[s1:l1] is the first object, and b2[s2:l2] is the second object. * * @param b1 The first byte array. * @param s1 The position index in b1. The object under comparison's starting index. * @param l1 The length of the object in b1. * @param b2 The second byte array. * @param s2 The position index in b2. The object under comparison's starting index. * @param l2 The length of the object under comparison in b2. * @return An integer result of the comparison. */ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
其進一步調用了WriteableComparator.compareBytes函數(該函數實現如下)
/** Lexicographic order of binary data. */ public static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2); }
觀察compare函數以及其調用的compareBytes函數,這兩者的輸入參數的命名都是一樣的,所以它們對應的意義也應該一樣,而對於傳給compareBytes函數的參數 l1 和 l2 只需要設置為Integer.SIZE/8(也就是4個位元組的長度,剛好是一個int型數據的位元組長度數目,這樣就達到了只比較IntPair的 first 部分的值的目的,從而實現分組的邏輯)。
-
PS:IntPair在class中依次定義了first和second兩個int型變數,這類對象在轉換成bytes數組時會依次將first、second轉換為位元組數據然後順序拼接起來,因此假設現在有個IntPair變數A,其轉換為bytes數組的變數為B,那麼B的長度就為8,B中前4個值就對應A.first,B中後4個值就對應A.second。可以使用如下函數輸出驗證這一想法。
public static int bytes2Int(byte[] b, int start, int len) { int sum = 0; int end = start + len; for (int i = start; i < end; i++) { int n = ((int)b[i]) & 0xff; n <<= (--len) * 8; sum += n; } return sum; }
-
- 對於第二個compare函數,其直接比較IntPair的第一個值,思路簡單,但是有個問題時,這個函數必須重載但是實際上在二次排序中並沒有運行該函數,不知道重載了有什麼用。
關於以上兩種方式的總結
首先,這兩種方式要起效通過 job.setGroupingComparatorClass(FirstGroupingComparator.class); 即可。
回顧下GroupingComparator的作用,其在Reducer中被使用,對Key值相同的數據歸類成同個組,Key值不同的數組歸類到不同的組,而同個組的數據會在一次reduce函數調用中一次性處理,所以其只需要涉及到鍵值間的比較而不用涉及到鍵值的序列化等操作。
再回顧上面的兩種實現方式,GroupingComparator皆可通過繼承writableComparable類也可以通過實現RawComparator介面來實現,而前者writableComparable也是後者RawComparator介面的一個實現,前者相對來說多了鍵值對的序列化功能,而再進行鍵值間的比較時,前者需要先反序列化後才可以進行比較,而後者直接在位元組數組層面上進行比較,顯然後者的效率要高點,所以在實踐中如果追求效率的話,用RawComparator實現GroupingComparator效率相對會比較高。
四、實現Mapper
/** * Read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } }
實現思路很簡單,就是讀取每一行的數據,得到其中的兩個int數據left和right,進一步得到鍵值對<(left, right), right>。
但是實現起來還是有一點要註意的:Mapper的map函數在實踐中會被調用很多次,所以一些能夠聲明在map函數之外的變數就不要聲明在map函數裡面,比如這裡的private final的int型變數key和value就聲明在map函數之外,在map函數調用的過程中它們每一次都設置新的值,而新的值通過context.write函數執行之後這兩個變數又可以復用了。而如果聲明在map函數裡面,則可能會存在頻繁地調用map函數處理每一行輸入的數據,這個過程中不斷地new變數不斷地delete變數,效率上有點影響的。
五、實現Reducer
/** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } }
經過了Mapper的map、分區排序、合併等操作,到達Reducer的時候其實已經是二次排序完成了,所以這裡就只需要將數據輸出出來即可。
六、全部代碼
package test.linzch3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Comparator; import java.util.StringTokenizer; import java.util.function.Function; import java.util.function.ToDoubleFunction; import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; /** * This is an example Hadoop Map/Reduce application. * It reads the text input files that must contain two integers per a line. * The output is sorted by the first and second number and grouped on the * first number. * * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort * <i>in-dir</i> <i>out-dir</i> */ public class SecondarySort { /** * Define a pair of integers that are writable. * They are serialized in a byte comparable format. */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public int compareTo(IntPair other) { if (first != other.first) { return first < other.first ? -1 : 1; } else if (second != other.second) { return second < other.second ? -1 : 1; } else { return 0; } } // @Override // public int hashCode() { // return first; // } // // @Override // public boolean equals(Object other) { // if (other instanceof IntPair) { // IntPair o = (IntPair) other; // return o.first == first && o.second == second; // } else { // return false; // } // } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() % numPartitions); } } /** * Compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ // public static class FirstGroupingComparator extends WritableComparator{ // protected FirstGroupingComparator() // { // super(IntPair.class, true); // } // @Override // public int compare(WritableComparable w1, WritableComparable w2) // { // IntPair key1 = (IntPair) w1; // IntPair key2 = (IntPair) w2; // int l = key1.getFirst(); // int r = key2.getFirst(); // return l == r ? 0 : (l < r ? -1 : 1); // } // } public static class FirstGroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: secondarysort <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf, "secondary sort"); job.setJarByClass(SecondarySort.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); // group and partition by the first int in the pair job.setPartitionerClass(FirstPartitioner.class); job.setGroupingComparatorClass(FirstGroupingComparator.class); // the map output is IntPair, IntWritable job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); // the reduce output is Text, IntWritable job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); /*delete the output directory if exists*/ Path out = new Path(otherArgs[otherArgs.length - 1]); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } System.exit(job.waitForCompletion(true) ? 0 : 1); } }