Spark —— 淘宝双 11 数据分析

问题定义

实验内容

使用 Spark 对数据进行处理,并分析双十一的用户交易等信息,用 Scala 语言进行程序编写,最后将处理的数据结果使用 Echarts 进行可视化。

实验环境以及使用的相关应用

  • Spark 2.4.3
  • Scala 2.12
  • Tomcat 9.0.20
  • 可视化工具:ECharts
  • Java 包:fastjson
  • 系统环境:macOS Mojave 10.14.5

数据集

本案列主要分析淘宝双十一的数据,数据集是淘宝 2015 年双十一前 6 个月(包含双十一)的交易数据,数据集 user_log.csv,是记录了用户的行为的日志文件。

日志 user_log.csv 的字段定义如下:

序号 字段 定义
0 user_id 买家 id
1 item_id 商品 id
2 cat_id 商品类别 id
3 merchant_id 卖家 id
4 brand_id 品牌 id
5 month 交易时间:月
6 day 交易事件:日
7 action 行为,取值范围 {0,1,2,3},0 表示点击,1 表示加入购物车,2 表示购买,3 表示关注商品
8 age_range 买家年龄分段:1 表示年龄 <18;2 表示年龄在 [18,24];3 表示年龄在 [25,29];4 表示年龄在 [30,34];5 表示年龄在 [35,39];6 表示年龄在 [40,49];7 和 8 表示年龄 >=50;0 和 NULL 则表示未知
9 gender 性别:0 表示女性,1 表示男性,2 和 NULL 表示未知
10 province 收货地址省份

前五条记录样例:

1
2
3
4
5
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古

SparkRDD 实验过程及其结果

实验一:查看日志前 10 个交易日志的商品品牌

分析:使用 take() 函数,取去掉首行后的前 10 行数据,按条件为 brand_id 的项进行筛选,并逐个输出

  1. 使用 take() 函数取出去掉首行后的前 10 行数据
  2. 进行 map 操作,将数据处理成 ($商品品牌) 样式
  3. 输出 map 后的结果

核心代码如下:

1
2
3
4
5
6
7
8
9
10
def transformationOps(sc: SparkContext): Unit = {
// 查看日志前10个交易日志的商品品牌
val raw_data = sc.textFile("./data/user_log.csv")
val header = raw_data.first()
val lines = raw_data.filter(row => row != header)

lines.take(10).flatMap(line => line.split(" "))
.map(line => (line.split(",")(4)))
.foreach(println)
}

结果如图:

实验二:查询前 20 个交易日志中购买商品时的时间和商品的种类

分析:使用 take() 函数,取去掉首行后的前 20 行数据,按条件为 month, day, cat_id 的项进行筛选,并逐个输出。(这里将 monthday 合并为一个字符串)

  1. 使用 take() 函数取出去掉首行后的前 20 行数据
  2. 进行 map 操作,将数据处理成 ($日期, $商品种类) 样式
  3. 输出 map 后的结果

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
def transformationOps(sc: SparkContext): Unit = {
// 查询前20个交易日志中购买商品时的时间和商品的种类
val raw_data = sc.textFile("./data/user_log.csv")
val header = raw_data.first()
val lines = raw_data.filter(row => row != header)
val data = lines.take(20)
.flatMap(line => line.split(" "))
.map(line => (line.split(",")(5) +line.split(",")(6), line.split(",")(2)))

data.foreach(println)
}

结果如图:

实验三:查询双十一那天有多少人购买了商品

分析:

  1. 首先进行 map 操作,将数据处理成 ($日期, $用户ID, $用户行为) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日所有用户行为为购买的条目,具体为日期 month == 11, day == 11,行为 action == 2
  3. 考虑到单个用户可能购买多个商品,所以需要去重,使用 distinct() 函数去重
  4. 最后使用 count() 函数计数并输出

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
def transformationOps(sc: SparkContext): Unit = {
// 查询双十一那天有多少人购买了商品
val raw_data = sc.textFile("./data/user_log.csv")
val Users = raw_data.flatMap(line => line.split(" "))
.map(line => (line.split(",")(5) + line.split(",")(6), line.split(",")(0), line.split(",")(7)))
val output = Users.filter(line => line._3.contains("2"))
.filter(line => line._1.contains("1111"))
.distinct()

println("双十一有 " + output.count() + " 人购买了商品。")
}

结果如图:

实验四:取给定时间和给定品牌,求当天购买的此品牌商品的数量

分析:

  1. 首先进行 map 操作,将数据处理成 ($品牌ID, $日期, $用户行为) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日品牌 2661 中用户行为为购买的条目,具体为日期 month == 11, day == 11,品牌 IDbrand_id == 2661,行为 action == 2
  3. 使用 count() 函数计数并输出

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
def transformationOps(sc: SparkContext): Unit = {
// 取给定时间和给定品牌,求当天购买的此品牌商品的数量
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split(" "))
.map(line => (line.split(",")(4), line.split(",")(5) + line.split(",")(6), line.split(",")(7)))
val output = data.filter(line => line._1.contains("2661"))
.filter(line => line._2.contains("1111"))
.filter(line => line._3.contains("2"))

println("11月11日,品牌2661的商品售出的数量为:" + output.count())
}

结果如图:

实验五:查询有多少用户在双十一点击了该店

分析:

  1. 首先进行 map 操作,将数据处理成 ($用户ID, $卖家ID, $日期, $用户行为) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日店铺 2882 中用户行为为点击的用户,具体为日期 month == 11, day == 11,卖家 IDmerchant_id == 2882,行为 action == 0
  3. 考虑到每个用户可能多次点击同一店铺,所以需要去重,使用 distinct() 函数。

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def transformationOps(sc: SparkContext): Unit = {
// 查询有多少用户在双十一点击了该店
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split(" "))
.map(line => (line.split(",")(0), // 用户ID
line.split(",")(3), // 卖家ID
line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7))) // 行为
val output = data.filter(line => line._3.contains("1111"))
.filter(line => line._2.contains("2882"))
.filter(line => line._4.contains("0")).distinct()

println("一共有" + output.count() + "个用户在双十一点击了店铺2882")
}

结果如图:

实验六:查询双十一那天女性购买商品的数量

分析:统计同时满足日期 month == 11, day == 11,行为 action == 2,性别 gender == 0 的个数。

  1. 首先进行 map 操作,将数据处理成 (($用户ID, $日期, $用户行为), 1) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日用户行为为购买的用户,具体为日期 month == 11, day == 11,行为 action == 2
  3. 然后使用 reduceByKey() 统计所有用户的购买量,然后使用 filter() 筛选出大于 5 次的用户,并逐个输出

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def transformationOps(sc: SparkContext): Unit = {
// 查询双十一那天女性购买商品的数量
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split(" "))
.map(line => (line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7), // 行为
line.split(",")(9))) // 性别
val res = data.filter(line => line._1.contains("1111"))
.filter(line => line._2.contains("2"))
.filter(line => line._3.contains("0"))

println("双十一那天女性购买商品的数量为:" + res.count())
}

结果如图:

实验七:查询双十一那天男性购买商品的数量

分析:

  1. 首先进行 map 操作,将数据处理成 ($日期, $用户行为, $性别) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日用户行为为购买的男性用户,具体为日期 month == 11, day == 11,行为 action == 2,性别 gender == 1
  3. 然后使用 count() 函数统计所有过滤后的数据量,并输出

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def transformationOps(sc: SparkContext): Unit = {
// 查询双十一那天男性购买商品的数量
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split(" "))
.map(line => (line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7), // 行为
line.split(",")(9))) // 性别
val res = data.filter(line => line._1.contains("1111"))
.filter(line => line._2.contains("2"))
.filter(line => line._3.contains("1"))

println("双十一那天男性购买商品的数量为:" + res.count())
}

结果如图:

实验八:查询某一天在该网站购买商品超过 5 次的用户 ID

分析:

  1. 首先进行 map 操作,将数据处理成 (($用户ID, $日期, $用户行为), 1) 样式
  2. 然后使用 filter() 函数过滤出 11 月 11 日用户行为为购买的用户,具体为日期 month == 11, day == 11,行为 action == 2
  3. 然后使用 reduceByKey() 统计所有用户的购买量,然后使用 filter() 筛选出大于 5 次的用户,并逐个输出

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def transformationOps(sc: SparkContext): Unit = {
// 查询某一天在该网站购买商品超过5次的用户ID
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split(" "))
.map(line => ((line.split(",")(0), //用户ID
line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7)), 1)) // 行为
val lines = data.filter(line => line._1._2.contains("1111")).filter(line => line._1._3.contains("2"))
val Users = lines.reduceByKey((v1, v2) => v1 + v2).filter(t => t._2 > 5)

Users.foreach(userID => println("用户 " + userID._1._1 + " 在 " + userID._1._2 + " 购买了 " + userID._2 + " 件商品"))
println("11月11日购买超过5次的用户一共有:" + Users.count() + "个")
}

结果如图:

使用 ECharts 进行可视化

ECharts,一个使用 JavaScript 实现的开源可视化库,可以流畅的运行在 PC 和移动设备上,兼容当前绝大部分浏览器(IE8/9/10/11,Chrome,Firefox,Safari 等),底层依赖轻量级的矢量图形库 ZRender,提供直观,交互丰富,可高度个性化定制的数据可视化图表。

首先使用 SparkRDD 处理我们需要的数据,使用阿里巴巴的 Java 库 fastjson,将 SparkRDD 输出的数据转换成 JSON 格式的文档。然后使用 Tomcat 搭建 JSP 服务,用 jQuery*.json 文件内的数据导入到 ECharts,最后呈现数据。

可视化实验一:双十一所有买家消费行为比例

数据处理

分析:

  1. 首先进行 map 操作,将数据处理成 ($日期, $用户行为) 样式
  2. 然后使用 filter() 函数确定时间为 11 月 11 日,具体为日期 month == 11, day == 11
  3. 使用 map() 函数将数据处理为 ($用户行为, 1)
  4. 使用 reduceByKey() 计算出每个用户行为对应的条目数
  5. 将结果转换成 JSON 格式文件,并输出处理结果

数据处理核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def transformationOps(sc: SparkContext): Unit = {
// 双十一所有买家消费行为比例
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split("\n"))
.map(line => ((line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7)), 1)) // 行为
.filter(line => line._1._1.equals("1111"))
.map(t => (t._1._2, t._2))
val Output = data.reduceByKey((v1, v2) => v1 + v2).collect()

toJSON_Pie(Output, sc) // 输出成JSON格式

Output.foreach(println)
}

转换成 JSON 文件

编写 toJSON_Pie() 函数,将 SparkRDD 处理的结果转换成 JSON 格式的文件,用于生成饼状图

  1. 使用阿里巴巴 Java 库 fastjson,新建一个 JSONObject 对象
  2. 新建一个数组准备存放绘制 EChart 饼状图要求的 {"name":"","value":""}
  3. 使用循环,按行读取,放入 SparkRDD 处理的结果
  4. 导出,保存

函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def toJSON_Pie(data: Array[(String, Int)], sc: SparkContext): Unit = {
val JSONObject = new JSONObject()
val m = data.length
val list: util.List[Any] = new util.ArrayList[Any]()
for (i <- 0 until m) {
val pair = new util.HashMap[String, Any]()
pair.put("name", data(i)._1.toString)
pair.put("value", data(i)._2.toString)
list.add(pair)
}
JSONObject.put("data", list)
val output = sc.parallelize(List(JSONObject))
output.repartition(1).saveAsTextFile("../Visualization/web/data/v_1")
}

导出后的文件为 Spark 处理的文件,名称为 part-00000,需要手动将其加上 json 后缀为 part-00000.json

由于原始数据中代表用户行为的数据为 0~3,数字对可视化呈现不直观,所以手动更改了 json 文件的对应内容

part-00000.json 文件内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"data": [
{
"name": "点击",
"value": "9188104"
},
{
"name": "加入购物车",
"value": "14725"
},
{
"name": "购买",
"value": "1223354"
},
{
"name": "关注",
"value": "156450"
}
]
}

可视化呈现

使用 Tomcat 搭建本地 Java Web Server,编写 jsp 网页,将 *.json 文件导入,使用 ECharts 呈现

根据 ECharts 官方文档:异步数据加载和更新,使用 jQuery 工具将 json 文件导入

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
var myChart = echarts.init(document.getElementById('main'), 'shine');

myChart.setOption({
title: {
text: '双十一所有买家消费行为比例',
x: 'center'
},
tooltip: {
trigger: 'item',
formatter: "{a} <br/> {b} : {c} ({d}%)"
},
legend: {
orient: 'vertical',
right: 50,
top: 150,
// bottom: 20,
data: ['点击', '加入购物车', '购买', '关注']
},
label: {
formatter: '{b}: {c}'
},
series: [
{
name: '行为',
type: 'pie',
radius: '60%',
center: ['40%', '50%'],
data: [],
selectedMode: 'single',
itemStyle: {
emphasis: {
shadowBlur: 10,
shadowOffsetX: 0,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}
]
});

$.get('./data/v_1/part-00000.json').done(function (data) {
myChart.setOption({
series: [{
data: data.data
}]
});
});

可视化结果如图所示:

可视化实验二:双十一当天销量前十的商品类别

数据处理

分析:

  1. 首先进行 map 操作,将数据处理成 ($日期, $商品类别, $用户行为) 样式
  2. 使用 filter() 函数确定时间为 11 月 11 日,用户行为为购买的数据,具体为日期 month == 11, day == 11,用户行为 action == 2
  3. 使用 map() 函数将数据处理为 ($商品类别, 1)
  4. 使用 reduceByKey() 计算出每个商品类别对应的条目数
  5. 使用 sortBy() 按条目数量降序排序
  6. 使用 take() 取前 10 个数据,然后将结果转换成 JSON 格式文件,并输出处理结果

数据处理核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def transformationOps(sc: SparkContext): Unit = {
// 双十一当天销量前十的商品类别
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split("\n"))
.map(line => ((line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(2), // 商品类别
line.split(",")(7)), 1))
.filter(_._1._1.equals("1111"))
.filter(_._1._3.equals("2"))
.map(t => (t._1._2, t._2))
val Output = data.reduceByKey((v1, v2) => v1 + v2)
.sortBy(_._2, ascending = false, numPartitions = 1).collect()

toJSON_Bar(Output.take(10), sc)

Output.take(10).foreach(println)
}

转换成 JSON 文件

编写 toJSON_Bar() 函数,将 SparkRDD 处理的结果转换成 JSON 格式的文件,用于生成柱状图

  1. 使用阿里巴巴 Java 库 fastjson,新建一个 JSONObject 对象
  2. 新建两个数组准备存放绘制 EChart 柱状图要求的数据
  3. 使用循环,按行读取,放入 SparkRDD 处理的结果
  4. 导出,保存

函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def toJSON_Bar(data: Array[(String, Int)], sc: SparkContext): Unit = {
val JSONObject = new JSONObject()
val m = data.length
val key: Array[Any] = new Array[Any](m)
val value: Array[Any] = new Array[Any](m)
for (i <- 0 until m) {
key(i) = data(i)._1.toString
value(i) = data(i)._2.toString
}
JSONObject.put("key", key)
JSONObject.put("value", value)
val output = sc.parallelize(List(JSONObject))
output.repartition(1).saveAsTextFile("../Visualization/web/data/v_2")
}

导出后的文件为 Spark 处理的文件,名称为 part-00000,需要手动将其加上 json 后缀为 part-00000.json

part-00000.json 文件内容为:

1
2
3
4
{
"value":["48687","41951","29531","28883","28530","28097","27372","26264","25851","23519"],
"key":["656","1208","602","662","737","1142","389","177","1213","1438"]
}

其中,key 表示商品种类,value 表示对应商品种类的销量

可视化呈现

使用 Tomcat 搭建本地 Java Web Server,编写 jsp 网页,将 *.json 文件导入,使用 ECharts 呈现

根据 ECharts 官方文档:异步数据加载和更新,使用 jQuery 工具将 json 文件导入

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'), 'shine');

myChart.setOption({
title: {
text: '双十一当天销量前10的商品类别',
// x: 'center'
},
legend: {
data: ["销量"],
top: '3'
},
tooltip: {
trigger: 'axis',
axisPointer: { // 坐标轴指示器,坐标轴触发有效
type: 'shadow' // 默认为直线,可选为:'line' | 'shadow'
}
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: [
{
type: 'category',
data: [],
axisTick: {
alignWithLabel: true
}
}
],
yAxis: [
{
type: 'value'
}
],
series: [
{
name: '销量',
type: 'bar',
barWidth: '60%',
data: [],
label: {
normal: {
show: true,
position: 'top'
}
}
}
]
});

$.get('./data/v_2/part-00000.json').done(function (data) {
myChart.setOption({
xAxis: [{
data: data.key
}],
series: [{
data: data.value
}]
});
});

可视化结果如图所示:

可视化实验三:双十一男女买家各个年龄段交易对比

数据处理

分析:

  1. 首先进行 map 操作,将数据处理成 (($日期, $用户行为, $年龄段, $性别), 1) 样式
  2. 然后使用 filter() 函数确定时间为 11 月 11 日,用户行为为购买,具体为日期 month == 11, day == 11,用户行为 action == 2
  3. 使用 map() 函数将数据处理为 (($年龄段, $性别), 1)
  4. 使用 reduceByKey() 计算出每个年龄段每个性别对应的条目数
  5. 为了方便后续生成对应的 JSON 格式数据,使用两个 sortBy() 函数将数据按年龄段从小到大,性别 “女,男,未知” 顺序排序
  6. 将结果转换成 JSON 格式文件,并输出处理结果

数据处理核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def transformationOps(sc: SparkContext): Unit = {
// 双十一男女买家各个年龄段交易对比
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split("\n"))
.map(line => ((line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7), // 行为
line.split(",")(8), // 年龄段
line.split(",")(9)), 1)) // 性别
.filter(line => line._1._1.equals("1111"))
.filter(line => line._1._2.equals("2"))
.map(t => ((t._1._3, t._1._4), t._2))
val Output = data.reduceByKey((v1, v2) => v1 + v2)
.sortBy(_._1._2).sortBy(_._1._1).collect()

toJSON_MultiBar(Output, sc)

Output.foreach(println)
}

转换成 JSON 文件

编写 toJSON_MultiBar() 函数,将 SparkRDD 处理的结果转换成 JSON 格式的文件,用于生成柱状图

  1. 使用阿里巴巴 Java 库 fastjson,新建一个 JSONObject 对象
  2. 新建四个数组分别存放:年龄段;女性、男性、其他在各个年龄段的交易量
  3. 使用循环,按行读取,按照不同性别筛选,将 SparkRDD 处理的结果放入对应的数组
  4. 导出,保存

函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def toJSON_MultiBar(data: Array[((String, String), Int)], sc: SparkContext): Unit = {
val JSONObject = new JSONObject()
val m = data.length
val age: Array[Any] = new Array[Any](8)
val woman: Array[Any] = new Array[Any](8)
val man: Array[Any] = new Array[Any](8)
val other: Array[Any] = new Array[Any](8)

for (j <- 0 to 7) {
age(j) = j.toString // 7个对应的年龄段
}
JSONObject.put("age", age)

var x1 = 0
var x2 = 0
var x3 = 0

for (i <- 0 until m) {
if (data(i)._1._2 == "0") {
woman(x1) = data(i)._2.toString // 将性别女对应的条目写入 woman 数组
x1 = x1 + 1
}
else if (data(i)._1._2 == "1") {
man(x2) = data(i)._2.toString // 将性别男对应的条目写入 man 数组
x2 += 1
}
else {
other(x3) = data(i)._2.toString // 将性别未知对应的条目写入 other 数组
x3 += 1
}
}

JSONObject.put("woman", woman)
JSONObject.put("man", man)
JSONObject.put("other", other)

val output = sc.parallelize(List(JSONObject))
output.repartition(1).saveAsTextFile("./Visualization/web/data/v_3")
}

导出后的文件为 Spark 处理的文件,名称为 part-00000,需要手动将其加上 json 后缀为 part-00000.json

part-00000.json 文件内容为:

1
2
3
4
5
6
{
"other":["51126","50628","50989","51169","50916","50706","51183","51029"],
"woman":["50590","51337","50854","50978","51259","51244","51517","50872"],
"man":["50855","50732","51195","51076","50798","50555","50855","50891"],
"age":["0","1","2","3","4","5","6","7"]
}

可视化呈现

使用 Tomcat 搭建本地 Java Web Server,编写 jsp 网页,将 *.json 文件导入,使用 ECharts 呈现

根据 ECharts 官方文档:异步数据加载和更新,使用 jQuery 工具将 json 文件导入

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'), 'shine');

$.get('./data/v_3/part-00000.json').done(function (data) {
myChart.setOption({
title: {
text: '双十一男女买家各个年龄段交易对比',
x: 'center'
},
legend: {
data: ["女", "男", "未知"],
right: '20',
top: '30'
},
tooltip: {
trigger: 'axis',
axisPointer: { // 坐标轴指示器,坐标轴触发有效
type: 'shadow' // 默认为直线,可选为:'line' | 'shadow'
}
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: [
{
type: 'category',
data: data.age,
axisTick: {
alignWithLabel: true
}
}
],
yAxis: [
{
type: 'value'
}
],
series: [
{
type: 'bar',
name: '女',
data: data.woman
},
{
type: 'bar',
name: '男',
data: data.man
},
{
type: 'bar',
name: '未知',
data: data.other
}
]
});
});

可视化结果如图所示:

可视化实验四:双十一男女买家交易对比

数据处理

分析:

  1. 首先进行 map 操作,将数据处理成 (($日期, $用户行为, $性别), 1) 样式
  2. 然后使用 filter() 函数确定时间为 11 月 11 日,用户行为为购买,具体为日期 month == 11, day == 11,用户行为 action == 2
  3. 使用 map() 函数将数据处理为 ($性别, 1)
  4. 使用 reduceByKey() 计算出每个性别对应的条目数
  5. 将结果转换成 JSON 格式文件,并输出处理结果

数据处理核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def transformationOps(sc: SparkContext): Unit = {
// 双十一男女买家交易对比
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split("\n"))
.map(line => ((line.split(",")(5) + line.split(",")(6), // 日期
line.split(",")(7), // 行为
line.split(",")(9)), 1)) // 性别
.filter(line => line._1._1.equals("1111"))
.filter(line => line._1._2.equals("2"))
.map(t => (t._1._3, t._2))
val Output = data.reduceByKey((v1, v2) => v1 + v2).collect()

toJSON_Pie(Output, sc)

Output.foreach(println)
}

转换成 JSON 文件

编写 toJSON_Pie() 函数,将 SparkRDD 处理的结果转换成 JSON 格式的文件,用于生成饼状图

  1. 使用阿里巴巴 Java 库 fastjson,新建一个 JSONObject 对象
  2. 新建一个数组准备存放绘制 EChart 饼状图要求的 {"name":"","value":""}
  3. 使用循环,按行读取,放入 SparkRDD 处理的结果
  4. 导出,保存

函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def toJSON_Pie(data: Array[(String, Int)], sc: SparkContext): Unit = {
val JSONObject = new JSONObject()
val m = data.length
val list: util.List[Any] = new util.ArrayList[Any]()
for (i <- 0 until m) {
val pair = new util.HashMap[String, Any]()
pair.put("name", data(i)._1.toString)
pair.put("value", data(i)._2.toString)
list.add(pair)
}
JSONObject.put("data", list)
val output = sc.parallelize(List(JSONObject))
output.repartition(1).saveAsTextFile("./Visualization/web/data/v_4")
}

导出后的文件为 Spark 处理的文件,名称为 part-00000,需要手动将其加上 json 后缀为 part-00000.json

由于原始数据中代表用户行为的数据为 0~3,数字对可视化呈现不直观,所以手动更改了 json 文件的对应内容

part-00000.json 文件内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"data": [
{
"name": "女",
"value": "408651"
},
{
"name": "男",
"value": "406957"
},
{
"name": "未知",
"value": "407746"
}
]
}

可视化呈现

使用 Tomcat 搭建本地 Java Web Server,编写 jsp 网页,将 *.json 文件导入,使用 ECharts 呈现

根据 ECharts 官方文档:异步数据加载和更新,使用 jQuery 工具将 json 文件导入

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'), 'shine');

// $.getJSON('./data/v_1/part-00000.json').done(function (data) {
myChart.setOption({
title: {
text: '双十一男女买家交易对比',
x: 'center'
},
tooltip: {
trigger: 'item',
formatter: "{a} <br/> {b}: {c} ({d}%)"
},
label: {
formatter: "{b}: {c}"
},
legend: {
orient: 'vertical',
right: 50,
top: 150,
// bottom: 20,
data: ['女', '男', '未知']
},
series: [
{
name: '性别',
type: 'pie',
radius: '60%',
center: ['40%', '50%'],
data: [],
selectedMode: 'single',
itemStyle: {
emphasis: {
shadowBlur: 10,
shadowOffsetX: 0,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}
]
});

$.get('./data/v_4/part-00000.json').done(function (data) {
myChart.setOption({
series: [{
data: data.data
}]
});
});

可视化结果如图所示:

可视化实验五:各个省份的总成交量对比

数据处理

分析:

  1. 首先进行 map 操作,将数据处理成 (($省份, $用户行为), 1) 样式
  2. 然后使用 filter() 函数确定用户行为为购买,具体为用户行为 action == 2
  3. 使用 map() 函数将数据处理为 ($省份, 1)
  4. 使用 reduceByKey() 计算出每个省份对应的条目数
  5. 将结果转换成 JSON 格式文件,并输出处理结果

数据处理核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def transformationOps(sc: SparkContext): Unit = {
// 各个省份的总成交量对比
val raw_data = sc.textFile("./data/user_log.csv")
val data = raw_data.flatMap(line => line.split("\n"))
.map(line => ((line.split(",")(10), line.split(",")(7)), 1))
.filter(line => line._1._2.contains("2"))
.map(t => (t._1._1, t._2))
val Output = data.reduceByKey((v1, v2) => v1 + v2).collect()

toJSON_Map(Output, sc)

Output.foreach(println)
}

转换成 JSON 文件

编写 toJSON_Pie() 函数,将 SparkRDD 处理的结果转换成 JSON 格式的文件,用于生成饼状图

  1. 使用阿里巴巴 Java 库 fastjson,新建一个 JSONObject 对象
  2. 新建一个数组准备存放绘制 EChart 地图要求的 {"name":"","value":""}
  3. 使用循环,按行读取,放入 SparkRDD 处理的结果
  4. 导出,保存

函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def toJSON_Map(data: Array[(String, Int)], sc: SparkContext): Unit = {
val JSONObject = new JSONObject()
val m = data.length
val list: util.List[Any] = new util.ArrayList[Any]()
for (i <- 0 until m) {
val pair = new util.HashMap[String, Any]()
pair.put("name", data(i)._1.toString)
pair.put("value", data(i)._2.toString)
list.add(pair)
}
JSONObject.put("data", list)
val output = sc.parallelize(List(JSONObject))
output.repartition(1).saveAsTextFile("./Visualization/web/data/v_5")
}

导出后的文件为 Spark 处理的文件,名称为 part-00000,需要手动将其加上 json 后缀为 part-00000.json

part-00000.json 文件内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"data":[
{"name":"天津","value":"97134"},
{"name":"吉林","value":"96992"},
{"name":"四川","value":"96743"},
{"name":"内蒙古","value":"96619"},
{"name":"上海市","value":"96893"},
{"name":"浙江","value":"97162"},
{"name":"青海","value":"96991"},
{"name":"河北","value":"96674"},
{"name":"云南","value":"96377"},
{"name":"香港","value":"96587"},
{"name":"宁夏","value":"96494"},
{"name":"甘肃","value":"96822"},
{"name":"安徽","value":"96488"},
{"name":"江西","value":"96679"},
{"name":"江苏","value":"96802"},
{"name":"湖南","value":"97125"},
{"name":"西藏","value":"97116"},
{"name":"福建","value":"96625"},
{"name":"北京","value":"96911"},
{"name":"陕西","value":"96517"},
{"name":"山西","value":"96861"},
{"name":"广西","value":"96675"},
{"name":"澳门","value":"96710"},
{"name":"湖北","value":"96943"},
{"name":"山东","value":"97123"},
{"name":"广东","value":"97338"},
{"name":"台湾","value":"96543"},
{"name":"辽宁","value":"96714"},
{"name":"新疆","value":"96960"},
{"name":"黑龙江","value":"97169"},
{"name":"重庆","value":"96813"},
{"name":"海南","value":"96722"},
{"name":"河南","value":"96980"},
{"name":"贵州","value":"96842"}
]
}

可视化呈现

使用 Tomcat 搭建本地 Java Web Server,编写 jsp 网页,将 *.json 文件导入,使用 ECharts 呈现

根据 ECharts 官方文档:异步数据加载和更新,使用 jQuery 工具将 json 文件导入

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'), 'shine');
myChart.showLoading();

$.getJSON('./data/china.json', function (geoJson) {

myChart.hideLoading();

echarts.registerMap('CN', geoJson);

myChart.setOption({
title: {
text: '各个省份的总成交量对比',
x: 'center'
},
tooltip: {
trigger: 'item',
formatter: "{b} <br/> {c} (件)"
},
toolbox: {
show: true,
orient: 'vertical',
left: 'right',
top: 'center',
feature: {
dataView: {readOnly: false},
restore: {},
saveAsImage: {}
}
},
visualMap: {
min: 96000,
max: 98000,
text: ['高', '低'],
realtime: false,
calculable: true,
inRange: {
color: ['lightskyblue', 'yellow', 'orangered']
}
},
series: [
{
name: '各个省份的总成交量对比',
type: 'map',
map: 'CN', // 自定义扩展图表类型
itemStyle: {
normal: {label: {show: true}},
emphasis: {label: {show: true}}
},
data: []
}
]
})
});


$.get('./data/v_5/part-00000.json').done(function (data) {
myChart.setOption({
series: [{
data: data.data
}]
});
});

可视化结果如图所示:

实验过程中发现的问题

在本次实验过程中发现了不少问题,有些通过在网络搜索解决了,但有些没有找到很好的解决方法,在此提出。

  • 在编写可视化部分的代码时,使用 SparkRDD 生成的 JSON 格式的文件没有后缀,这时 jQuery 无法读取到文件的内容,必须手动更改 part-00000 文件名为 part-00000.json
  • 没有找到合适的方法修改 SparkRDD 里的数据,当绘制可视化实验一:双十一所有买家消费行为比例可视化实验四:双十一男女买家交易对比时,由于原始数据中代表用户行为的数据为 0 ~ 3,代表用户性别的数据为 0 ~ 1,数字对可视化呈现不直观,所以手动需要更改 json 文件的对应内容。同样在绘制可视化实验五:各个省份的总成交量对比时,数据里有些省份的名字和 china.js 中的省份名称不一样,比如北京和北京市,这样可视化的时候数据会不显示,也需要手动更改 JSON 文件内容。

Spark —— 淘宝双 11 数据分析

https://morooi.com/2019/spark-taobao/

作者

SJ Zhou

发布于

2019-06-07

更新于

2021-01-06

许可协议

评论