一:数据清洗(ETL)
“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取 (Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓库,但其对象并不限于数据仓库。
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户 要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
二:案例分析
2.1 需求
去除日志中字段个数小于等于 11 的日志。
(1)输入数据:web.log
(2)期望输出数据:每行字段长度都大于 11。
2.2 需求分析
在 Map 阶段对输入的数据根据规则进行过滤清洗。
2.3 代码实现
(1)编写 WebLogMapper 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package cn.aiyingke.mapreduce.ETL;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { String line = value.toString();
boolean res = parseLog(line, context);
if (!res) { return; }
context.write(value, NullWritable.get()); }
private boolean parseLog(String line, Context context) { final String[] s = line.split(" ");
if (s.length > 11) { return true; } else { return false; } } }
|
(2)编写 WebLogDriver 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package cn.aiyingke.mapreduce.ETL;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WebLogDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { args = new String[]{"Y:\\Temp\\input", "Y:\\Temp\\output2"};
final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf);
job.setJarByClass(WebLogMapper.class); job.setMapperClass(WebLogMapper.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
|