一:概述

1.1 优缺点

  • 压缩的优点:以减少磁盘 IO、减少磁盘存储空间。
  • 压缩的缺点:增加 CPU 开销。

1.2 压缩原则

  • 运算密集型的 Job,少用压缩
  • IO 密集型的 Job,多用压缩

二:MR 支持的压缩编码

2.1 压缩算法对比介绍

压缩格式 Hadoop 是否自带 算法 文件扩展名 是否可切片 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定 输入格式
Snappy 是,直接使用 Snappy .snappy 和文本处理一样,不需要修改

2.2 压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

http://google.github.io/snappy/

Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

三:压缩方式选择

压缩方式选择时重点考虑:

  • 压缩/解压缩速度
  • 压缩率(压缩后存储大小)
  • 压缩后是否 可以支持切片。

3.1 Gzip 压缩

  • 优点:压缩率比较高;
  • 缺点:不支持 Split;压缩/解压速度一般;

3.2 Bzip2 压缩

  • 优点:压缩率高;支持 Split;
  • 缺点:压缩/解压速度慢。

3.3 Lzo 压缩

  • 优点:压缩/解压速度比较快;支持 Split;
  • 缺点:压缩率一般;想支持切片需要额外创建索引。

3.4 Snappy 压缩

  • 优点:压缩和解压缩速度快;
  • 缺点:不支持 Split;压缩率一般;

四:压缩位置选择

压缩可以在 MapReduce 作用的任意阶段启用。

五:压缩参数配置

5.1 为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

5.2 要在 Hadoop 中启用压缩,可以配置如下参数

参数 默认值 阶段 建议
io.compression.codecs (在 core-site.xml 中配置) 无,这个需要在命令行输入 hadoop checknative 查看 输入压缩 Hadoop 使用文件扩展 名判断是否支持某种编解码器
mapreduce.map.output.compr ess(在 mapred-site.xml 中 配置) false mapper 输出 这个参数设为 true 启 用压缩
mapreduce.map.output.compr ess.codec(在 mapredsite.xml 中配置) org.apache.hadoop.io.com press.DefaultCodec mapper 输出 企业多使用 LZO 或 Snappy 编解码器在此 阶段压缩数据
mapreduce.output.fileoutpu tformat.compress(在 mapred-site.xml 中配置) false reducer 输出 这个参数设为 true 启 用压缩
mapreduce.output.fileoutpu tformat.compress.codec(在 mapred-site.xml 中配置) org.apache.hadoop.io.com press.DefaultCodec reducer 输出 使用标准工具或者编 解码器,如 gzip 和 bzip2

六:压缩实操案例

6.1 Map 输出端采用压缩

即使你的 MapReduce 的输入输出文件都是未压缩的文件,你仍然可以对 Map 任务的中 间结果输出做压缩,因为它要写在硬盘并且通过网络传输到 Reduce 节点,对其压缩可以提高很多性能。

Hadoop 源码支持的压缩格式有:BZip2Codec、DefaultCodec

(1)驱动器 Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package cn.aiyingke.mapreduce.compress.mapCompress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 23:40 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {


/*
* 1.拿到配置环境变量
*/
Configuration configuration = new Configuration();


/*
* Author: Rupert-Tears
* CreateTime: 16:28 2023/2/10
* Description: 开启map端输出压缩
*/
configuration.setBoolean("mapreduce.map.output.compress", true);

// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);


/*
* 2.拿到job作业对象
*/
Job job = Job.getInstance(configuration, "WordCount-MR");
/*
* 3.设置主类,(即:告知集群入口jar包在哪)
*/
job.setJarByClass(WordCountDriver.class);
/*
* 4.设置 mapper combiner reducer
*/
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
/*
* 5.设置输出 key value的类型
*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

/*
* 6.设置输入、输出的数据路径
*/
FileInputFormat.addInputPath(job, new Path("Y:\\Temp\\input"));
FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\output888"));
/*
* 7.提交执行
*/
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

(2)Mapper 保持不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.aiyingke.mapreduce.compress.mapCompress;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;
/**
* Author: Rupert Tears
* Date: Created in 23:40 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* 暂存一个常用的IntWritable对象,对 java 整形的封装
*/
IntWritable intWritable = new IntWritable(1);
/**
* 暂存一个 String 的封装类
*/
Text text = new Text();

/**
* 覆写map方法
*
* @throws InterruptedException 1.对输入的文件,以行为单位进行切分,按规则切分(空格);
* 2.遍历切分完成后的集合或迭代器
* 3.组合<key,value>,通过Context进行输出
*/
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将传入的数据转换为字符串 指定分割符号为逗号
StringTokenizer stringTokenizer = new StringTokenizer(value.toString(), ",");
// 循环获取每个逗号分割出来的元素
while (stringTokenizer.hasMoreTokens()) {
// 接收获取元素
String word = stringTokenizer.nextToken();
// 添加分割后的元素到封装类中
text.set(word);
// 上下文调度
context.write(text, intWritable);
}
}
}

(3)Reducer 保持不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.aiyingke.mapreduce.compress.mapCompress;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 23:41 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 定义一个存储 int类型的求和结果的变量
*/
IntWritable sumIntWritable = new IntWritable();

/**
* 以key相同的组为单位处理,即 Reducer 方法调用一次,处理同 key的组数据
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个总词频变量
int count = 0;
// 循环计数
for (IntWritable val : values) {
count += val.get();

}
// 将频数封装
sumIntWritable.set(count);
// 上下文调度
context.write(key, sumIntWritable);

}
}

6.2 Reduce 输出端采用压缩

(1)驱动器 Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package cn.aiyingke.mapreduce.compress.reduceCompress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 23:40 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {


/*
* 1.拿到配置环境变量
*/
Configuration configuration = new Configuration();


/*
* Author: Rupert-Tears
* CreateTime: 16:28 2023/2/10
* Description: 开启map端输出压缩
*/
configuration.setBoolean("mapreduce.map.output.compress", true);

// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);


/*
* 2.拿到job作业对象
*/
Job job = Job.getInstance(configuration, "WordCount-MR");
/*
* 3.设置主类,(即:告知集群入口jar包在哪)
*/
job.setJarByClass(WordCountDriver.class);
/*
* 4.设置 mapper combiner reducer
*/
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
/*
* 5.设置输出 key value的类型
*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

/*
* Author: Rupert-Tears
* CreateTime: 16:33 2023/2/10
* Description: 设置reduce端输出压缩开启
*/
FileOutputFormat.setCompressOutput(job, true);

// 设置压缩的方式
// FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
/*
* 6.设置输入、输出的数据路径
*/
FileInputFormat.addInputPath(job, new Path("Y:\\Temp\\input"));
FileOutputFormat.setOutputPath(job, new Path("Y:\\Temp\\output666"));

/*
* 7.提交执行
*/
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

(2)Mapper 保持不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.aiyingke.mapreduce.compress.reduceCompress;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;
/**
* Author: Rupert Tears
* Date: Created in 23:40 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* 暂存一个常用的IntWritable对象,对 java 整形的封装
*/
IntWritable intWritable = new IntWritable(1);
/**
* 暂存一个 String 的封装类
*/
Text text = new Text();

/**
* 覆写map方法
*
* @throws InterruptedException 1.对输入的文件,以行为单位进行切分,按规则切分(空格);
* 2.遍历切分完成后的集合或迭代器
* 3.组合<key,value>,通过Context进行输出
*/
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将传入的数据转换为字符串 指定分割符号为逗号
StringTokenizer stringTokenizer = new StringTokenizer(value.toString(), ",");
// 循环获取每个逗号分割出来的元素
while (stringTokenizer.hasMoreTokens()) {
// 接收获取元素
String word = stringTokenizer.nextToken();
// 添加分割后的元素到封装类中
text.set(word);
// 上下文调度
context.write(text, intWritable);
}
}
}

(3)Reducer 保持不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.aiyingke.mapreduce.compress.reduceCompress;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Author: Rupert Tears
* Date: Created in 23:41 2022/12/16
* Description: Thought is already is late, exactly is the earliest time.
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 定义一个存储 int类型的求和结果的变量
*/
IntWritable sumIntWritable = new IntWritable();

/**
* 以key相同的组为单位处理,即 Reducer 方法调用一次,处理同 key的组数据
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个总词频变量
int count = 0;
// 循环计数
for (IntWritable val : values) {
count += val.get();

}
// 将频数封装
sumIntWritable.set(count);
// 上下文调度
context.write(key, sumIntWritable);

}
}