一:概述 1.1 优缺点
压缩的优点:以减少磁盘 IO、减少磁盘存储空间。
压缩的缺点:增加 CPU 开销。
1.2 压缩原则
运算密集型的 Job,少用压缩
IO 密集型的 Job,多用压缩
二:MR 支持的压缩编码 2.1 压缩算法对比介绍
压缩格式
Hadoop 是否自带
算法
文件扩展名
是否可切片
换成压缩格式后,原来的程序是否需要修改
DEFLATE
是,直接使用
DEFLATE
.deflate
否
和文本处理一样,不需要修改
Gzip
是,直接使用
DEFLATE
.gz
否
和文本处理一样,不需要修改
bzip2
是,直接使用
bzip2
.bz2
是
和文本处理一样,不需要修改
LZO
否,需要安装
LZO
.lzo
是
需要建索引,还需要指定 输入格式
Snappy
是,直接使用
Snappy
.snappy
否
和文本处理一样,不需要修改
2.2 压缩性能的比较
压缩算法
原始文件大小
压缩文件大小
压缩速度
解压速度
gzip
8.3GB
1.8GB
17.5MB/s
58MB/s
bzip2
8.3GB
1.1GB
2.4MB/s
9.5MB/s
LZO
8.3GB
2.9GB
49.3MB/s
74.6MB/s
http://google.github.io/snappy/
Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
三:压缩方式选择 压缩方式选择时重点考虑:
压缩/解压缩速度
压缩率(压缩后存储大小)
压缩后是否 可以支持切片。
3.1 Gzip 压缩
优点:压缩率比较高;
缺点:不支持 Split;压缩/解压速度一般;
3.2 Bzip2 压缩
优点:压缩率高;支持 Split;
缺点:压缩/解压速度慢。
3.3 Lzo 压缩
优点:压缩/解压速度比较快;支持 Split;
缺点:压缩率一般;想支持切片需要额外创建索引。
3.4 Snappy 压缩
优点:压缩和解压缩速度快;
缺点:不支持 Split;压缩率一般;
四:压缩位置选择 压缩可以在 MapReduce 作用的任意阶段启用。
五:压缩参数配置 5.1 为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器
压缩格式
对应的编码/解码器
DEFLATE
org.apache.hadoop.io.compress.DefaultCodec
gzip
org.apache.hadoop.io.compress.GzipCodec
bzip2
org.apache.hadoop.io.compress.BZip2Codec
LZO
com.hadoop.compression.lzo.LzopCodec
Snappy
org.apache.hadoop.io.compress.SnappyCodec
5.2 要在 Hadoop 中启用压缩,可以配置如下参数
参数
默认值
阶段
建议
io.compression.codecs (在 core-site.xml 中配置)
无,这个需要在命令行输入 hadoop checknative 查看
输入压缩
Hadoop 使用文件扩展 名判断是否支持某种编解码器
mapreduce.map.output.compr ess(在 mapred-site.xml 中 配置)
false
mapper 输出
这个参数设为 true 启 用压缩
mapreduce.map.output.compr ess.codec(在 mapredsite.xml 中配置)
org.apache.hadoop.io.com press.DefaultCodec
mapper 输出
企业多使用 LZO 或 Snappy 编解码器在此 阶段压缩数据
mapreduce.output.fileoutpu tformat.compress(在 mapred-site.xml 中配置)
false
reducer 输出
这个参数设为 true 启 用压缩
mapreduce.output.fileoutpu tformat.compress.codec(在 mapred-site.xml 中配置)
org.apache.hadoop.io.com press.DefaultCodec
reducer 输出
使用标准工具或者编 解码器,如 gzip 和 bzip2
六:压缩实操案例 6.1 Map 输出端采用压缩 即使你的 MapReduce 的输入输出文件都是未压缩的文件,你仍然可以对 Map 任务的中 间结果输出做压缩,因为它要写在硬盘并且通过网络传输到 Reduce 节点,对其压缩可以提高很多性能。
Hadoop 源码支持的压缩格式有:BZip2Codec、DefaultCodec
(1)驱动器 Driver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package cn.aiyingke.mapreduce.compress.mapCompress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.BZip2Codec;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration configuration = new Configuration (); configuration.setBoolean("mapreduce.map.output.compress" , true ); configuration.setClass("mapreduce.map.output.compress.codec" , BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration, "WordCount-MR" ); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304 ); FileInputFormat.addInputPath(job, new Path ("Y:\\Temp\\input" )); FileOutputFormat.setOutputPath(job, new Path ("Y:\\Temp\\output888" )); System.exit(job.waitForCompletion(true ) ? 0 : 1 ); } }
(2)Mapper 保持不变 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package cn.aiyingke.mapreduce.compress.mapCompress;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.StringTokenizer;public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> { IntWritable intWritable = new IntWritable (1 ); Text text = new Text (); @Override protected void map (Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer (value.toString(), "," ); while (stringTokenizer.hasMoreTokens()) { String word = stringTokenizer.nextToken(); text.set(word); context.write(text, intWritable); } } }
(3)Reducer 保持不变 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package cn.aiyingke.mapreduce.compress.mapCompress;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> { IntWritable sumIntWritable = new IntWritable (); @Override protected void reduce (Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0 ; for (IntWritable val : values) { count += val.get(); } sumIntWritable.set(count); context.write(key, sumIntWritable); } }
6.2 Reduce 输出端采用压缩 (1)驱动器 Driver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package cn.aiyingke.mapreduce.compress.reduceCompress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.BZip2Codec;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.DefaultCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration configuration = new Configuration (); configuration.setBoolean("mapreduce.map.output.compress" , true ); configuration.setClass("mapreduce.map.output.compress.codec" , BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration, "WordCount-MR" ); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304 ); FileOutputFormat.setCompressOutput(job, true ); FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); FileInputFormat.addInputPath(job, new Path ("Y:\\Temp\\input" )); FileOutputFormat.setOutputPath(job, new Path ("Y:\\Temp\\output666" )); System.exit(job.waitForCompletion(true ) ? 0 : 1 ); } }
(2)Mapper 保持不变 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package cn.aiyingke.mapreduce.compress.reduceCompress;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.StringTokenizer;public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> { IntWritable intWritable = new IntWritable (1 ); Text text = new Text (); @Override protected void map (Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer (value.toString(), "," ); while (stringTokenizer.hasMoreTokens()) { String word = stringTokenizer.nextToken(); text.set(word); context.write(text, intWritable); } } }
(3)Reducer 保持不变 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package cn.aiyingke.mapreduce.compress.reduceCompress;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> { IntWritable sumIntWritable = new IntWritable (); @Override protected void reduce (Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0 ; for (IntWritable val : values) { count += val.get(); } sumIntWritable.set(count); context.write(key, sumIntWritable); } }