一:序列化概述 1.1 什么是序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁 盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换 成内存中的对象。
1.2 为什么要序列化? 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能 由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机。
1.3 为什么不用 Java 的序列化? Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带 很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)。
1.4 Hadoop 序列化特点 (1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互。
二:自定义 bean 对象实现序列化接口(Writable) 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部 传递一个 bean 对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步:
2.1 必须实现 Writable 接口 2.2 反序列化时,需要反射调用空参构造函数,所以必须有空参构造 1 2 3 public FlowBean () { super (); }
2.3 重写序列化方法 1 2 3 4 5 6 @Override public void write (DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
2.4 重写反序列化方法 1 2 3 4 5 6 @Override public void readFields (DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
2.5 注意反序列化的顺序和序列化的顺序完全一致 2.6 要想把结果显示在文件中,需要重写 toString(),可用”\t”分开,方便后续用。 2.7 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。 1 2 3 4 5 @Override public int compareTo (FlowBean o) { return this .sumFlow > o.getSumFlow() ? -1 : 1 ; }
三:序列化案例实操 3.1 需求 统计每一个手机号耗费的总上行流量、总下行流量、总流量
(1)输入数据 phone_data.txt
(2)输入数据格式
(3)期望输出数据格式
3.2 需求分析
3.3 编写 MapReduce 程序 (1)编写流量统计的 Bean 对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package cn.aiyingke.mapreduce.writable;import lombok.Getter;import lombok.Setter;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter @Setter public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; public FlowBean () { } public void setSumFlow () { this .sumFlow = this .upFlow + this .downFlow; } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields (DataInput dataInput) throws IOException { this .upFlow = dataInput.readLong(); this .downFlow = dataInput.readLong(); this .sumFlow = dataInput.readLong(); } @Override public String toString () { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }
(2)编写 Mapper 类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package cn.aiyingke.mapreduce.writable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper <LongWritable, Text, Text, FlowBean> { private final Text outK = new Text (); private final FlowBean outV = new FlowBean (); @Override protected void map (LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { final String line = value.toString(); final String[] split = line.split("\t" ); final String phone = split[1 ]; final String upFlow = split[split.length - 3 ]; final String downFlow = split[split.length - 2 ]; outK.set(phone); outV.setUpFlow(Long.parseLong(upFlow)); outV.setDownFlow(Long.parseLong(downFlow)); outV.setSumFlow(); context.write(outK, outV); } }
(3)编写 Reducer 类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package cn.aiyingke.mapreduce.writable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer <Text, FlowBean, Text, FlowBean> { private final FlowBean outV = new FlowBean (); @Override protected void reduce (Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { long totalUpFlow = 0 ; long totalDownFlow = 0 ; for (FlowBean value : values) { totalUpFlow += value.getUpFlow(); totalDownFlow += value.getDownFlow(); } outV.setUpFlow(totalUpFlow); outV.setDownFlow(totalDownFlow); outV.setSumFlow(); context.write(key, outV); } }
(4)编写 Driver 驱动类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package cn.aiyingke.mapreduce.writable;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { final Configuration configuration = new Configuration (); final Job job = Job.getInstance(); job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path ("Y:\\Temp\\phone_data.txt" )); FileOutputFormat.setOutputPath(job, new Path ("Y:\\Temp\\output2\\" )); final boolean b = job.waitForCompletion(true ); System.exit(b ? 0 : 1 ); } }