1: Reduce Join

The main work on the Map side: tag the key/value pairs coming from different tables or files so that records from different sources can be told apart. Then use the join field as the key and the remaining fields plus the newly added tag as the value, and write the pairs out.

The main work on the Reduce side: by the time data reaches the Reduce side, grouping by the join field as the key has already been done; within each group we only need to separate the records that come from different files (tagged in the Map phase) and then merge them.

2: Reduce Join Hands-on Example

2.1 Requirement

Order data table

Product information table

[Figures: sample contents of the order data table and the product information table]

Merge the data from the product information table into the order data table according to the product pid.

Final data form: each joined line contains the order id, the product name pName, and the amount (the format produced by TableBean.toString() below).

2.2 Requirement Analysis

Use the join condition as the key of the Map output, so that the rows of both tables that satisfy the join condition, each carrying the name of the file it came from, are sent to the same ReduceTask; the records are then stitched together in the Reduce phase. For example, every order row with a given pid and the product row with that same pid arrive in one reduce call, where the product name is filled into each order row.

2.3 Reduce-Side Table Merge (Data Skew)

2.4 Code Implementation

(1) Create the TableBean class that holds the merged order and product fields

package cn.aiyingke.mapreduce.reduceJoin;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 18:01 2023/2/1
 * Description: Thought is already is late, exactly is the earliest time.
 */
@NoArgsConstructor
@Getter
@Setter
public class TableBean implements Writable {

    private String id;     // order ID
    private String pid;    // product ID
    private int amount;    // product quantity
    private String pName;  // product name
    private String flag;   // flag marking whether the record comes from the order table ("order") or the product table ("pd")

    @Override
    public String toString() {
        // final output form
        return id + "\t" + pName + "\t" + amount;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pName);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // fields must be read back in exactly the same order they were written
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
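
A quick way to sanity-check the Writable implementation above is to serialize a bean and read it back; this only works because readFields consumes the fields in exactly the same order that write emits them. The following standalone sketch is not part of the original project (the class name WritableRoundTrip and the sample values are made up for illustration):

package cn.aiyingke.mapreduce.reduceJoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round-trip check of TableBean's write/readFields contract.
public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        TableBean in = new TableBean();
        in.setId("1001");      // sample values for illustration only
        in.setPid("01");
        in.setAmount(1);
        in.setPName(" ");
        in.setFlag("order");

        // serialize the way Hadoop would, through a DataOutput
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean through a DataInput
        TableBean out = new TableBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // prints the id, the blank pName, and the amount, tab-separated
    }
}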

(2) Write the TableMapper class

package cn.aiyingke.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 17:38 2023/2/2
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        // get the name of the file this split comes from
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        // read one line
        String line = value.toString();

        // decide which file the line belongs to, then handle it accordingly
        String[] split = line.split("\t");

        if (filename.contains("order")) {
            // order table handling
            // build outK
            outK.set(split[1]); // pid, the field shared by both tables, so matching rows reach the same reduce
            // build outV
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2])); // int field
            outV.setPName(" "); // this table has no product name, so set it to a blank
            outV.setFlag("order");
        } else {
            // product table handling
            // build outK
            outK.set(split[0]);
            // build outV
            outV.setId(" "); // this table has no order id, so set it to a blank
            outV.setPid(split[0]);
            outV.setAmount(0); // int field
            outV.setPName(split[1]);
            outV.setFlag("pd");
        }
        // write out the KV pair
        context.write(outK, outV);
    }
}

(3) Write the TableReducer class

package cn.aiyingke.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * Author: Rupert Tears
 * Date: Created in 17:59 2023/2/2
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {
            // determine which table the record comes from
            if ("order".equals(value.getFlag())) {
                // order table
                // Create a temporary TableBean to receive value. The iterated object must not be added
                // directly: Hadoop optimizes internally and reuses one value instance, so only a reference
                // is passed in, and adding it directly would keep nothing but the data of the last record.
                TableBean tmpOrderBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmpOrderBean, value);
                } catch (InvocationTargetException | IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
                // add the temporary object to the collection
                orderBeans.add(tmpOrderBean);
            } else {
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (InvocationTargetException | IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        // iterate over orderBeans, fill each orderBean's pName in place of its pid, then write out
        for (TableBean orderBean : orderBeans) {
            // rows with the same pid arrive in the same reduce call, so pName is unique here
            orderBean.setPName(pdBean.getPName());
            // write out the updated object
            context.write(orderBean, NullWritable.get());
        }
    }
}
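
The copyProperties step above is the crucial detail. The standalone sketch below is not from the original project (the class name ObjectReuseDemo and the sample rows are made up); it imitates Hadoop's value-object reuse to show why adding the iterated value directly would leave the list holding several references to one bean that only carries the last record:

package cn.aiyingke.mapreduce.reduceJoin;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: Hadoop reuses one value object while iterating reduce input,
// so storing the reference directly loses all but the last record.
public class ObjectReuseDemo {
    public static void main(String[] args) {
        TableBean reused = new TableBean();      // stands in for Hadoop's single reused value
        List<TableBean> direct = new ArrayList<>();
        List<TableBean> copied = new ArrayList<>();

        String[][] rows = {{"1001", "01", "1"}, {"1004", "01", "4"}};  // sample order rows
        for (String[] row : rows) {
            reused.setId(row[0]);
            reused.setPid(row[1]);
            reused.setAmount(Integer.parseInt(row[2]));
            reused.setPName(" ");
            reused.setFlag("order");

            direct.add(reused);                  // wrong: both list entries point at the same object

            TableBean copy = new TableBean();    // right: copy the fields into a fresh bean,
            copy.setId(reused.getId());          // as TableReducer does with BeanUtils.copyProperties
            copy.setPid(reused.getPid());
            copy.setAmount(reused.getAmount());
            copy.setPName(reused.getPName());
            copy.setFlag(reused.getFlag());
            copied.add(copy);
        }

        System.out.println(direct);  // prints the last row twice
        System.out.println(copied);  // prints both distinct rows
    }
}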

(4) Write the TableDriver class

package cn.aiyingke.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Author: Rupert Tears
 * Date: Created in 18:16 2023/2/2
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class TableDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("Y:\\Temp\\input\\"));
        FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\output5\\"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
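
As a side note, the input and output paths above are hard-coded Windows paths for local testing. A minimal variation, which is my own suggestion rather than part of the original, is to take the paths from the command line so the same jar can be submitted to a cluster unchanged:

// Hypothetical variant: read the paths from the program arguments instead of hard-coding them
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));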

(5) Run Results

[Figure: output of the Reduce Join job run]

(6) Summary

Drawback: with this approach the merge is done entirely in the Reduce phase, so the processing pressure on the Reduce side is very high while the computation load on the Map nodes stays very low; resource utilization is poor, and data skew arises very easily in the Reduce phase.

Solution: perform the data merge on the Map side.

3: Map Join

3.1 Application Scenario

Map Join is suitable for scenarios where one table is very small and the other is very large.

3.2 Advantages

Joining the tables on the Reduce side very easily produces data skew.

By caching the extra tables on the Map side and handling the business logic there in advance, more work is done on the Map side, the data pressure on the Reduce side is reduced, and data skew is kept to a minimum.

3.3 Implementation: Using DistributedCache

(1) In the Mapper's setup phase, read the cached file into an in-memory collection.

(2) Load the cache file in the Driver class.

// cache an ordinary file to the nodes where the tasks run
job.addCacheFile(new URI("file:///y:/Temp/input/pd.txt"));
// when running on a cluster, an HDFS path must be set instead
job.addCacheFile(new URI("hdfs://hadoop101:8020/tmp/cache/pd.txt"));

4: Map Join Hands-on Example

4.1 Requirement

Same as in section 2.1: merge the product information into the order data by product pid, this time entirely on the Map side.

4.2 Requirement Analysis

MapJoin is suitable when one of the joined tables is small: the small table is cached in memory on every MapTask and the join is completed inside the map method, so no Reduce phase is needed.

4.3 Code Implementation

(1) First, add the cache file in the MapJoinDriver driver class

package cn.aiyingke.mapreduce.mapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Author: Rupert Tears
 * Date: Created in 18:16 2023/2/2
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class TableDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        /*
         * Author: Rupert-Tears
         * CreateTime: 14:37 2023/2/3
         * Description: load the cached data
         */
        job.addCacheFile(new URI("file:///Y:/Temp/pd.txt"));

        // Map-side join needs no Reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path("Y:\\Temp\\input\\"));
        FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\output2\\"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

(2) Read the cache file in the setup method of the MapJoinMapper class

package cn.aiyingke.mapreduce.mapJoin;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Author: Rupert Tears
 * Date: Created in 17:38 2023/2/2
 * Description: Thought is already is late, exactly is the earliest time.
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    /**
     * Author: Rupert-Tears
     * CreateTime: 14:43 2023/2/3
     * Description: before the task starts, cache the pd data into pdMap
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // get the small-table data (pd.txt) from the cache file
        final URI[] cacheFiles = context.getCacheFiles();
        final Path path = new Path(cacheFiles[0]);
        // get the file system object and open an input stream
        final FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        final FSDataInputStream open = fileSystem.open(path);
        // wrap the stream in a reader so it can be read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8));
        // read and process line by line
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // split one line, e.g. "11\t小米"
            final String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }
        // close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // read the big-table data, e.g. "1001\t11\t2"
        final String[] split = value.toString().split("\t");
        // use the pid of each big-table row to look up the pName in pdMap
        final String pName = pdMap.get(split[1]);
        // replace the pid in each big-table row with the pName
        text.set(split[0] + "\t" + pName + "\t" + split[2]);
        // write out
        context.write(text, NullWritable.get());
    }
}
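
One detail to be aware of: pdMap.get(split[1]) returns null when an order row's pid has no match in pd.txt, so the output line would contain the literal string "null". A possible defensive tweak, which is my own suggestion rather than part of the original code, is to supply a fallback value:

// Hypothetical variant of the lookup in map(): use a placeholder such as "NA"
// when the pid is missing from the cached product table.
final String pName = pdMap.getOrDefault(split[1], "NA");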