一:排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
二:排序分类 (1)部分排序 MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在 处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
三:自定义排序 WritableComparable 原理分析 bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可 以实现排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public int compareTo (FlowBean bean) { int result; if (this .sumFlow > bean.getSumFlow()) { result = -1 ; }else if (this .sumFlow < bean.getSumFlow()) { result = 1 ; }else { result = 0 ; } return result; }
四:WritableComparable 排序案例实操(全排序) 4.1 需求 对 phone_data .txt 中的数据,按照总流量进行倒叙排序;
(1)输入数据
(2)期望输出数据
4.2 需求分析
4.3 代码实现 (1)FlowBean 对象实现 WritableComparable,增加比较功能 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FlowBean implements WritableComparable <FlowBean> {} @Override public int compareTo (FlowBean o) { if (this .sumFlow > o.sumFlow) { return -1 ; } else if (this .sumFlow < o.sumFlow) { return 1 ; } else { return 0 ; } }
(2)实体类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package cn.aiyingke.mapreduce.writableComparable;import lombok.Getter;import lombok.Setter;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter @Setter public class FlowBean implements WritableComparable <FlowBean> { private long upFlow; private long downFlow; private long sumFlow; public FlowBean () { } public void setSumFlow () { this .sumFlow = this .upFlow + this .downFlow; } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields (DataInput dataInput) throws IOException { this .upFlow = dataInput.readLong(); this .downFlow = dataInput.readLong(); this .sumFlow = dataInput.readLong(); } @Override public String toString () { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo (FlowBean o) { if (this .sumFlow > o.sumFlow) { return -1 ; } else if (this .sumFlow < o.sumFlow) { return 1 ; } else { return 0 ; } } }
(3)Mapper 类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package cn.aiyingke.mapreduce.writableComparable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper <LongWritable, Text, FlowBean, Text> { private final FlowBean outK = new FlowBean (); private final Text outV = new Text (); @Override protected void map (LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean,Text >.Context context) throws IOException, InterruptedException { final String line = value.toString(); final String[] split = line.split("\t" ); outK.setUpFlow(Long.parseLong(split[1 ])); outK.setDownFlow(Long.parseLong(split[2 ])); outK.setSumFlow(); outV.set(split[0 ]); context.write(outK, outV); } }
(4)Reducer 类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package cn.aiyingke.mapreduce.writableComparable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer <FlowBean, Text, Text, FlowBean> { @Override protected void reduce (FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value, key); } } }
(5)驱动器类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package cn.aiyingke.mapreduce.writableComparable;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { final Configuration configuration = new Configuration (); final Job job = Job.getInstance(); job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path ("Y:\\Temp\\phone_data.txt" )); FileOutputFormat.setOutputPath(job, new Path ("Y:\\Temp\\output\\" )); final boolean b = job.waitForCompletion(true ); System.exit(b ? 0 : 1 ); } }
五:WritableComparable 排序案例实操(区内排序) 5.1 需求 要求每个省份手机号输出的文件中按照总流量内部排序。
5.2 需求分析 基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
5.3 代码实现 (1)增加自定义分区类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package cn.aiyingke.mapreduce.writableComparableWithPartition;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner2 extends Partitioner <FlowBean, Text> { @Override public int getPartition (FlowBean flowBean, Text text, int i) { final String phone = text.toString(); final String prePhone = phone.substring(0 , 3 ); int partition; if ("136" .equals(prePhone)) { partition = 0 ; } else if ("137" .equals(prePhone)) { partition = 1 ; } else if ("138" .equals(prePhone)) { partition = 2 ; } else if ("139" .equals(prePhone)) { partition = 3 ; } else { partition = 4 ; } return partition; } }
(2)FlowDriver 设置分区类和 Reduce 数量 1 2 3 4 5 job.setPartitionerClass(ProvincePartitioner2.class); job.setNumReduceTasks(5 );
5.4 二次排序
若想要进行区内排序的二次排序,比如对某省份手机号的总流量进行降序;若总流量相同,按照上行流量降序排列;
仅需要在FlowBean中,对 compareTo 方法进行业务逻辑的增加。
@Override
public int compareTo(FlowBean o) {
// 按照总流量比较,倒叙排列(降序)
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
/*
* Author: Rupert-Tears
* CreateTime: 18:29 2022/12/18
* Description: 二次排序,对上行流量升序排列
*/
if (this.upFlow > o.upFlow) {
return 1;
} else if (this.upFlow < o.upFlow) {
return -1;
} else {
return 0;
}
}
}