1: Requirements

Perform a word-frequency count over the local file WordCount.txt, whose contents are delimited by commas (","), and write the statistics to the outputDir directory.
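
For example, given a (hypothetical) WordCount.txt containing:

hello,world,hello
hello,spark

the expected output would be:

hello	3
spark	1
world	1

(keys are tab-separated from their counts and sorted, as produced by the default TextOutputFormat).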

2: Code Implementation

2.1 Environment Setup

(1) Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>2.0.5</version>
    </dependency>
</dependencies>

(2) Logging configuration

In the project's src/main/resources directory, create a new file named "log4j.properties" and fill it with the following:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
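
Note that log4j.rootLogger only attaches the stdout appender; the logfile appender declared below it stays inactive unless it is also listed, e.g. log4j.rootLogger=INFO, stdout, logfile.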

2.2 Writing the Code

Create the package cn.aiyingke.mapreduce.wordcount; the three classes below all live in it.

(1) The Driver class

package cn.aiyingke.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author: Rupert Tears
 * @Date: Created in 18:17 2022/1/18
 * @Description: Thought is already is late, exactly is the earliest time.
 * @Function: Given a set of files in some HDFS directory, all of them
 * comma-delimited, count how often each comma-separated element occurs
 * and write the result to a specified HDFS directory.
 * <p>
 * Overview: word-frequency counting.
 * The driver class:
 * 1. Get the configuration
 * 2. Get the Job instance
 * 3. Set the main class (i.e. tell the cluster where the entry point of the jar is)
 * 4. Set the mapper, combiner, and reducer
 * 5. Set the output key/value types
 * 6. Set the input and output paths
 * 7. Submit and run
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        /*
         * 1. Get the configuration
         */
        Configuration configuration = new Configuration();

        /*
         * 2. Get the Job instance
         */
        Job job = Job.getInstance(configuration, "WordCount-MR");

        /*
         * 3. Set the main class (i.e. tell the cluster where the entry point of the jar is)
         */
        job.setJarByClass(WordCountDriver.class);

        /*
         * 4. Set the mapper, combiner, and reducer
         */
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        /*
         * 5. Set the output key/value types
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        /*
         * 6. Set the input and output paths
         */
        FileInputFormat.addInputPath(job, new Path("S:\\Software\\IDEA 2021.1.2\\IDEA-workspace\\Rupert-Tears\\MapReduce\\WordCount\\data\\instanceData\\WordCount.txt"));
        FileOutputFormat.setOutputPath(job, new Path("S:\\Software\\IDEA 2021.1.2\\IDEA-workspace\\Rupert-Tears\\MapReduce\\WordCount\\data\\outputDir"));

        /*
         * 7. Submit and run
         */
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
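
One design note on step 4: reusing the Reducer as the Combiner is valid here because summation is associative and commutative, and the Reducer's input types (Text, IntWritable) match its output types, so partial sums computed on the map side do not change the final result.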

(2) The Mapper class

package cn.aiyingke.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @Author: Rupert Tears
 * @Date: Created in 18:22 2022/1/18
 * @Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    /**
     * A reusable IntWritable (Hadoop's wrapper around a Java int), fixed at 1.
     */
    private final IntWritable intWritable = new IntWritable(1);
    /**
     * A reusable Text object (Hadoop's wrapper around a String).
     */
    private final Text text = new Text();

    /**
     * Override the map method:
     * 1. Split each input line on the delimiter (a comma);
     * 2. Iterate over the resulting tokens;
     * 3. Assemble a <key, value> pair per token and emit it via the Context.
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the incoming line to a String and split it on commas
        StringTokenizer stringTokenizer = new StringTokenizer(value.toString(), ",");
        // Loop over each comma-separated token
        while (stringTokenizer.hasMoreTokens()) {
            // Fetch the next token
            String word = stringTokenizer.nextToken();
            // Put the token into the reusable Text wrapper
            text.set(word);
            // Emit <word, 1>
            context.write(text, intWritable);
        }
    }
}
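
To make the tokenization step concrete, here is a minimal standalone sketch (not part of the job; the input line is a made-up sample) showing how StringTokenizer splits a line on commas, exactly as map() does:

import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Split one sample line on commas, just like the map() method above
        StringTokenizer st = new StringTokenizer("hello,world,hello", ",");
        while (st.hasMoreTokens()) {
            // Prints "hello", "world", "hello", one token per line
            System.out.println(st.nextToken());
        }
    }
}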

(3) The Reducer class

package cn.aiyingke.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: Rupert Tears
 * @Date: Created in 18:23 2022/1/18
 * @Description: Thought is already is late, exactly is the earliest time.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * A reusable IntWritable holding the summed count.
     */
    private final IntWritable sumIntWritable = new IntWritable();

    /**
     * Processes one group of values sharing the same key, i.e. reduce() is
     * called once per distinct key.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Running total for this key
        int count = 0;
        // Sum all the counts
        for (IntWritable val : values) {
            count += val.get();
        }
        // Wrap the total in the reusable IntWritable
        sumIntWritable.set(count);
        // Emit <word, total>
        context.write(key, sumIntWritable);
    }
}
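
As a worked example: after the shuffle, the key "hello" from the sample input above arrives as <"hello", [1, 1, 1]> (or as pre-summed partial counts if the Combiner ran), and reduce() emits <"hello", 3>.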

2.3 Local Testing

(1) First configure the HADOOP_HOME environment variable and the Windows runtime dependencies (typically winutils.exe in %HADOOP_HOME%\bin).

(2) Then simply run the program from IDEA.

3: Cluster Testing

3.1 Packaging Plugins

To build the jar with Maven, add the following packaging plugins:

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
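
With this in place, running

mvn clean package

produces two jars under target/: a thin jar containing only the project classes, and a fat jar suffixed jar-with-dependencies that bundles every dependency. Since a Hadoop cluster already ships the Hadoop libraries, the thin jar is normally the one to submit there.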

3.2 Switching the Input and Output Paths

Replace the hard-coded paths in the driver with the command-line arguments:

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
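
With this change the paths are read from the command line (see 3.3) instead of being hard-coded; when still running locally in IDEA, the same two arguments can be supplied under Run Configuration → Program arguments.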

3.3 Upload to the Cluster and Execute

The input and output arguments must be HDFS paths; note that the output directory must not already exist, or the job will fail.

[ghost@hadoop100 ~]$ hadoop jar /home/ghost/mapreduce/MapReduceDemo-1.0-SNAPSHOT.jar cn.aiyingke.mapreduce.wordcount.WordCountDriver /word.txt /output

part-r-00000: the "r" in the file name indicates that it was produced by a reduce task.
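
To inspect the result, the output file can be read straight from HDFS, for example:

[ghost@hadoop100 ~]$ hadoop fs -cat /output/part-r-00000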