I. Hadoop Serialization Data Types
Java type => Hadoop type
int => IntWritable
float => FloatWritable
long => LongWritable
double => DoubleWritable
String => Text
boolean => BooleanWritable
byte => ByteWritable
map => MapWritable
array => ArrayWritable
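These wrapper types are mutable boxes around the corresponding Java values. A minimal sketch of wrapping and unwrapping (the class name and sample values are mine, for illustration only):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableWrapperDemo {
    public static void main(String[] args) {
        // Wrap plain Java values in their Hadoop counterparts
        IntWritable count = new IntWritable(42);
        Text name = new Text("hadoop");

        // Unwrap them back into plain Java values
        int javaInt = count.get();
        String javaString = name.toString();
        System.out.println(javaInt + "\t" + javaString);

        // Writables are mutable, so a single instance can be reused across records
        count.set(43);
    }
}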
II. Hadoop Serialization
1. What is serialization?
In Java, serialization is provided by the Serializable interface, which a great many classes implement, making Java's built-in serialization a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the class hierarchy, and so on), which makes it inefficient to transmit over a network. Hadoop therefore developed its own serialization framework, Writable.
Serialization converts in-memory objects into a byte sequence so they can be stored and sent across the network.
Deserialization converts a received byte sequence, or persisted data on disk, back into in-memory objects.
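To make the two definitions concrete, here is a minimal round-trip sketch (the class name and the value 1024 are mine): it serializes a Writable into an in-memory byte sequence and deserializes it back, using the same write()/readFields() calls that Hadoop issues against network and disk streams.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialization: in-memory object -> byte sequence
        LongWritable original = new LongWritable(1024L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialization: byte sequence -> in-memory object
        LongWritable restored = new LongWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored.get()); // prints 1024
    }
}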
2. How I think about serialization (my own take, go easy on me~~)
In the traffic-statistics example below, the wrapper class FlowBean implements the Writable interface and defines the fields upFlow, dwFlow, and flowSum.
When FlowBean is instantiated in the Mapper and Reducer classes, memory is allocated for these objects, and objects in that form cannot be transmitted efficiently over the network; FlowBean's serialization method (write) converts them into a byte sequence that is easy to store and transmit.
When a Mapper or Reducer later reads those byte sequences back from disk or from the network, FlowBean's deserialization method (readFields) converts them back into in-memory objects.
3. Characteristics of serialization
Serialization and deserialization come up constantly in distributed data processing; for example, Hadoop's inter-node communication is implemented with remote procedure calls (RPC), and every RPC requires serialization.
Characteristics:
1) Compact
2) Fast
3) Extensible
4) Interoperable
III. MapReduce Traffic-Statistics Example
1. Code (each public class below lives in its own .java file)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author: PrincessHug
 * @date: 2019/3/23, 23:38
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDwFlow() { return dwFlow; }
    public void setDwFlow(long dwFlow) { this.dwFlow = dwFlow; }
    public long getFlowSum() { return flowSum; }
    public void setFlowSum(long flowSum) { this.flowSum = flowSum; }

    // A no-argument constructor is required so the framework can
    // instantiate the bean before calling readFields()
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    /**
     * Serialization: object -> byte sequence
     * @param out output stream
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /**
     * Deserialization: byte sequence -> object.
     * Fields must be read in the exact order they were written.
     * @param in input stream
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }
}

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split it on tabs
        String[] fields = line.split("\t");
        // Wrap the fields we need: the phone number plus the upstream/downstream flow columns
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);
        // Emit phone number -> flow record
        context.write(new Text(phoneNum), new FlowBean(upFlow, dwFlow));
    }
}

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Aggregate all flow records for this phone number
        long upFlow_sum = 0;
        long dwFlow_sum = 0;
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dwFlow_sum += f.getDwFlow();
        }
        // Emit the totals
        context.write(key, new FlowBean(upFlow_sum, dwFlow_sum));
    }
}

public class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Partition by the first three digits of the phone number
        String phoneNum = key.toString().substring(0, 3);
        // 135/137/138/139 each get their own partition; everything else falls into partition 4
        int partitionNum = 4;
        if ("135".equals(phoneNum)) {
            return 0;
        } else if ("137".equals(phoneNum)) {
            return 1;
        } else if ("138".equals(phoneNum)) {
            return 2;
        } else if ("139".equals(phoneNum)) {
            return 3;
        }
        return partitionNum;
    }
}

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf); // pass conf so the configuration is actually applied
        // Set the jar class
        job.setJarByClass(FlowCountDriver.class);
        // Mapper class and its output key/value types
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Reducer class and its output key/value types
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Custom partitioner; five reduce tasks to match the five partitions it produces
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(5);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\flow\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\flow\\inpartitionout"));
        // Run and report the result
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
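For reference, the Mapper above assumes tab-separated input in which the phone number is the second field and the upstream and downstream flow counts are the third-from-last and second-from-last fields. A hypothetical input line and the matching reducer output (all values made up; the output columns follow FlowBean.toString(): upFlow, dwFlow, flowSum):

input:   1	13736230513	192.196.100.1	www.example.com	2481	24681	200
output:  13736230513	2481	24681	27162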