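Overview
Serialization is the process of converting structured objects into a byte stream.
Deserialization is the reverse of serialization: it turns a byte stream back into structured objects.
Objects must be serialized into a byte stream whenever they are passed between processes or persisted. Conversely, a byte stream received over the network or read from disk must be deserialized to turn it back into objects.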
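Java's built-in serialization (Serializable) is a heavyweight framework. A serialized object carries a lot of extra information (various check data, headers, the class inheritance hierarchy, and so on), which makes it inefficient to transmit over a network. Hadoop therefore developed its own lean and efficient serialization mechanism, Writable. Unlike Java serialization, it does not transmit the multi-level parent/child class relationships. It writes only the field values you choose to write, which greatly reduces network transmission overhead.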
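The size difference can be observed with nothing but the JDK. The sketch below is only an illustration: `SerializationSizeDemo` and `FlowRecord` are hypothetical names, and `DataOutputStream` stands in for the `DataOutput` that a Writable writes to. It encodes the same three `long` values both ways and reports the byte counts.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Compares the byte size of the same three long fields written with
// Java's built-in serialization versus a Writable-style DataOutput encoding.
class SerializationSizeDemo {

    // Hypothetical Serializable holder for the three traffic values.
    static class FlowRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        long upFlow = 2481;
        long downFlow = 24681;
        long sumFlow = 27162;
    }

    // Java serialization: stream header + class descriptor + field values.
    static int javaSerializedSize() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new FlowRecord());
            out.flush(); // push buffered data into the byte array before measuring
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writable style: only the raw field values, 8 bytes per long.
    static int dataOutputSize() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(2481);
            out.writeLong(24681);
            out.writeLong(27162);
            out.flush();
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize() + " bytes");
        System.out.println("DataOutput (Writable style): " + dataOutputSize() + " bytes");
    }
}
```

The Writable-style encoding is exactly 24 bytes (8 per `long`). The Java-serialized form is larger because it also carries a stream header and a class descriptor; its exact size depends on the class name and fields, so it is not hard-coded here.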
Writable is Hadoop's serialization format. Hadoop defines the Writable interface below, and a class only needs to implement this interface to become serializable.
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
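To see what `write` and `readFields` do without a Hadoop dependency, here is a minimal sketch. `MiniWritable` is a hypothetical interface that declares the same two methods as `org.apache.hadoop.io.Writable`; `FlowValue` and `WritableRoundTrip` are illustrative names. In a real job the class would implement Hadoop's `Writable` instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Round-trips a bean through DataOutput/DataInput the way Hadoop does with Writables.
class WritableRoundTrip {

    // Serialize: let the object write its own fields into a byte array.
    static byte[] serialize(MiniWritable w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize: fill an existing empty object from the bytes (Hadoop also reuses objects).
    static <T extends MiniWritable> T deserialize(byte[] data, T target) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            target.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    public static void main(String[] args) {
        byte[] data = serialize(new FlowValue(2481, 24681));
        FlowValue copy = deserialize(data, new FlowValue());
        // prints: 24 bytes -> 2481 24681 27162
        System.out.println(data.length + " bytes -> " + copy.upFlow + " " + copy.downFlow + " " + copy.sumFlow);
    }
}

// Hypothetical stand-in declaring the same two methods as org.apache.hadoop.io.Writable.
interface MiniWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical bean holding the three traffic fields used in this article.
class FlowValue implements MiniWritable {
    long upFlow;
    long downFlow;
    long sumFlow;

    FlowValue() { } // empty object for deserialization to fill in

    FlowValue(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read in exactly the order write() used; reordering would swap the values.
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}
```

The key design point is symmetry: `readFields` must read the fields in the same order and with the same types that `write` wrote them, because the byte stream carries no field names.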
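If a custom bean is to be transmitted as a key, it must also implement the Comparable interface, because the shuffle phase of the MapReduce framework always sorts keys. In that case the bean should implement WritableComparable, which combines Writable and Comparable.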
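Code example
1. Requirement
For each user (phone number), compute the total upstream traffic, total downstream traffic and total traffic. Then, building on that result, add one more requirement: sort the statistics by total traffic in descending order.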
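Sample data (one record per line; the mappers below split each record on tab characters, so the fields in the actual input file must be tab-separated):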
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
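2. Analysis
Implement a custom bean that encapsulates the traffic values, and, for the sorting step, transmit the bean as the key of the map output.
A MapReduce program sorts data while processing it: map output key/value pairs are sorted before they are passed to the reducers, and the sort is based on the map output key. So, to apply our own sort order, we can put the sorting criterion into the key, have the key class implement WritableComparable, and override its compareTo method.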
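The ordering the sort job relies on can be sketched in plain Java, assuming each input line has the first job's output layout `phone\tup\tdown\tsum`. The four lines used here are the totals for phone numbers that occur only once in the sample data. `SortByTotalSketch` and `Row` are hypothetical names, and `List.sort` stands in for the framework's shuffle sort. The sketch also contrasts a comparator that only ever returns -1 or 1, which breaks the `Comparable` contract because an object compared with itself does not yield 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortByTotalSketch {

    // One line of the first job's output: phone, up, down (the sum is recomputed).
    static final class Row {
        final String phone;
        final long up;
        final long down;
        final long sum;

        Row(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }
    }

    // Descending by total; returns 0 for equal totals, as the contract requires.
    static final Comparator<Row> BY_SUM_DESC = (a, b) -> Long.compare(b.sum, a.sum);

    // A comparator that never returns 0, shown only for contrast.
    static int neverZeroCompare(Row a, Row b) {
        return a.sum > b.sum ? -1 : 1;
    }

    // Parse "phone\tup\tdown\tsum" lines and sort them, largest total first.
    static List<Row> parseAndSort(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            rows.add(new Row(f[0], Long.parseLong(f[1]), Long.parseLong(f[2])));
        }
        rows.sort(BY_SUM_DESC);
        return rows;
    }

    public static void main(String[] args) {
        List<Row> sorted = parseAndSort(List.of(
                "13826544101\t264\t0\t264",
                "13502468823\t7335\t110349\t117684",
                "13926435656\t132\t1512\t1644",
                "13925057413\t11058\t48243\t59301"));
        for (Row r : sorted) {
            System.out.println(r.phone + "\t" + r.sum);
        }
        Row any = sorted.get(0);
        System.out.println(BY_SUM_DESC.compare(any, any)); // 0
        System.out.println(neverZeroCompare(any, any));    // 1: the object is "greater than itself"
    }
}
```

`Long.compare` with the arguments swapped gives the descending order without the overflow risk of subtracting two `long` values.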
3. Implementation without sorting
Custom JavaBean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Hadoop creates beans through reflection, so a no-arg constructor is required
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Serialization: write the fields to the output stream.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization: fields are read back in the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /**
     * Sort order when the bean is used as a key: descending by total flow.
     * Returns a negative number if this total is larger than the other one,
     * a positive number if it is smaller, and 0 if the totals are equal.
     * Beans with equal totals therefore land in the same reduce() call,
     * so a reducer that receives FlowBean keys must iterate over all of its values.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.getSumFlow()); // descending by total flow
        // return Long.compare(this.getSumFlow(), o.getSumFlow()); // ascending by total flow
    }
}
Mapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // Reused across map() calls instead of allocating new objects per record
    private final Text k = new Text();
    private final FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        String phoneNum = fields[1];                                // phone number
        long upFlow = Long.parseLong(fields[fields.length - 3]);    // upstream traffic
        long downFlow = Long.parseLong(fields[fields.length - 2]);  // downstream traffic
        k.set(phoneNum);
        v.set(upFlow, downFlow);
        context.write(k, v);
    }
}
Reducer
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private final FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();     // add up the upstream traffic of every record
            sumDownFlow += value.getDownFlow(); // add up the downstream traffic of every record
        }
        v.set(sumUpFlow, sumDownFlow);
        context.write(key, v);
    }
}
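Main method (driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run in local mode, which simulates the cluster environment; typically used for local testing
        conf.set("mapreduce.framework.name", "local");

        // Get a Job instance for this MR program
        Job job = Job.getInstance(conf);

        // Main class of this MR program
        job.setJarByClass(FlowSumRunner.class);
        // Mapper and Reducer used by this MR program
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);
        // Key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Key/value types of the reduce output, i.e. the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Input directory to process and output directory for the results
        FileInputFormat.addInputPath(job, new Path("D:\\flowsum\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\output"));

        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1); // exit status 0 if the job succeeded, 1 otherwise
    }
}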
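Before submitting the job, the map/shuffle/reduce arithmetic can be checked locally. The sketch below is a plain-Java approximation, not Hadoop code: `FlowSumSimulation` is a hypothetical name, a `TreeMap` stands in for the shuffle's grouping by key, and the input is three records copied from the sample data with tab separators.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Dependency-free sketch of what FlowSumMapper + FlowSumReducer compute.
class FlowSumSimulation {

    // Map step: phone = fields[1], up = fields[len-3], down = fields[len-2].
    // Group-and-reduce step: accumulate [up, down] per phone in a sorted map.
    static Map<String, long[]> sumByPhone(List<String> lines) {
        Map<String, long[]> totals = new TreeMap<>(); // keys iterate in sorted order, like reduce input
        for (String line : lines) {
            String[] fields = line.split("\t");
            String phone = fields[1];
            long up = Long.parseLong(fields[fields.length - 3]);
            long down = Long.parseLong(fields[fields.length - 2]);
            long[] t = totals.computeIfAbsent(phone, p -> new long[2]);
            t[0] += up;
            t[1] += down;
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "1363157993055\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t18\t15\t1116\t954\t200",
                "1363157992093\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t15\t9\t918\t4938\t200",
                "1363157995052\t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4\t4\t0\t264\t0\t200");
        // Same layout as FlowBean.toString(): phone, up, down, sum
        sumByPhone(sample).forEach((phone, t) ->
                System.out.println(phone + "\t" + t[0] + "\t" + t[1] + "\t" + (t[0] + t[1])));
    }
}
```

For these three records it reports 13560439658 with 2034 up / 5892 down and 13826544101 with 264 / 0. The full data set contains more records for 13560439658, so the real job's totals for that number are larger.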
4. Implementation with sorting
This job takes the output of the previous job as its input.
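Mapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private final FlowBean k = new FlowBean();
    private final Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line from the previous job: phone \t up \t down \t sum
        String line = value.toString();
        String[] fields = line.split("\t");
        String phoneNum = fields[0];
        long sumUpFlow = Long.parseLong(fields[1]);
        long sumDownFlow = Long.parseLong(fields[2]);
        v.set(phoneNum);
        k.set(sumUpFlow, sumDownFlow);
        // The bean is the key, so the shuffle sorts records with FlowBean.compareTo
        context.write(k, v);
    }
}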
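Reducer
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Every phone number whose bean compares equal to this key arrives in the same call,
        // so write each of them instead of only the first one
        for (Text phoneNum : values) {
            context.write(phoneNum, key);
        }
    }
}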
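How keys that compare as equal are grouped matters for this reducer. In the sample data, 13719199419 and 13926251106 both have a total of 240 (240 up, 0 down). The sketch below simulates the shuffle's sort-and-group step in plain Java, assuming a key comparator that returns 0 for equal totals: a `TreeMap` with a reversed comparator groups equal totals together and iterates largest first, and two helper methods compare a reducer that emits only the first value of each group with one that emits every value. `TieGroupingSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TieGroupingSketch {

    // Sort-and-group: equal totals share one entry, i.e. one reduce() call.
    static Map<Long, List<String>> group(List<String> phones, List<Long> totals) {
        Map<Long, List<String>> groups = new TreeMap<>(Comparator.reverseOrder());
        for (int i = 0; i < phones.size(); i++) {
            groups.computeIfAbsent(totals.get(i), k -> new ArrayList<>()).add(phones.get(i));
        }
        return groups;
    }

    // Reducer that emits only the first value of each group: tied phones are lost.
    static List<String> firstOnly(Map<Long, List<String>> groups) {
        List<String> out = new ArrayList<>();
        groups.values().forEach(g -> out.add(g.get(0)));
        return out;
    }

    // Reducer that emits every value in each group: tied phones are kept.
    static List<String> allValues(Map<Long, List<String>> groups) {
        List<String> out = new ArrayList<>();
        groups.values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> g = group(
                List.of("13719199419", "13826544101", "13926251106"),
                List.of(240L, 264L, 240L));
        System.out.println(g);            // {264=[13826544101], 240=[13719199419, 13926251106]}
        System.out.println(firstOnly(g)); // [13826544101, 13719199419]
        System.out.println(allValues(g)); // [13826544101, 13719199419, 13926251106]
    }
}
```

Iterating over all values is safe regardless of how ties are handled: if the comparator never groups two keys, each group simply contains one value.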
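Main method (driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Building on the previous result, add one more requirement: sort the statistics by total traffic in descending order
public class FlowSumSortDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run in local mode, which simulates the cluster environment; typically used for local testing
        conf.set("mapreduce.framework.name", "local");

        // Get a Job instance for this MR program
        Job job = Job.getInstance(conf);

        // Main class of this MR program
        job.setJarByClass(FlowSumSortDriver.class);
        // Mapper and Reducer used by this MR program
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);
        // Key/value types of the map output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Key/value types of the reduce output, i.e. the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Input directory (the previous job's output) and output directory for the results
        FileInputFormat.addInputPath(job, new Path("D:\\flowsum\\output"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outsortput"));

        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1); // exit status 0 if the job succeeded, 1 otherwise
    }
}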
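MapReduce partitioning: Partitioner
1. Requirement
Write the aggregated traffic statistics to different output files according to the province the phone number belongs to.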
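2. Analysis
MapReduce groups the map output key/value pairs by key and then distributes the groups to the reduce tasks.
The default distribution rule is the key's hashCode modulo the number of reduce tasks. Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
So, to group data according to our own requirements, we need to replace the component that distributes (partitions) the data, the Partitioner: define a custom CustomPartitioner that extends the abstract class Partitioner, then register it on the job object with job.setPartitionerClass(CustomPartitioner.class).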
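The default rule can be written out in a few lines. This is a sketch, not Hadoop's source: it applies the formula `(hash & Integer.MAX_VALUE) % numReduceTasks` that Hadoop's `HashPartitioner` uses, but it takes `String.hashCode()` as the key's hash, whereas Hadoop's `Text` computes its hash from its UTF-8 bytes, so real partition numbers will differ. `HashPartitionSketch` is a hypothetical name.

```java
// Sketch of the default hash partitioning rule.
class HashPartitionSketch {

    static int partition(int hash, int numReduceTasks) {
        // Clearing the sign bit keeps the result non-negative, even for
        // Integer.MIN_VALUE, where Math.abs would still return a negative number.
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // String.hashCode() stands in for the key's hashCode() in this sketch.
    static int partitionFor(String key, int numReduceTasks) {
        return partition(key.hashCode(), numReduceTasks);
    }

    public static void main(String[] args) {
        for (String phone : new String[] {"13726230503", "13560439658", "84138413"}) {
            System.out.println(phone + " -> reduce task " + partitionFor(phone, 6));
        }
    }
}
```

Because the partition depends only on the key, every record with the same key reaches the same reduce task; which phones share a task, however, is determined by the hash and not by any business rule.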
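3. Implementation
Custom Partitioner class
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    // Phone-number prefix -> partition (reduce task) number
    private static final Map<String, Integer> provinceMap = new HashMap<>();

    static {
        provinceMap.put("134", 0);
        provinceMap.put("135", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("138", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // The first three digits of the phone number choose the partition
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // All other prefixes share the last partition, so the job needs 6 reduce tasks
        return 5;
    }
}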
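The prefix rule above can be checked without a cluster. `ProvincePartitionSketch` is a hypothetical, dependency-free copy of the mapping in `ProvincePartitioner` with one addition: a length guard for keys shorter than three characters. The phone numbers are taken from the sample data.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free copy of the prefix -> partition rule.
class ProvincePartitionSketch {

    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();

    static {
        PREFIX_TO_PARTITION.put("134", 0);
        PREFIX_TO_PARTITION.put("135", 1);
        PREFIX_TO_PARTITION.put("136", 2);
        PREFIX_TO_PARTITION.put("137", 3);
        PREFIX_TO_PARTITION.put("138", 4);
    }

    static final int OTHER = 5; // partition for every other prefix

    static int partitionFor(String phone) {
        if (phone.length() < 3) {
            return OTHER; // guard added in this sketch; substring(0, 3) would throw here
        }
        Integer code = PREFIX_TO_PARTITION.get(phone.substring(0, 3));
        return code != null ? code : OTHER;
    }

    public static void main(String[] args) {
        String[] phones = {"13480253104", "13502468823", "13660577991",
                           "13726230503", "13826544101", "13926435656", "84138413"};
        for (String phone : phones) {
            // With 6 reduce tasks, partition n is written to part-r-0000n
            System.out.println(phone + " -> part-r-0000" + partitionFor(phone));
        }
    }
}
```

Because `getPartition` can return any value from 0 to 5, the job should run with at least 6 reduce tasks. With fewer (but more than one), map tasks fail with an "Illegal partition" error; with more than 6, the extra output files stay empty; with a single reduce task the framework does not use the custom partitioner and everything lands in one file.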
Mapper, Reducer and main method
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {
    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private final Text k = new Text();
        private final FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert one line of text to a String
            String line = value.toString();
            // Split on the tab separator
            String[] fields = line.split("\t");
            // Phone number of the user
            String phoneNum = fields[1];

            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        private final FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> flowBeans, Context context)
                throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;

            for (FlowBean flowBean : flowBeans) {
                upFlowCount += flowBean.getUpFlow();
                downFlowCount += flowBean.getDownFlow();
            }
            v.set(upFlowCount, downFlowCount);
            context.write(key, v);
        }
    }

    // main() belongs to the outer class so that FlowSumProvince itself can be run
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar that contains this job
        job.setJarByClass(FlowSumProvince.class);
        // Mapper and Reducer classes
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);
        // Output key/value types of the Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Output key/value types of the Reducer (the final output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Number of reduce tasks: must cover every partition number the partitioner returns (0-5)
        job.setNumReduceTasks(6);

        // Use the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));
        // Where the results are written when the job finishes
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}