MapReduce 的学习

HDFS 和 MapReduce 是 Hadoop 的两个重要核心,其中 MapReduce 是 Hadoop 的分布式计算模型。MapReduce 主要分为两步 Map 步和 Reduce 步,引用网上流传很广的一个故事来解释,现在你要统计一个图书馆里面有多少本书,为了完成这个任务,你可以指派小明去统计书架 1,指派小红去统计书架 2,这个指派的过程就是 Map 步,最后,每个人统计完属于自己负责的书架后,再对每个人的结果进行累加统计,这个过程就是 Reduce 步。

本代码运行环境为 ubuntu 18.04,使用 Hadoop 2.9.2 版本
首先按照之前的方法(在 CentOS 7 中安装 Hadoop 与 HBase)搭建并启动 Hadoop

MapReduce 原理

分析 MapReduce 执行过程

MapReduce 运行的时候,会通过 Mapper 运行的任务读取 HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。

Mapper 任务的执行过程详解

每个 Mapper 任务是一个 java 进程,它会读取 HDFS 中的文件,解析成很多的键值对,经过我们覆盖的 map 方法处理后,转换为很多的键值对再输出。整个 Mapper 任务的处理过程又可以分为以下六个阶段。

  • 第一阶段是把输入文件按照一定的标准分片 (InputSplit),每个输入片的大小是固定的。默认情况下,输入片 (InputSplit) 的大小与数据块 (Block) 的大小是相同的。如果数据块 (Block) 的大小是默认值 64MB,输入文件有两个,一个是 32MB,一个是 72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个 Mapper 进程处理。这里的三个输入片,会有三个 Mapper 进程处理。

  • 第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键” 是每一行的起始位置 (单位是字节),“值” 是本行的文本内容。

  • 第三阶段是调用 Mapper 类中的 map 方法。第二阶段中解析出来的每一个键值对,调用一次 map 方法。如果有 1000 个键值对,就会调用 1000 次 map 方法。每一次调用 map 方法会输出零个或者多个键值对。

  • 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是 Reducer 任务运行的数量。默认只有一个 Reducer 任务。

  • 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对 <2,2>,<1,3>,<2,1>,键和值分别是整数。那么排序后的结果是 <1,3>,<2,1>,<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的 linux 文件中。

  • 第六阶段是对数据进行归约处理,也就是 reduce 处理。键相等的键值对会调用一次 reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的 linxu 文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。

Reducer 任务的执行过程详解

每个 Reducer 任务是一个 Java 进程。Reducer 任务接收 Mapper 任务的输出,归约处理后写入到 HDFS 中,可以分为几个阶段。

  • 第一阶段是 Reducer 任务会主动从 Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此 Reducer 会复制多个 Mapper 的输出。
  • 第二阶段是把复制到 Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
  • 第三阶段是对排序后的键值对调用 reduce 方法。键相等的键值对调用一次 reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到 HDFS 文件中。在整个 MapReduce 程序的开发过程中,我们最大的工作量是覆盖 map 函数和覆盖 reduce 函数。

导入 MapReduce 的相关依赖

使用 Java 语言进行 MapReduce 操作首先需要导入必要的依赖

如果使用的是 maven 管理项目,则需要编辑项目目录的 pom.xml 文件

打开 pom.xml,在 <dependencies>...<dependencies> 之间加入:

1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.9.2</version>
</dependency>

如果不使用 Maven 管理,可以手动导入相关包。

Wordcount 实例

功能:统计每一个单词在整个数据集中出现的总次数

WordCount 实现的官方文档:Example: WordCount v1.0

Wordcount 整体流程

最简单的 MapReduce 应用程序至少包含 3 个部分:一个 Map 函数、一个 Reduce 函数和一个 main 函数。在运行一个 MapReduce 计算任务时候,任务过程被分为两个阶段:map 阶段和 reduce 阶段,每个阶段都是用键值对(key/value)作为输入(input)和输出(output)。main 函数将作业控制和文件输入 / 输出结合起来。

过程如图所示:

WordCount 过程图

对一个有三行文本的文件进行 MapReduce 操作。

Map 过程:并行读取文本,对读取的单词进行 map 操作,每个词都以 <key,value> 形式生成。

Reduce 过程:对 map 的结果进行排序,合并,最后得出词频。

WordCount 代码实现

新建文件 input.txt,内容:

1
2
3
Deer Bear River
Car Car River
Deer Car Bear

将其上传到 HDFS 中,在终端运行:

1
2
3
hadoop fs -mkdir /wordcount
hadoop fs -mkdir /wordcount/input
hadoop fs -put input.txt /wordcount/input

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {
/**
map 函数的输入键、输入值、输出键和输出值 Text 相当于java的string类型
IntWritable 相当于java中的Integer
*/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
//content用于输出内容的写入
//map()方法的输入是一个键和一个值。我们首先将包含一行输入的Text值转换成java中的string类型
StringTokenizer itr = new StringTokenizer(value.toString()); //这是一个分割字符串的类,java中默认的分隔符是:"空格","\t"制表符,"\n"换行符,"\r"回车符
while (itr.hasMoreTokens()) { //判断是否还有分隔符
word.set(itr.nextToken()); //下一个字符串转换为Text类型,nextToken():返回从当前位置到下一个分隔符的字符串。
context.write(word, one);
}
}
}

//reduce函数也有四个形式参数类型用于指定输入和输出类型。
//reduce函数的输入类型必须匹配map函数的输出类型:即Text类型和Intwritable在这种情况下,reduce的输出也是Text和Intwritable
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word Count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //指定要用的map类型
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class); //指定要用的reduce类型
job.setOutputKeyClass(Text.class); //控制reduce函数的输出类型
job.setOutputValueClass(IntWritable.class);
//定义输入数据的路径,可以是单个文件,也可以是一个目录(此时,将目录下所有文件当做输入)
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/wordcount/input/input.txt"));
//定义输出路径,指定reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/wordcount/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

实验结果

运行程序,成功后可以看到在 /wordcount/output/ 下多出两个文件

运行成功后多出两个文件

其中第二个文件里为运行完成后的结果

WordCount 结果

WordCount 完成!

统计访问次数

统计用户在 2019 年度每个自然日的总访问次数。原始数据文件中提供了用户名称与访问日期。这个任务就是要获取以每个自然日为单位的所有用户访问次数的累加值。如果通过 MapReduce 编程实现这个任务,首先要考虑的是,Mapper 与 Reducer 各自的处理逻辑是怎样的;然后根据处理逻辑编写出核心代码;最后在 Idea 中编写完整代码,编译打包后提交给集群运行。

分析思路和逻辑

  1. 输入 / 输出格式。

    这里社交网站用户的访问日期在格式上都属于文本格式,访问次数为整型数据格式。其组成的键值对为 <访问日期,访问次数>,因此 Mapper 的输出与 Reducer 的输出都选用 Text 类与 IntWritble 类。

  2. Mapper 要实现的计算逻辑

    Map 函数的主要任务是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。<访问日期,1 >

  3. Reducer 要实现的计算逻辑

    读取 Mapper 输出的键值对 <访问日期,1>,进行累加。

user_login.txt 访问日期内容如下

将其上传到 HDFS 中,路径为 /DailyAccessCount/input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Nehru,2019-01-01
Dane,2019-01-01
Walter,2019-01-01
Gloria,2019-01-01
Clarke,2019-01-01
Madeline,2019-01-01
Kevyn,2019-01-01
Rebecca,2019-01-01
Calista,2019-01-01
Madeline,2019-01-02
Kevyn,2019-01-02
Rebecca,2019-01-03
Calista,2019-01-04
Walter,2019-03-12
Gloria,2019-03-12
Clarke,2019-03-12

程序代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class DailyAccessCount {

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split(","); //指定逗号为分隔符,组成数组
String keyOutput = array[1]; //提取数组中的访问日期作为key
context.write(new Text(keyOutput), one); //组成键值对
}
}

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0; //定义累加器,初始值为0
for (IntWritable val : values) {
sum += val.get(); //将相同键的所有值进行累加
}
result.set(sum);
context.write(key, result); //输出访问日期,总访问次数
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "Daily Access Count"); //新建Job并设置主类
job.setJarByClass(DailyAccessCount.class);
job.setMapperClass(MyMapper.class); //为作业设置map类
job.setReducerClass(MyReducer.class); //为作业设置reduce类
job.setMapOutputKeyClass(Text.class); //设置map输出key的格式类
job.setMapOutputValueClass(IntWritable.class); //设置map输出value的格式类
job.setOutputKeyClass(Text.class); //设置输出键的格式类
job.setOutputValueClass(IntWritable.class); //设置输出值的格式类
//定义输入数据的路径,可以是单个文件,也可以是一个目录(此时,将目录下所有文件当做输入)
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/DailyAccessCount/input/user_login.txt"));
//定义输出路径,指定reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/DailyAccessCount/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1); //通知集群运行这个作业,并阻塞直到作业完成
}
}

实验结果

运行程序,成功后可以看到在 /DailyAccessCount/output/ 下多出两个文件

运行成功后多出两个文件

其中第二个文件里为运行完成后的结果

DailyAccessCount 结果

统计访问次数完成!

统计每年最高气温

有如下文件 input_temperature.txt,其中 2010012325 表示在 2010 年 01 月 23 日的气温为 25 度。使用 MapReduce,计算每一年出现过的最大气温。

将其上传到 HDFS 中,路径为 /Temperature/input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023

设计代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Temperature {
/*
* 四个泛型类型分别代表:
* KeyIn Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,....)
* ValueIn Mapper的输入数据的Value,这里是每行文字
* KeyOut Mapper的输出数据的Key,这里是每行文字中的“年份”
* ValueOut Mapper的输出数据的Value,这里是每行文字中的“气温”
*/
public static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//打印样本:Before Mapper:0,2014010114
System.out.print("Before Mapper:" + key + "," + value);
String line = value.toString();
String year = line.substring(0, 4);
int temperature = Integer.parseInt(line.substring(8));
context.write(new Text(year), new IntWritable(temperature));
//打印样本:After Mapper:2014,14
System.out.println(" ==> After Mapper:" + new Text(year) + "," + new IntWritable(temperature));
}
}

/*
* 四个泛型类型分别代表:
* KeyIn Reducer的输入数据的Key,这里是每行文字中的“年份”
* ValueIn Reducer的输入数据的Value,这里是每行文字中的“气温”
* KeyOut Reducer的输出数据的Key,这里是不重复的“年份”
* ValueOut Reducer的输出数据的Value,这里是这一年中的“最高气温”
*/
public static class TempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
StringBuffer sb = new StringBuffer();
//取values的最大值
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
sb.append(value).append(",");
}
//打印样本:Before Reduce:2001,12,10,11,29,16,
System.out.print("Before Reduce:" + key + "," + sb.toString());
context.write(key, new IntWritable(maxValue));
//打印样本:After Reduce:2001,29
System.out.println(" ==> After Reduce:" + key + "," + maxValue);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Temperature");
job.setJarByClass(Temperature.class);
job.setMapperClass(TempMapper.class); //为作业设置map类
job.setReducerClass(TempReducer.class); //为作业设置reduce类
job.setMapOutputKeyClass(Text.class); //设置map输出key的格式类
job.setMapOutputValueClass(IntWritable.class); //设置map输出value的格式类
job.setOutputKeyClass(Text.class); //设置输出键的格式类
job.setOutputValueClass(IntWritable.class); //设置输出值的格式类
//定义输入数据的路径,可以是单个文件,也可以是一个目录(此时,将目录下所有文件当做输入)
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/Temperature/input/input_temperature.txt"));
//定义输出路径,指定reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/Temperature/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1); //通知集群运行这个作业,并阻塞直到作业完成
}
}

实验结果

运行程序,成功后可以看到在 /Temperature/output/ 下多出两个文件

运行成功后多出两个文件

其中第二个文件里为运行完成后的结果

Temperature 结果

统计每年最高气温完成!

数据去重

实验原理

“数据去重” 主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。在 MapReduce 流程中,map 的输出 <key,value> 经过 shuffle 过程聚集成 <key,value-list> 后交给 reduce。我们自然而然会想到将同一个数据的所有记录都交给一台 reduce 机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是 reduce 的输入应该以数据作为 key,而对 value-list 则没有要求(可以设置为空)。当 reduce 接收到一个 <key,value-list> 时就直接将输入的 key 复制到输出的 key 中,并将 value 设置成空值,然后输出 <key,value>

实验数据

现有一个某电商网站的数据文件,名为 buyer_favorite1,记录了用户收藏的商品以及收藏的日期,文件 buyer_favorite1 中包含(用户 id,商品 id,收藏日期)三个字段,数据内容以 \t 分割,由于数据很大,所以为了方便统计我们只截取它的一部分数据,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
user_id product_id date
10181 1000481 2010-04-04 16:54:31
20001 1001597 2010-04-07 15:07:52
20001 1001560 2010-04-07 15:08:27
20042 1001368 2010-04-08 08:20:30
20067 1002061 2010-04-08 16:45:33
20056 1003289 2010-04-12 10:50:55
20056 1003290 2010-04-12 11:57:35
20056 1003292 2010-04-12 12:05:29
20054 1002420 2010-04-14 15:24:12
20055 1001679 2010-04-14 19:46:04
20054 1010675 2010-04-14 15:23:53
20054 1002429 2010-04-14 17:52:45
20076 1002427 2010-04-14 19:35:39
20054 1003326 2010-04-20 12:54:44
20056 1002420 2010-04-15 11:24:49
20064 1002422 2010-04-15 11:35:54
20056 1003066 2010-04-15 11:43:01
20056 1003055 2010-04-15 11:43:06
20056 1010183 2010-04-15 11:45:24
20056 1002422 2010-04-15 11:45:49
20056 1003100 2010-04-15 11:45:54
20056 1003094 2010-04-15 11:45:57
20056 1003064 2010-04-15 11:46:04
20056 1010178 2010-04-15 16:15:20
20076 1003101 2010-04-15 16:37:27
20076 1003103 2010-04-15 16:37:05
20076 1003100 2010-04-15 16:37:18
20076 1003066 2010-04-15 16:37:31
20054 1003103 2010-04-15 16:40:14
20054 1003100 2010-04-15 16:40:16

将其上传到 HDFS 中,路径为 /Filter/input

要求用 Java 编写 MapReduce 程序,根据商品 id 进行去重,统计用户收藏商品中都有哪些商品被收藏。

实验源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class Filter {
public static class Map extends Mapper<Object, Text, Text, NullWritable> {

private static Text newKey = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
System.out.println("line is " + line);
if (line != null) {
String arr[] = line.split(" ");
System.out.println("a[1] is " + arr[1]);
newKey.set(arr[1]);
context.write(newKey, NullWritable.get());
System.out.println("the new key is " + newKey);
}
}
}

public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException,InterruptedException {
context.write(key, NullWritable.get());
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Filter");
job.setJarByClass(Filter.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job ,new Path("hdfs://localhost:8020/Filter/input/buyer_favorite.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/Filter/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

实验结果

运行程序,成功后可以看到在 /Filter/output/ 下多出两个文件

运行成功后多出两个文件

其中第二个文件里为运行完成后的结果

数据去重结果

数据去重完成!

日志分析:分析非结构化文件

根据 tomcat 日志计算 url 访问情况

要求:区别统计 GET 和 POST URL 访问量

结果为:访问方式、URL、访问量

测试数据集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
196.168.2.1 - - [03/Jul/2014:23:36:38 +0800] "GET /course/detail/3.htm HTTP/1.0" 200 38435 0.038
182.131.89.195 - - [03/Jul/2014:23:37:43 +0800] "GET /html/notes/20140617/888.html HTTP/1.0" 301 - 0.000
196.168.2.1 - - [03/Jul/2014:23:38:27 +0800] "POST /service/notes/addViewTimes_23.htm HTTP/1.0" 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:39:03 +0800] "GET /html/notes/20140617/779.html HTTP/1.0" 200 69539 0.046
196.168.2.1 - - [03/Jul/2014:23:43:00 +0800] "GET /html/notes/20140318/24.html HTTP/1.0" 200 67171 0.049
196.168.2.1 - - [03/Jul/2014:23:43:59 +0800] "POST /service/notes/addViewTimes_779.htm HTTP/1.0" 200 1 0.003
196.168.2.1 - - [03/Jul/2014:23:45:51 +0800] "GET /html/notes/20140617/888.html HTTP/1.0" 200 70044 0.060
196.168.2.1 - - [03/Jul/2014:23:46:17 +0800] "GET /course/list/73.htm HTTP/1.0" 200 12125 0.010
196.168.2.1 - - [03/Jul/2014:23:46:58 +0800] "GET /html/notes/20140609/542.html HTTP/1.0" 200 94971 0.077
196.168.2.1 - - [03/Jul/2014:23:48:31 +0800] "POST /service/notes/addViewTimes_24.htm HTTP/1.0" 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:48:34 +0800] "POST /service/notes/addViewTimes_542.htm HTTP/1.0" 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:49:31 +0800] "GET /notes/index-top-3.htm HTTP/1.0" 200 53494 0.041
196.168.2.1 - - [03/Jul/2014:23:50:55 +0800] "GET /html/notes/20140609/544.html HTTP/1.0" 200 183694 0.076
196.168.2.1 - - [03/Jul/2014:23:53:32 +0800] "POST /service/notes/addViewTimes_544.htm HTTP/1.0" 200 2 0.004
196.168.2.1 - - [03/Jul/2014:23:54:53 +0800] "GET /service/notes/addViewTimes_900.htm HTTP/1.0" 200 151770 0.054
196.168.2.1 - - [03/Jul/2014:23:57:42 +0800] "GET /html/notes/20140620/872.html HTTP/1.0" 200 52373 0.034
196.168.2.1 - - [03/Jul/2014:23:58:17 +0800] "POST /service/notes/addViewTimes_900.htm HTTP/1.0" 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:58:51 +0800] "GET /html/notes/20140617/888.html HTTP/1.0" 200 70044 0.057
186.76.76.76 - - [03/Jul/2014:23:48:34 +0800] "POST /service/notes/addViewTimes_542.htm HTTP/1.0" 200 2 0.003
186.76.76.76 - - [03/Jul/2014:23:46:17 +0800] "GET /course/list/73.htm HTTP/1.0" 200 12125 0.010
8.8.8.8 - - [03/Jul/2014:23:46:58 +0800] "GET /html/notes/20140609/542.html HTTP/1.0" 200 94971 0.077

将其上传到 HDFS 中,路径为 /LogAnalyze/input

由于 Tomcat 日志是不规则的,需要先过滤清洗数据。这里使用 MapReduce 进行操作。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogAnalyze {

public static class LogMapper extends Mapper<LongWritable, Text, Text , IntWritable> {

private IntWritable val = new IntWritable(1);

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().trim();
String tmp = handlerLog(line);
if (tmp.length() > 0) {
context.write(new Text(tmp), val);
}
}
//196.168.2.1 - - [03/Jul/2014:23:36:38 +0800] "GET /course/detail/3.htm HTTP/1.0" 200 38435 0.038
private String handlerLog(String line) {
String result = "";
try {
if (line.length() > 20) {
if (line.indexOf("GET") > 0) {
result = line.substring(line.indexOf("GET"), line.indexOf("HTTP/1.0")).trim();
} else if (line.indexOf("POST") > 0) {
result = line.substring(line.indexOf("POST"), line.indexOf("HTTP/1.0")).trim();
}
}
} catch (Exception e) {
System.out.println(line);
}
return result;
}
}

public static class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Log Analyze");
job.setJarByClass(LogAnalyze.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/LogAnalyze/input/data.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/LogAnalyze/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

分析结果

分析结果

Log 分析完成!

作者

SJ Zhou

发布于

2019-04-22

更新于

2021-01-06

许可协议

评论