# 一: Combiner Overview

(1) A Combiner is a component of an MR program in addition to the Mapper and Reducer.

(2) The Combiner's parent class is Reducer.

(3) Combiner and Reducer differ in where they run:

  • The Combiner runs on the node hosting each individual MapTask.
  • The Reducer receives the output of every Mapper in the job.

(4) The purpose of a Combiner is to locally aggregate the output of each MapTask, so less data has to be transferred over the network.

(5) A Combiner can be applied only when it does not affect the final business logic, and its output KV types must match the Reducer's input KV types. Computing an average, for example, cannot use a Combiner; see the worked example below.
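
Suppose (hypothetically) MapTask 1 processes the values 3 and 5 while MapTask 2 processes 7. Averaging locally first yields avg(avg(3, 5), 7) = avg(4, 7) = 5.5, but the true mean is avg(3, 5, 7) = 5. Summation, by contrast, is commutative and associative, which is why the word-count job below can combine safely.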

# 二: Implementing a Custom Combiner

## 2.1 Define a Combiner that extends Reducer and override the reduce method

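The combiner is written exactly like a reducer; a minimal version (shown again in full in section 3.3) looks like this:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// A Combiner is just a Reducer subclass: extend Reducer and override reduce
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;                      // local aggregate for this key
        for (IntWritable value : values) {
            sum += value.get();           // sum the partial counts
        }
        outV.set(sum);
        context.write(key, outV);         // emit the locally merged pair
    }
}
```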

## 2.2 Register it in the Job driver class

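Registration is a single call in the driver; the fragment below assumes a Job instance named job, as in the complete WordCountDriver listed in section 3.5:

```java
// Register the custom Combiner on the Job
job.setCombinerClass(WordCountCombiner.class);
```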

# 三: Combiner Case Study

## 3.1 Requirements

During the word count, locally aggregate the output of each MapTask to reduce the volume of data sent over the network, i.e. enable the Combiner feature.

### (1) Input data

- data.txt

### (2) Expected output

- The Combine stage receives many input records; after merging, far fewer records are output.

## 3.2 Analysis

Locally aggregate the output of each MapTask (the Combiner):

![](./Mapreduce-Combiner合并/image-20221218183845643.png)

## 3.3 Implementation (Option 1)

### (1) Add a WordCountCombiner class that extends Reducer

```java
package cn.aiyingke.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 19:18 2022/12/18
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        // Running total for this key
        int sum = 0;

        // Sum all values for the key
        for (IntWritable value : values) {
            sum += value.get();
        }

        // Wrap the sum in the output value
        outV.set(sum);

        // Emit the locally aggregated <key, sum> pair
        context.write(key, outV);
    }
}
```

### (2) Register the Combiner in the WordCountDriver driver class

```java
// Register the custom Combiner
job.setCombinerClass(WordCountCombiner.class);
```

## 3.4 Implementation (Option 2)

### (1) Use WordCountReducer itself as the Combiner, registered in the WordCountDriver driver class

```java
job.setCombinerClass(WordCountReducer.class);
```

This works here because the word-count reduction is a pure sum, which is commutative and associative, so the Reducer class can double as the Combiner without changing the final result.

## 3.5 Complete Code

### (1) Driver class

```java
package cn.aiyingke.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 23:40 2022/12/16
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        /*
         * 1. Get the configuration
         */
        Configuration configuration = new Configuration();

        /*
         * 2. Get the Job instance
         */
        Job job = Job.getInstance(configuration, "WordCount-MR");

        /*
         * 3. Set the main class (i.e. tell the cluster which jar holds the entry point)
         */
        job.setJarByClass(WordCountDriver.class);

        /*
         * 4. Set the Mapper, Combiner, and Reducer
         */
        job.setMapperClass(WordCountMapper.class);
        // Register the custom Combiner
        job.setCombinerClass(WordCountCombiner.class);
        // job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        /*
         * 5. Set the output key/value types
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

        /*
         * 6. Set the input and output data paths
         */
        FileInputFormat.addInputPath(job, new Path("Y:\\Temp\\data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\asdsad"));

        /*
         * 7. Submit the job and wait for completion
         */
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
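
Note that this driver also switches the input format to CombineTextInputFormat with a maximum split size of 4194304 bytes (4 MB). That setting packs small input files into fewer splits and is independent of the Combiner optimization this article covers.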

### (2) Mapper class

```java
package cn.aiyingke.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Author: Rupert Tears
 * Date: Created in 23:40 2022/12/16
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    /**
     * Reusable IntWritable (the Hadoop wrapper for a Java int), fixed at 1
     */
    IntWritable intWritable = new IntWritable(1);
    /**
     * Reusable Text object (the Hadoop wrapper for a Java String)
     */
    Text text = new Text();

    /**
     * Override the map method:
     * 1. Split each input line into tokens by the delimiter (a comma here);
     * 2. Iterate over the resulting tokens;
     * 3. Build the <key, value> pairs and emit them through the Context.
     */
    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Convert the incoming line to a String, splitting on commas
        StringTokenizer stringTokenizer = new StringTokenizer(value.toString(), ",");
        // Loop over each comma-separated token
        while (stringTokenizer.hasMoreTokens()) {
            // Fetch the next token
            String word = stringTokenizer.nextToken();
            // Store the token in the reusable Text wrapper
            text.set(word);
            // Emit <word, 1> through the context
            context.write(text, intWritable);
        }
    }
}
```

### (3) Reducer class

```java
package cn.aiyingke.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 23:41 2022/12/16
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Holds the int sum to be written out
     */
    IntWritable sumIntWritable = new IntWritable();

    /**
     * Processes one group of records sharing the same key; i.e. the reduce
     * method is called once per key, with all of that key's values.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Total frequency for this key
        int count = 0;
        // Accumulate the counts
        for (IntWritable val : values) {
            count += val.get();
        }
        // Wrap the total in the output value
        sumIntWritable.set(count);
        // Emit the <word, total> pair
        context.write(key, sumIntWritable);
    }
}
```

### (4) Combiner class

```java
package cn.aiyingke.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 19:18 2022/12/18
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        // Running total for this key
        int sum = 0;

        // Sum all values for the key
        for (IntWritable value : values) {
            sum += value.get();
        }

        // Wrap the sum in the output value
        outV.set(sum);

        // Emit the locally aggregated <key, sum> pair
        context.write(key, outV);
    }
}
```
The job counter log confirms the local merge:

```
Combine input records=6
Combine output records=3
```
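
Six records entered the combine stage and only three left it: duplicate keys were merged on the map side, so less data crossed the network to the Reducer.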