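What is secondary sort
The data to be sorted has multiple fields. It is first sorted by the first field, and rows with the same first field are then sorted by the second field; the second sort must not disturb the result of the first. This process is called secondary sort.
How to implement secondary sort in MapReduce
How MapReduce works
How MR works is shown in the figure below. Part of the data in the figure is taken from ...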
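- Partitioning: brings key-value pairs with the same Key together and guarantees that all pairs for a single Key are sent to the same Reducer (more precisely, that they are handled in a single call of the reduce function on that Reducer)
- Grouping: merges the pairs for a single Key, (<key, value_1>, <key, value_2>, ..., <key, value_n>), into <key, (value_1, value_2, ..., value_n)>, which becomes the input of one reduce call
- Summary: partitioning and grouping should be viewed together. Both aim to have all pairs for a single key handled in one reduce call on one Reducer; partitioning provides the precondition and grouping performs the operation.
- Shuffle: the core of MR, often described as the place where the "magic" of MR happens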
Sample input:

    7 5
    -9999 1
    3 95
    -9999 5
    2 7
    1 2
    4 62
    4 13
    2 99
    1 8
    7 8888

Expected output after secondary sort:

    -9999 1
    -9999 5
    1 2
    1 8
    2 7
    2 99
    3 95
    4 13
    4 62
    7 5
    7 8888
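Outside of MapReduce, the ordering shown above can be reproduced with a two-level comparator. The following is a minimal plain-Java sketch, not part of the original job; the class name `SecondarySortDemo` and the `int[]` row representation are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SecondarySortDemo {
    /** Sorts rows by column 0, then by column 1 among rows with equal column 0. */
    static List<int[]> secondarySort(List<int[]> rows) {
        List<int[]> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.<int[]>comparingInt(r -> r[0])
                              .thenComparingInt(r -> r[1]));
        return sorted;
    }

    public static void main(String[] args) {
        // The same rows as the sample input.
        List<int[]> input = List.of(
            new int[]{7, 5}, new int[]{-9999, 1}, new int[]{3, 95},
            new int[]{-9999, 5}, new int[]{2, 7}, new int[]{1, 2},
            new int[]{4, 62}, new int[]{4, 13}, new int[]{2, 99},
            new int[]{1, 8}, new int[]{7, 8888});
        for (int[] row : secondarySort(input)) {
            System.out.println(row[0] + " " + row[1]);
        }
    }
}
```

Running `main` prints the rows in the same order as the sorted listing above.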
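Suppose the two space-separated Int values on each line are Int1 and Int2. The simplest idea is this: the Mapper takes each line as input and emits the key-value pair <Int1, Int2>. We know that before the reducer runs, the data is sorted by Key, that is, by Int1, and that records with the same Int1 are merged and handed to the same Reducer. The reduce function then only has to sort its input <Int1, [Int2-list]> by Int2 in ascending order.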
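The simple approach just described can be sketched in plain Java. This is only an illustration under stated assumptions: a `TreeMap` stands in for the framework's sort-and-group step, and the names `NaiveSecondarySort`, `reduce`, and `run` are hypothetical. Note that the stand-in `reduce` has to buffer every Int2 of a key in memory before it can sort, which may become a problem when a single key has a very large number of values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class NaiveSecondarySort {
    /** Stand-in for one reduce call: buffers all Int2 values of a key, then sorts them. */
    static List<Integer> reduce(Iterable<Integer> values) {
        List<Integer> buffer = new ArrayList<>();
        for (int v : values) {
            buffer.add(v);
        }
        Collections.sort(buffer);
        return buffer;
    }

    /** Groups rows by Int1 (sorted by key), then sorts each group's Int2 list in "reduce". */
    static List<String> run(int[][] rows) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] r : rows) {
            grouped.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : grouped.entrySet()) {
            for (int v : reduce(e.getValue())) {
                out.add(e.getKey() + " " + v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] rows = {{7, 5}, {-9999, 1}, {4, 62}, {-9999, 5}, {4, 13}};
        run(rows).forEach(System.out::println);
    }
}
```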
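- Its partitioning logic: partition on Int1 only (the default partitioner hashes the whole Key, which could send composite Keys with the same Int1 to different Reducers, clearly not what we want);
- Its sorting logic: sort by Int1 first, then by Int2 among records with the same Int1 (this is the secondary sort itself);
- Its grouping logic: group on Int1 only (by default only identical Keys land in the same group; grouping on Int1 alone keeps the behavior consistent with the original logic, so that records with the same Int1 are processed together in one call of the reduce function).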
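The three rules above can be simulated in plain Java to see how they interact. The sketch below is an assumption-laden illustration, not Hadoop code: `CompositeKeyPipeline`, `partition`, and `run` are hypothetical names, each inner list plays the role of one Reducer, and a `--- group` line marks where a new reduce call would start.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CompositeKeyPipeline {
    /** Partition on Int1 only, so equal Int1 values always share a partition. */
    static int partition(int first, int numPartitions) {
        return Math.abs(first % numPartitions);
    }

    /** Returns, for each partition, the lines a reducer would see, group by group. */
    static List<List<String>> run(int[][] rows, int numPartitions) {
        List<List<int[]>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int[] r : rows) {
            partitions.get(partition(r[0], numPartitions)).add(r);
        }

        List<List<String>> output = new ArrayList<>();
        for (List<int[]> part : partitions) {
            // Sort rule: Int1 first, then Int2.
            part.sort(Comparator.<int[]>comparingInt(r -> r[0])
                                .thenComparingInt(r -> r[1]));
            List<String> lines = new ArrayList<>();
            Integer currentGroup = null;
            for (int[] r : part) {
                // Grouping rule: a new group starts whenever Int1 changes.
                if (currentGroup == null || currentGroup != r[0]) {
                    lines.add("--- group " + r[0]);
                    currentGroup = r[0];
                }
                lines.add(r[0] + " " + r[1]);
            }
            output.add(lines);
        }
        return output;
    }

    public static void main(String[] args) {
        int[][] rows = {{4, 62}, {-9999, 5}, {4, 13}, {-9999, 1}, {2, 7}};
        List<List<String>> out = run(rows, 2);
        for (int p = 0; p < out.size(); p++) {
            System.out.println("partition " + p + ": " + out.get(p));
        }
    }
}
```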
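- To be serializable, it must implement write() and readFields(), the serialization and deserialization functions
- To be comparable, it must implement compareTo(), which is where the sort rule lives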
    public static class IntPair implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;

        public void set(int left, int right) {
            first = left;
            second = right;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }

        // Deserialization: read the two ints in the order write() emitted them.
        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readInt();
            second = in.readInt();
        }

        // Serialization: two 4-byte ints, first then second.
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        // Sort rule: by first, then by second.
        @Override
        public int compareTo(IntPair other) {
            if (first != other.first) {
                return first < other.first ? -1 : 1;
            } else if (second != other.second) {
                return second < other.second ? -1 : 1;
            } else {
                return 0;
            }
        }
    }

Hadoop decides how to sort Keys by checking the following, in order:
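- If a comparator was set by calling the job's setSortComparatorClass() (property mapred.output.key.comparator.class), that comparator is used
- Otherwise, the comparator registered for the key class is used
- Otherwise, the compareTo() function from the WritableComparable interface is used
- The third part below, on implementing the grouping logic, discusses a related question. In my view, compared with the custom Key class's compareTo function, a comparator set through setSortComparatorClass that is implemented as a RawComparator compares directly on the bytes (no deserialization), whereas compareTo needs the objects to be deserialized first, so the RawComparator is somewhat more efficient. That is its reason for existing: faster comparison between Keys.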
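One more question to think about. As mentioned above, "in MR, every Key type must implement the WritableComparable interface so that it is serializable (for writing to disk) and comparable (for sorting)". But Java already ships its own serialization mechanism, so why does Hadoop introduce a Writable interface instead of using it directly?
- If Hadoop does not use it, there must be a reason. So let us look at what Hadoop needs from serialization and what the drawbacks of Java's own serialization are.
- Hadoop's serialization needs: Hadoop serializes data both for communication between nodes in the cluster (via RPC calls) and for storage, and it needs serialization to be fast, compact, and light on bandwidth.
- Drawbacks of Java's built-in serialization: it is computationally expensive and its output is too large, sometimes several times or even ten times the size of the object. Its reference mechanism also causes the problem that large files cannot be split. (This clearly does not suit Hadoop.)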
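The size difference can be observed with the standard library alone. The sketch below is a rough illustration, assuming a hypothetical `SerializablePair` class as the Java-serialization counterpart of IntPair; it compares `ObjectOutputStream` output with the two raw `writeInt` calls that a Writable-style `write()` performs. The exact Java-serialized size depends on class and field names, so only the comparison matters here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSizeDemo {
    /** Hypothetical pair class, used only to measure java.io serialization. */
    static class SerializablePair implements Serializable {
        private static final long serialVersionUID = 1L;
        final int first;
        final int second;

        SerializablePair(int first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Bytes produced by Java's built-in object serialization. */
    static int javaSerializedSize(int first, int second) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new SerializablePair(first, second));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Bytes produced by writing the two ints directly, as IntPair.write() does. */
    static int writableStyleSize(int first, int second) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(first);
            out.writeInt(second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize(-9999, 5) + " bytes");
        System.out.println("Writable-style:     " + writableStyleSize(-9999, 5) + " bytes");
    }
}
```

The Writable-style encoding is always 8 bytes for two ints, while the Java-serialized form also carries a stream header and a class descriptor.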
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions) {
            // Partition on the first int only; abs keeps the index non-negative.
            return Math.abs(key.getFirst() % numPartitions);
        }
    }
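Since partitioning only looks at Int1, only key.getFirst() is used here. A partition number must be an integer from 0 to numPartitions-1, so getPartition() takes the value modulo numPartitions, and to make sure the result is not negative it takes the absolute value at the end. A result outside the range 0 to numPartitions-1 raises an "Illegal partition" error.
The partitioner is then put into effect by adding job.setPartitionerClass(FirstPartitioner.class);.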
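To see why the absolute value matters, recall that Java's `%` takes the sign of the dividend, so a negative Int1 such as -9999 yields a negative remainder. The sketch below, with a hypothetical `PartitionIndexDemo` class and an assumed 4 partitions, applies the same expression as getPartition() to a few of the sample keys.

```java
class PartitionIndexDemo {
    /** Same expression as FirstPartitioner.getPartition(), on a bare int. */
    static int partition(int first, int numPartitions) {
        return Math.abs(first % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed number of reducers for this illustration
        int[] keys = {7, -9999, 3, 2, 1, 4};
        for (int key : keys) {
            System.out.println(key + ": raw remainder " + (key % numPartitions)
                    + ", partition " + partition(key, numPartitions));
        }
    }
}
```

For -9999 the raw remainder is -3, which would be an illegal partition number; after `Math.abs` it becomes 3.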
    @Override
    public int hashCode() {
        return first;
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof IntPair) {
            IntPair o = (IntPair) other;
            return o.first == first && o.second == second;
        } else {
            return false;
        }
    }
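In Java, hashCode() and equals() are almost always implemented as a pair. For how to design hashCode(), see: Specification of the hashCode and equals methods. For an int, the hash value is normally just the value itself, so first is returned directly here, with no multiplication or modulo needed.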
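A small plain-Java sketch can make the consequence of this design concrete. The `Pair` class below is a hypothetical stand-in for IntPair with the same hashCode() and equals(); it shows that two keys with the same first value hash identically (so a hash-based partitioner would send them to the same place) while still being distinct keys.

```java
import java.util.HashSet;
import java.util.Set;

class PairHashDemo {
    /** Hypothetical stand-in for IntPair, with the same hashCode()/equals() logic. */
    static final class Pair {
        final int first;
        final int second;

        Pair(int first, int second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int hashCode() {
            return first; // only the first int contributes to the hash
        }

        @Override
        public boolean equals(Object other) {
            if (other instanceof Pair) {
                Pair o = (Pair) other;
                return o.first == first && o.second == second;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Pair a = new Pair(4, 13);
        Pair b = new Pair(4, 62);
        System.out.println("same hash: " + (a.hashCode() == b.hashCode()));
        System.out.println("equal:     " + a.equals(b));

        Set<Pair> set = new HashSet<>();
        set.add(a);
        set.add(new Pair(4, 13)); // duplicate of a, not added again
        set.add(b);
        System.out.println("distinct keys: " + set.size());
    }
}
```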
    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

        public void configure(JobConf job) {}

        /** Use {@link Object#hashCode()} to partition. */
        public int getPartition(K2 key, V2 value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }
For why the key's hash value is ANDed with the maximum Int value, see what does that mean for Text.hashCode() & Interger.MAX_VALUE?
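In short, the mask clears the sign bit so the result of `%` can never be negative. A tempting alternative, `Math.abs(hash) % n`, fails for `Integer.MIN_VALUE`, because `Math.abs(Integer.MIN_VALUE)` is still negative. The sketch below compares the two; the class and method names are hypothetical.

```java
class HashMaskDemo {
    /** The expression used by HashPartitioner: clear the sign bit, then take the remainder. */
    static int maskedPartition(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** A tempting but unsafe alternative: abs first, remainder second. */
    static int absPartition(int hash, int numReduceTasks) {
        return Math.abs(hash) % numReduceTasks;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println("abs then mod:  " + absPartition(hash, 3));    // negative
        System.out.println("mask then mod: " + maskedPartition(hash, 3)); // in [0, 3)
    }
}
```

Note that this differs from FirstPartitioner, which applies `Math.abs` after the `%`; there the remainder is already smaller in magnitude than numPartitions, so the absolute value is safe.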
Approach 1: extend the WritableComparator class

    public static class FirstGroupingComparator extends WritableComparator {
        protected FirstGroupingComparator() {
            // true: create key instances so compare() receives deserialized objects
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair key1 = (IntPair) w1;
            IntPair key2 = (IntPair) w2;
            int l = key1.getFirst();
            int r = key2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

Approach 2: implement the RawComparator interface

    public static class FirstGroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Compare only the first 4 bytes of each key, i.e. the serialized first int.
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                                   b2, s2, Integer.SIZE / 8);
        }

        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }
- The first compare function compares directly on the byte stream and skips deserialization, so it is somewhat more efficient, but also somewhat harder to follow. Start with the parameter documentation in the parent interface:

    /**
     * Compare two objects in binary.
     * b1[s1:l1] is the first object, and b2[s2:l2] is the second object.
     *
     * @param b1 The first byte array.
     * @param s1 The position index in b1. The object under comparison's starting index.
     * @param l1 The length of the object in b1.
     * @param b2 The second byte array.
     * @param s2 The position index in b2. The object under comparison's starting index.
     * @param l2 The length of the object under comparison in b2.
     * @return An integer result of the comparison.
     */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

    /** Lexicographic order of binary data. */
    public static int compareBytes(byte[] b1, int s1, int l1,
                                   byte[] b2, int s2, int l2) {
        return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
    }
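Looking at the compare function and the compareBytes function it calls, the parameters of both have the same names, so they should mean the same things. The l1 and l2 passed to compareBytes only need to be set to Integer.SIZE/8, that is, 4 bytes, exactly the byte length of one int. This way only the first part of the IntPair is compared, which implements the grouping logic.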
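The effect of comparing only the first 4 bytes can be tried without Hadoop. In the sketch below, `ByteBuffer` (big-endian by default) stands in for the layout produced by two `writeInt` calls, and `Arrays.compareUnsigned` (Java 9+) stands in for `WritableComparator.compareBytes`; both substitutions, and the names `encode` and `compareFirstBytes`, are assumptions for illustration. One caveat worth knowing: a lexicographic byte comparison treats bytes as unsigned, so a negative int compares as larger than a positive one. For grouping this does not matter, since only a result of 0 (same first value) is used.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class RawPrefixCompareDemo {
    /** Big-endian encoding of (first, second): 4 bytes each, first int in front. */
    static byte[] encode(int first, int second) {
        return ByteBuffer.allocate(8).putInt(first).putInt(second).array();
    }

    /** Unsigned lexicographic comparison of the first Integer.BYTES bytes only. */
    static int compareFirstBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, 0, Integer.BYTES, b, 0, Integer.BYTES);
    }

    public static void main(String[] args) {
        // Same first int, different second int: treated as the same group.
        System.out.println(compareFirstBytes(encode(4, 13), encode(4, 62)) == 0);
        // Different first ints: different groups.
        System.out.println(compareFirstBytes(encode(1, 8), encode(2, 7)) != 0);
        // Negative first int compares as larger than a positive one (unsigned bytes).
        System.out.println(compareFirstBytes(encode(-9999, 1), encode(1, 2)) > 0);
    }
}
```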
    public static int bytes2Int(byte[] b, int start, int len) {
        int sum = 0;
        int end = start + len;
        for (int i = start; i < end; i++) {
            int n = ((int) b[i]) & 0xff;
            n <<= (--len) * 8;
            sum += n;
        }
        return sum;
    }
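As a quick check of the bytes2Int helper, the sketch below copies its logic into a hypothetical `Bytes2IntDemo` class and decodes an 8-byte key built with `ByteBuffer`, which is assumed here to match the big-endian layout that two `writeInt` calls produce.

```java
import java.nio.ByteBuffer;

class Bytes2IntDemo {
    /** Same logic as the bytes2Int helper: decode len big-endian bytes into an int. */
    static int bytes2Int(byte[] b, int start, int len) {
        int sum = 0;
        int end = start + len;
        for (int i = start; i < end; i++) {
            int n = b[i] & 0xff;   // treat the byte as unsigned
            n <<= (--len) * 8;     // most significant byte first
            sum += n;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Serialized form of the key (-9999, 5): two 4-byte big-endian ints.
        byte[] key = ByteBuffer.allocate(8).putInt(-9999).putInt(5).array();
        System.out.println("first  = " + bytes2Int(key, 0, Integer.BYTES));
        System.out.println("second = " + bytes2Int(key, Integer.BYTES, Integer.BYTES));
    }
}
```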
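- The second compare function compares the first value of the IntPair directly, which is simple. One puzzle remains: this function has to be implemented (RawComparator extends java.util.Comparator), yet it is never actually run during the secondary sort, so I am not sure what implementing it achieves.
Either approach takes effect by calling job.setGroupingComparatorClass(FirstGroupingComparator.class);.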
    /**
     * Read two integers from each line and generate a key, value pair
     * as ((left, right), right).
     */
    public static class MapClass
            extends Mapper<LongWritable, Text, IntPair, IntWritable> {

        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();

        @Override
        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(inValue.toString());
            int left = 0;
            int right = 0;
            if (itr.hasMoreTokens()) {
                left = Integer.parseInt(itr.nextToken());
                if (itr.hasMoreTokens()) {
                    right = Integer.parseInt(itr.nextToken());
                }
                key.set(left, right);
                value.set(right);
                context.write(key, value);
            }
        }
    }
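The idea is simple: read each line, extract the two ints left and right, and build the key-value pair <(left, right), right>.
There is one thing to watch in the implementation, though. In practice the Mapper's map function is called a great many times, so anything that can be declared outside map should not be declared inside it. Here the private final fields key (an IntPair) and value (an IntWritable) are declared outside map. Each call to map sets new values on them, and once context.write has run, both objects can be reused. If they were declared inside map, every input line would allocate new objects that later have to be garbage-collected, which costs some efficiency.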
    /**
     * A reducer class that writes a separator line for each group and then
     * emits the first int together with each value of that group.
     */
    public static class Reduce
            extends Reducer<IntPair, IntWritable, Text, IntWritable> {

        private static final Text SEPARATOR =
                new Text("------------------------------------------------");
        private final Text first = new Text();

        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            first.set(Integer.toString(key.getFirst()));
            for (IntWritable value : values) {
                context.write(first, value);
            }
        }
    }
    package test.linzch3;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /**
     * This is an example Hadoop Map/Reduce application.
     * It reads the text input files that must contain two integers per line.
     * The output is sorted by the first and second number and grouped on the
     * first number.
     *
     * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
     *            <i>in-dir</i> <i>out-dir</i>
     */
    public class SecondarySort {

        /**
         * Define a pair of integers that are writable.
         * They are serialized in a byte comparable format.
         */
        public static class IntPair implements WritableComparable<IntPair> {
            private int first = 0;
            private int second = 0;

            public void set(int left, int right) {
                first = left;
                second = right;
            }

            public int getFirst() {
                return first;
            }

            public int getSecond() {
                return second;
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                first = in.readInt();
                second = in.readInt();
            }

            @Override
            public void write(DataOutput out) throws IOException {
                out.writeInt(first);
                out.writeInt(second);
            }

            @Override
            public int compareTo(IntPair other) {
                if (first != other.first) {
                    return first < other.first ? -1 : 1;
                } else if (second != other.second) {
                    return second < other.second ? -1 : 1;
                } else {
                    return 0;
                }
            }

            // @Override
            // public int hashCode() {
            //     return first;
            // }
            //
            // @Override
            // public boolean equals(Object other) {
            //     if (other instanceof IntPair) {
            //         IntPair o = (IntPair) other;
            //         return o.first == first && o.second == second;
            //     } else {
            //         return false;
            //     }
            // }
        }

        /**
         * Partition based on the first part of the pair.
         */
        public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
            @Override
            public int getPartition(IntPair key, IntWritable value, int numPartitions) {
                return Math.abs(key.getFirst() % numPartitions);
            }
        }

        /**
         * Compare only the first part of the pair, so that reduce is called once
         * for each value of the first part.
         */
        // public static class FirstGroupingComparator extends WritableComparator {
        //     protected FirstGroupingComparator() {
        //         super(IntPair.class, true);
        //     }
        //
        //     @Override
        //     public int compare(WritableComparable w1, WritableComparable w2) {
        //         IntPair key1 = (IntPair) w1;
        //         IntPair key2 = (IntPair) w2;
        //         int l = key1.getFirst();
        //         int r = key2.getFirst();
        //         return l == r ? 0 : (l < r ? -1 : 1);
        //     }
        // }

        public static class FirstGroupingComparator implements RawComparator<IntPair> {
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                                       b2, s2, Integer.SIZE / 8);
            }

            @Override
            public int compare(IntPair o1, IntPair o2) {
                int l = o1.getFirst();
                int r = o2.getFirst();
                return l == r ? 0 : (l < r ? -1 : 1);
            }
        }

        /**
         * Read two integers from each line and generate a key, value pair
         * as ((left, right), right).
         */
        public static class MapClass
                extends Mapper<LongWritable, Text, IntPair, IntWritable> {

            private final IntPair key = new IntPair();
            private final IntWritable value = new IntWritable();

            @Override
            public void map(LongWritable inKey, Text inValue, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(inValue.toString());
                int left = 0;
                int right = 0;
                if (itr.hasMoreTokens()) {
                    left = Integer.parseInt(itr.nextToken());
                    if (itr.hasMoreTokens()) {
                        right = Integer.parseInt(itr.nextToken());
                    }
                    key.set(left, right);
                    value.set(right);
                    context.write(key, value);
                }
            }
        }

        /**
         * A reducer class that writes a separator line for each group and then
         * emits the first int together with each value of that group.
         */
        public static class Reduce
                extends Reducer<IntPair, IntWritable, Text, IntWritable> {

            private static final Text SEPARATOR =
                    new Text("------------------------------------------------");
            private final Text first = new Text();

            @Override
            public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                context.write(SEPARATOR, null);
                first.set(Integer.toString(key.getFirst()));
                for (IntWritable value : values) {
                    context.write(first, value);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: secondarysort <in> <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "secondary sort");
            job.setJarByClass(SecondarySort.class);
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            // group and partition by the first int in the pair
            job.setPartitionerClass(FirstPartitioner.class);
            job.setGroupingComparatorClass(FirstGroupingComparator.class);

            // the map output is IntPair, IntWritable
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(IntWritable.class);

            // the reduce output is Text, IntWritable
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            /* delete the output directory if it exists */
            Path out = new Path(otherArgs[otherArgs.length - 1]);
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(out)) {
                fileSystem.delete(out, true);
            }

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }