一:排序概述

  • 排序是MapReduce框架中最重要的操作之一。
  • MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要
  • 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序
  • 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序
  • 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

二:排序分类

(1)部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

(2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在 处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序:(GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

(4)二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

三:自定义排序 WritableComparable 原理分析

bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可 以实现排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public int compareTo(FlowBean bean) {
int result;
// 按照总流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}

四:WritableComparable 排序案例实操(全排序)

4.1 需求

对 phone_data .txt 中的数据,按照总流量进行倒叙排序;

(1)输入数据

  • phone_data .txt

(2)期望输出数据

4.2 需求分析

4.3 代码实现

(1)FlowBean 对象实现 WritableComparable,增加比较功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 实现WritableComparable接口
public class FlowBean implements WritableComparable<FlowBean> {}


// 覆写compareTo方法
@Override
public int compareTo(FlowBean o) {

// 按照总流量比较,倒叙排列
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
return 0;
}
}

(2)实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cn.aiyingke.mapreduce.writableComparable;

import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 21:40 2022/12/17
* Description: Thought is already is late, exactly is the earliest time.
*/
@Getter
@Setter
public class FlowBean implements WritableComparable<FlowBean> {

private long upFlow;
private long downFlow;
private long sumFlow;


public FlowBean() {
}

public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}

@Override
public int compareTo(FlowBean o) {

// 按照总流量比较,倒叙排列
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
return 0;
}
}
}

(3)Mapper 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.aiyingke.mapreduce.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 21:59 2022/12/17
* Description: Thought is already is late, exactly is the earliest time.
*/
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private final FlowBean outK = new FlowBean();
private final Text outV = new Text();

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean,Text >.Context context) throws IOException, InterruptedException {

// 1.获取一行数据
// 13736230513 2481 24681 27162
final String line = value.toString();

// 2.切割数据
final String[] split = line.split("\t");

// 4.封装 outK outV
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);


// 5.写出 outK outV
context.write(outK, outV);
}
}

(4)Reducer 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.aiyingke.mapreduce.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 22:07 2022/12/17
* Description: Thought is already is late, exactly is the earliest time.
*/
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {


@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

//遍历values集合,循环写出,避免总流量相同的情况
for (Text value : values) {
// 调换 KV 位置,反向写出
context.write(value, key);
}
}
}

(5)驱动器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.aiyingke.mapreduce.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 22:14 2022/12/17
* Description: Thought is already is late, exactly is the earliest time.
*/
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

// 1.获取job对象
final Configuration configuration = new Configuration();
final Job job = Job.getInstance();

// 2.关联 driver mapper reducer
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);

// 3.设置 map 端输出 KV 类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 4.设置程序最终输出 KV 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);


// 5.设置程序输入输出路径
FileInputFormat.setInputPaths(job, new Path("Y:\\Temp\\phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\output\\"));

// 6.提交job
final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}

五:WritableComparable 排序案例实操(区内排序)

5.1 需求

要求每个省份手机号输出的文件中按照总流量内部排序。

5.2 需求分析

基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

5.3 代码实现

(1)增加自定义分区类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.aiyingke.mapreduce.writableComparableWithPartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* Author: Rupert Tears
* Date: Created in 18:13 2022/12/18
* Description: Thought is already is late, exactly is the earliest time.
*/
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int i) {

// 获取手机号前三位
final String phone = text.toString();
final String prePhone = phone.substring(0, 3);

// 定义一个分区号变量 partition,根据 perPhone 设置分区号
int partition;
if ("136".equals(prePhone)) {
partition = 0;
} else if ("137".equals(prePhone)) {
partition = 1;
} else if ("138".equals(prePhone)) {
partition = 2;
} else if ("139".equals(prePhone)) {
partition = 3;
} else {
partition = 4;
}

// 返回分区号
return partition;
}
}

(2)FlowDriver 设置分区类和 Reduce 数量

1
2
3
4
5
// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner2.class);

// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);

5.4 二次排序

  • 若想要进行区内排序的二次排序,比如对某省份手机号的总流量进行降序;若总流量相同,按照上行流量降序排列;

  • 仅需要在FlowBean中,对 compareTo 方法进行业务逻辑的增加。

  •     @Override
        public int compareTo(FlowBean o) {
    
            // 按照总流量比较,倒叙排列(降序)
            if (this.sumFlow > o.sumFlow) {
                return -1;
            } else if (this.sumFlow < o.sumFlow) {
                return 1;
            } else {
                /*
                 * Author: Rupert-Tears
                 * CreateTime: 18:29 2022/12/18
                 * Description: 二次排序,对上行流量升序排列
                 */
                if (this.upFlow > o.upFlow) {
                    return 1;
                } else if (this.upFlow < o.upFlow) {
                    return -1;
                } else {
                    return 0;
                }
            }
        }