Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

12.1 通过RDD实现分析电影的用户行为信息

在本节中,我们首先搭建IDEA的开发环境。电影点评系统基于IDEA开发环境进行开发,本节对大数据电影点评系统中电影数据格式和来源进行了说明,然后通过RDD方式实现分析电影的用户行为信息的功能。

12.1.1 搭建IDEA开发环境

1.IntelliJ IDEA环境的安装

如图12-1所示,登录IDEA的官网,打开http://www.jetbrains.com/idea/网站,单击DOWNLOAD进行IDEA的下载。IDEA全称IntelliJ IDEA,是Java语言开发的集成环境,具备智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查等方面的功能,支持Maven、Gradle和STS,集成Git、SVN、Mercurial等,在Spark开发程序时通常使用IDEA。单击DOWNLOAD下载,下载安装包以后根据IDEA安装提示一步步完成安装。

图12-1 IDEA的官网

IDEA在本地计算机上安装完成以后,打开IDEA的默认显示主题风格是Darcula的主题格式,也是众多IDEA开发者喜欢的格式。但为了便于读者阅读,这里将IDEA的显示主题风格调整为IntelliJ格式,单击File→Settings→Appearance→Theme→IntelliJ,这样书本纸质显示更清晰,如图12-2所示。

图12-2 修改IDEA显示主题格式

IDEA安装完成,在Windows系统中完成Windows JDK的安装与配置。安装和配置完成以后,测试验证JDK能否在设备上运行。选择“开始”→“运行”命令,在运行窗口中输入CMD命令,进入DOS环境,在命令行提示符中直接输入java-version,按回车键,系统会显示JDK的版本,说明JDK已经安装成功,如下所示。

1.  C:\Windows\System32>java -version
2.  java version "1.8.0_121"
3.  Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
4.  Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
2.新建Maven工程(SparkApps工程),导入Spark 2.0相关JAR包及源码

(1)在IDEA菜单栏中新建工程,单击File→Project,如图12-3所示。

图12-3 新建工程

(2)在弹出的New Project对话框中选择Maven方式,单击Next按钮,如图12-4所示。

图12-4 选择Maven方式

(3)在弹出的对话框中输入GroupId及ArtifactId,如图12-5所示。

图12-5 输入GroupId及ArtifactId

(4)在弹出的对话框中输入工程名及工程保存位置,单击Finish按钮完成,如图12-6所示。

图12-6 输入工程名及工程保存位置

(5)在SparkApps工程中设置Maven配置参数。单击File→Settings→Build,Execution, Deployment→Maven→User settings file及Local repository,输入用户配置文件及本地库保存地址,如图12-7所示。

图12-7 输入用户配置文件及本地库保存地址

其中,setting.xml代码完整的配置内容如例12-1所示。

【例12-1】setting.xmll文件内容。

1.  <?xml version="1.0" encoding="utf-8"?>
2.  <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
3.            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4.            xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
               http://maven.apache.org/xsd/settings-1.0.0.xsd">
5.  <localRepository>F:\SparkMaven2017\repository64bit</localRepository>
6.  </settings>

(6)在SparkApps工程中单击pom.xml,编辑pom.xml文件,如图12-8所示。

图12-8 编辑pom文件

pom代表“项目对象模型”。这是一个文件名为pom.xml的Maven项目的XML表示形式。在Maven系统中,一个项目除了代码文件外,还包含配置文件,包括开发者需要遵循的规则、缺陷管理系统、组织和许可证、项目的url、项目的依赖,以及其他所有的项目相关因素。在Maven系统中,项目不需要包含代码,只是一个pom.xml。pom.xml包括了所有的项目信息。

本书的案例基于Maven方式进行开发,项目中依赖的JAR包都从pom.xml中下载获取,这里提供了一份完整的pom.xml文件,读者可以根据pom.xml文件搭建开发环境,测试运行本书稿的各综合案例。

pom.xml代码完整的配置内容如例12-2所示。

【例12-2】pom.xml文件内容。

1.  <?xml version="1.0" encoding="utf-8"?>
2.  <project xmlns="http://maven.apache.org/POM/4.0.0"
3.           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.
            apache.org/xsd/maven-4.0.0.xsd">
5.      <modelVersion>4.0.0</modelVersion>
6.  <!-- 基础配置 -->
7.      <groupId>2017SparkCase100</groupId>
8.      <artifactId>2017SparkCase100</artifactId>
9.      <version>1.0-SNAPSHOT</version>
10.
11.     <properties>
12.         <scala.version>2.11.8</scala.version>
13.         <spark.version>2.1.0</spark.version>
14.         <jedis.version>2.8.2</jedis.version>
15.         <fastjson.version>1.2.14</fastjson.version>
16.         <jetty.version>9.2.5.v20141112</jetty.version>
17.         <container.version>2.17</container.version>
18.         <java.version>1.8</java.version>
19.     </properties>
20.
21.     <repositories>
22.         <repository>
23.             <id>scala-tools.org</id>
24.             <name>Scala-Tools Maven2 Repository</name>
25.             <url>http://scala-tools.org/repo-releases</url>
26.         </repository>
27.     </repositories>
28.
29.     <pluginRepositories>
30.         <pluginRepository>
31.             <id>scala-tools.org</id>
32.             <name>Scala-Tools Maven2 Repository</name>
33.             <url>http://scala-tools.org/repo-releases</url>
34.         </pluginRepository>
35.     </pluginRepositories>
36. <!-- 依赖关系-->
37.     <dependencies>
38.         <!-- put javax.ws.rs as the first dependency, it is important!!! -->
39.         <dependency>
40.             <groupId>javax.ws.rs</groupId>
41.             <artifactId>javax.ws.rs-api</artifactId>
42.             <version>2.0</version>
43.         </dependency>
44.
45.         <dependency>
46.             <groupId>org.scala-lang</groupId>
47.             <artifactId>scala-library</artifactId>
48.             <version>${scala.version}</version>
49.         </dependency>
50.         <dependency>
51.             <groupId>org.scala-lang</groupId>
52.             <artifactId>scala-compiler</artifactId>
53.             <version>${scala.version}</version>
54.         </dependency>
55.         <dependency>
56.             <groupId>org.scala-lang</groupId>
57.             <artifactId>scala-reflect</artifactId>
58.             <version>${scala.version}</version>
59.         </dependency>
60.
61.         <dependency>
62.             <groupId>org.scala-lang</groupId>
63.             <artifactId>scalap</artifactId>
64.             <version>${scala.version}</version>
65.   </dependency>
66.
67.   <dependency>
68.       <groupId>junit</groupId>
69.       <artifactId>junit</artifactId>
70.       <version>4.4</version>
71.       <scope>test</scope>
72.   </dependency>
73.   <dependency>
74.       <groupId>org.specs</groupId>
75.       <artifactId>specs</artifactId>
76.       <version>1.2.5</version>
77.       <scope>test</scope>
78.   </dependency>
79.   <dependency>
80.       <groupId>org.apache.spark</groupId>
81.       <artifactId>spark-core_2.11</artifactId>
82.       <version>${spark.version}</version>
83.   </dependency>
84.   <dependency>
85.       <groupId>org.apache.spark</groupId>
86.       <artifactId>spark-launcher_2.11</artifactId>
87.       <version>2.1.0</version>
88.   </dependency>
89.   <dependency>
90.       <groupId>org.apache.spark</groupId>
91.       <artifactId>spark-network-shuffle_2.11</artifactId>
92.       <version>2.1.0</version>
93.   </dependency>
94.   <dependency>
95.       <groupId>org.apache.spark</groupId>
96.       <artifactId>spark-sql_2.11</artifactId>
97.       <version>${spark.version}</version>
98.   </dependency>
99.   <dependency>
100.           <groupId>org.apache.spark</groupId>
101.           <artifactId>spark-hive_2.11</artifactId>
102.           <version>2.1.0</version>
103.       </dependency>
104.       <dependency>
105.           <groupId>org.apache.spark</groupId>
106.           <artifactId>spark-catalyst_2.11</artifactId>
107.           <version>2.1.0</version>
108.       </dependency>
109.       <dependency>
110.           <groupId>org.apache.spark</groupId>
111.           <artifactId>spark-streaming-flume-assembly_2.11</artifactId>
112.           <version>2.1.0</version>
113.       </dependency>
114.       <dependency>
115.           <groupId>org.apache.spark</groupId>
116.           <artifactId>spark-streaming-flume_2.11</artifactId>
117.           <version>2.1.0</version>
118.       </dependency>
119.       <dependency>
120.           <groupId>org.apache.spark</groupId>
121.           <artifactId>spark-streaming_2.11</artifactId>
122.           <version>${spark.version}</version>
123.       </dependency>
124.       <dependency>
125.      <groupId>org.apache.spark</groupId>
126.      <artifactId>spark-graphx_2.11</artifactId>
127.      <version>2.1.0</version>
128.  </dependency>
129.  <dependency>
130.      <groupId>org.scalanlp</groupId>
131.      <artifactId>breeze_2.11</artifactId>
132.      <version>0.11.2</version>
133.      <scope>compile</scope>
134.      <exclusions>
135.          <exclusion>
136.              <artifactId>junit</artifactId>
137.              <groupId>junit</groupId>
138.          </exclusion>
139.          <exclusion>
140.              <artifactId>commons-math3</artifactId>
141.              <groupId>org.apache.commons</groupId>
142.          </exclusion>
143.      </exclusions>
144.  </dependency>
145.  <dependency>
146.      <groupId>org.apache.commons</groupId>
147.      <artifactId>commons-math3</artifactId>
148.      <version>3.4.1</version>
149.      <scope>compile</scope>
150.  </dependency>
151.  <dependency>
152.      <groupId>org.apache.spark</groupId>
153.      <artifactId>spark-mllib_2.11</artifactId>
154.      <version>2.1.0</version>
155.  </dependency>
156.  <dependency>
157.      <groupId>org.apache.spark</groupId>
158.      <artifactId>spark-mllib-local_2.11</artifactId>
159.      <version>2.1.0</version>
160.      <scope>compile</scope>
161.  </dependency>
162.  <dependency>
163.      <groupId>org.apache.spark</groupId>
164.      <artifactId>spark-mllib-local_2.11</artifactId>
165.      <version>2.1.0</version>
166.      <type>test-jar</type>
167.      <scope>test</scope>
168.  </dependency>
169.  <dependency>
170.      <groupId>org.apache.spark</groupId>
171.      <artifactId>spark-repl_2.11</artifactId>
172.      <version>2.1.0</version>
173.  </dependency>
174.  <dependency>
175.      <groupId>org.apache.hadoop</groupId>
176.      <artifactId>hadoop-client</artifactId>
177.      <version>2.6.0</version>
178.  </dependency>
179.  <dependency>
180.      <groupId>org.apache.spark</groupId>
181.      <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
182.      <version>2.1.0</version>
183.  </dependency>
184.  <dependency>
185.      <groupId>org.apache.spark</groupId>
186.      <artifactId>spark-streaming-flume_2.11</artifactId>
187.      <version>${spark.version}</version>
188.  </dependency>
189.  <dependency>
190.      <groupId>mysql</groupId>
191.      <artifactId>mysql-connector-java</artifactId>
192.      <version>5.1.6</version>
193.  </dependency>
194.  <dependency>
195.      <groupId>org.apache.hive</groupId>
196.      <artifactId>hive-jdbc</artifactId>
197.      <version>1.2.1</version>
198.  </dependency>
199.  <dependency>
200.      <groupId>org.apache.httpcomponents</groupId>
201.      <artifactId>httpclient</artifactId>
202.      <version>4.4.1</version>
203.  </dependency>
204.  <dependency>
205.      <groupId>org.apache.httpcomponents</groupId>
206.      <artifactId>httpcore</artifactId>
207.      <version>4.4.1</version>
208.  </dependency>
209.
210.  <!-- https://mvnrepository.com/artifact/org.apache.hadoop/
      hadoop- common -->
211.  <dependency>
212.      <groupId>org.apache.hadoop</groupId>
213.      <artifactId>hadoop-common</artifactId>
214.      <version>2.6.0</version>
215.  </dependency>
216.
217.  <dependency>
218.      <groupId>org.apache.hadoop</groupId>
219.      <artifactId>hadoop-client</artifactId>
220.      <version>2.6.0</version>
221.  </dependency>
222.
223.  <!--https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-->
224.  <dependency>
225.      <groupId>org.apache.hadoop</groupId>
226.      <artifactId>hadoop-hdfs</artifactId>
227.      <version>2.6.0</version>
228.  </dependency>
229.
230.
231.  <dependency>
232.      <groupId>redis.clients</groupId>
233.      <artifactId>jedis</artifactId>
234.      <version>${jedis.version}</version>
235.  </dependency>
236.  <dependency>
237.      <groupId>org.json</groupId>
238.      <artifactId>json</artifactId>
239.      <version>20090211</version>
240.  </dependency>
241.  <dependency>
242.      <groupId>com.fasterxml.jackson.core</groupId>
243.      <artifactId>jackson-core</artifactId>
244.      <version>2.6.3</version>
245.  </dependency>
246.  <dependency>
247.      <groupId>com.fasterxml.jackson.core</groupId>
248.      <artifactId>jackson-databind</artifactId>
249.      <version>2.6.3</version>
250.  </dependency>
251.  <dependency>
252.      <groupId>com.fasterxml.jackson.core</groupId>
253.      <artifactId>jackson-annotations</artifactId>
254.      <version>2.6.3</version>
255.  </dependency>
256.  <dependency>
257.      <groupId>com.alibaba</groupId>
258.      <artifactId>fastjson</artifactId>
259.      <version>1.1.41</version>
260.  </dependency>
261.  <dependency>
262.      <groupId>fastutil</groupId>
263.      <artifactId>fastutil</artifactId>
264.      <version>5.0.9</version>
265.  </dependency>
266.  <dependency>
267.      <groupId>org.eclipse.jetty</groupId>
268.      <artifactId>jetty-server</artifactId>
269.      <version>${jetty.version}</version>
270.  </dependency>
271.
272.  <dependency>
273.      <groupId>org.eclipse.jetty</groupId>
274.      <artifactId>jetty-servlet</artifactId>
275.      <version>${jetty.version}</version>
276.  </dependency>
277.
278.  <dependency>
279.      <groupId>org.eclipse.jetty</groupId>
280.      <artifactId>jetty-util</artifactId>
281.      <version>${jetty.version}</version>
282.  </dependency>
283.
284.  <dependency>
285.      <groupId>org.glassfish.jersey.core</groupId>
286.      <artifactId>jersey-server</artifactId>
287.      <version>${container.version}</version>
288.  </dependency>
289.  <dependency>
290.      <groupId>org.glassfish.jersey.containers</groupId>
291.      <artifactId>jersey-container-servlet-core</artifactId>
292.      <version>${container.version}</version>
293.          </dependency>
294.          <dependency>
295.              <groupId>org.glassfish.jersey.containers</groupId>
296.              <artifactId>jersey-container-jetty-http</artifactId>
297.              <version>${container.version}</version>
298.          </dependency>
299.          <dependency>
300.              <groupId>org.apache.hadoop</groupId>
301.              <artifactId>hadoop-mapreduce-client-core</artifactId>
302.              <version>2.6.0</version>
303.          </dependency>
304.
305.          <dependency>
306.              <groupId>org.antlr</groupId>
307.              <artifactId>antlr4-runtime</artifactId>
308.              <version>4.5.3</version>
309.          </dependency>
310.
311.      </dependencies>
312.  <!-- 编译配置 -->
313.      <build>
314.          <plugins>
315.              <plugin>
316.                  <artifactId>maven-assembly-plugin</artifactId>
317.                  <configuration>
318.                      <classifier>dist</classifier>
319.                      <appendAssemblyId>true</appendAssemblyId>
320.                      <descriptorRefs>
321.                          <descriptor>jar-with-dependencies</descriptor>
322.                      </descriptorRefs>
323.                  </configuration>
324.                  <executions>
325.                      <execution>
326.                          <id>make-assembly</id>
327.                          <phase>package</phase>
328.                          <goals>
329.                              <goal>single</goal>
330.                          </goals>
331.                      </execution>
332.                  </executions>
333.              </plugin>
334.
335.              <plugin>
336.                  <artifactId>maven-compiler-plugin</artifactId>
337.                  <configuration>
338.                      <source>1.7</source>
339.                      <target>1.7</target>
340.                  </configuration>
341.              </plugin>
342.
343.              <plugin>
344.                  <groupId>net.alchim31.maven</groupId>
345.                  <artifactId>scala-maven-plugin</artifactId>
346.                  <version>3.2.2</version>
347.                  <executions>
348.                      <execution>
349.                          <id>scala-compile-first</id>
350.                          <phase>process-resources</phase>
351.                          <goals>
352.                              <goal>compile</goal>
353.                          </goals>
354.                      </execution>
355.                  </executions>
356.                  <configuration>
357.                      <scalaVersion>${scala.version}</scalaVersion>
358.                      <recompileMode>incremental</recompileMode>
359.                      <useZincServer>true</useZincServer>
360.                      <args>
361.                          <arg>-unchecked</arg>
362.                          <arg>-deprecation</arg>
363.                          <arg>-feature</arg>
364.                      </args>
365.                      <jvmArgs>
366.                          <jvmArg>-Xms1024m</jvmArg>
367.                          <jvmArg>-Xmx1024m</jvmArg>
368.                      </jvmArgs>
369.                      <javacArgs>
370.                          <javacArg>-source</javacArg>
371.                          <javacArg>${java.version}</javacArg>
372.                          <javacArg>-target</javacArg>
373.                          <javacArg>${java.version}</javacArg>
374.                          <javacArg>-Xlint:all,-serial,-path</javacArg>
375.                      </javacArgs>
376.                  </configuration>
377.              </plugin>
378.
379.              <plugin>
380.                  <groupId>org.antlr</groupId>
381.                  <artifactId>antlr4-maven-plugin</artifactId>
382.                  <version>4.3</version>
383.                  <executions>
384.                      <execution>
385.                          <id>antlr</id>
386.                          <goals>
387.                              <goal>antlr4</goal>
388.                          </goals>
389.                          <phase>none</phase>
390.                      </execution>
391.                  </executions>
392.                  <configuration>
393.                      <outputDirectory>src/test/java</outputDirectory>
394.                      <listener>true</listener>
395.                      <treatWarningsAsErrors>true</treatWarningsAsErrors>
396.                  </configuration>
397.              </plugin>
398.          </plugins>
399.      </build>
400.  </project>

(7)在pom.xml中,按Ctrl+S快捷键保存pom.xml文件,IDEA会自动从网上下载各类Jar包,下载的时间根据网络带宽的情况可能需要几十分钟,也可能需要几个小时,全部下载好以后,可以看到工程中External Libraries已经加载了Spark相关的Jar包及源码,如图12-9所示。

图12-9 使用Maven方式加载Jar包

3.在SparkApps工程中建立scala目录

单击SparkApps工程下的src/main目录,右击main,从弹出的快捷菜单中选择New→Directory命令,新建一个目录scala,如图12-10所示。

图12-10 新建目录

单击SparkApps工程下的src/main/scala目录,右击scala,从弹出的快捷菜单中选择Mark Directory as→Resource Root命令,标识目录scala为源码目录。至此,IDEA本地开发环境搭建完成,如图12-11所示。

图12-11 设置为代码目录

12.1.2 大数据电影点评系统中电影数据说明

1.大数据电影点评系统电影数据的来源

电影推荐系统(MovieLens)是美国明尼苏达大学(Minnesota)计算机科学与工程学院的GroupLens项目组创办的,是一个非商业性质的、以研究为目的的实验性站点。电影推荐系统主要使用协同过滤和关联规则相结合的技术,向用户推荐他们感兴趣的电影。这个项目是由John Riedl教授和Joseph Konstan教授领导的。该项目从1992年开始研究自动化协同过滤,在1996年使用自动化协同过滤系统应用于USENET新闻组中。自那以后,项目组扩大了研究范围,基于内容方法以及改进当前的协作过滤技术来研究所有的信息过滤解决方案。

电影推荐系统(MovieLens)的数据下载地址为: https://grouplens.org/datasets/movielens/。GroupLens项目研究收集了从电影推荐系统MovieLens站点提供评级的数据集(http://MovieLens.org),收集了不同时间段的数据,我们可以根据电影分析业务需求下载不同规模大小的数据源文件。

2.大数据电影点评系统电影数据的格式说明

这里下载的是中等规模的电影推荐系统数据集。在本地目录moviedata/medium包含的电影点评系统数据源中提供了在2000年6040个用户观看约3900部电影发表的1 000 209条匿名评级数据信息。

评级文件ratings.dat的格式描述如下。

1.  UserID::MovieID::Rating::Timestamp
2.  用户ID、电影ID、评分数据、时间戳
3.  - 用户ID范围在1~6040之间
4.  - 电影ID范围在1~3952之间
5.  - 评级:使用五星评分方式
6.  - 时间戳表示系统记录的时间
7.  - 每个用户至少有20个评级

评级文件ratings.dat中摘取部分记录如下。

1.  1::1193::5::978300760
2.  1::661::3::978302109
3.  1::914::3::978301968
4.  1::3408::4::978300275
5.  1::2355::5::978824291
6.  1::1197::3::978302268
7.  1::1287::5::978302039
8.  1::2804::5::978300719
9.  1::594::4::978302268
10. 1::919::4::978301368

用户文件users.dat的格式描述如下。

1.  UserID::Gender::Age::Occupation::Zip-code
2.  用户ID、性别、年龄、职业、邮编代码
3.  -所有的用户资料由用户自愿提供, GroupLens项目组不会去检查用户数据的准确性。这个数
    据集中包含用户提供的用户数据
4.  -性别:“M”是男性、“F”是女性
5.  -年龄由以下范围选择:
6.  * 1:  "少于 18岁"
7.  * 18:  "18年龄段:从18岁到24岁"
8.  * 25:  "25年龄段:从25岁到34岁"
9.  * 35:  "35岁年龄段:从35岁到44岁"
10. * 45:  "45岁年龄段:从45岁到49岁"
11. * 50:  "50岁年龄段:从50岁到55岁"
12. * 56:  "56岁年龄段:大于56岁"

从用户文件users.dat中摘取部分记录如下。

1.  1::F::1::10::48067
2.  2::M::56::16::70072
3.  3::M::25::15::55117
4.  4::M::45::7::02460
5.  5::M::25::20::55455
6.  6::F::50::9::55117
7.  7::M::35::1::06810
8.  8::M::25::12::11413
9.  9::M::25::17::61614
10. 10::F::35::1::95370

电影文件movies.dat的格式描述如下。

1.  MovieID::Title::Genres
2.  电影ID、电影名、电影类型
3.  -标题是由亚马逊公司的互联网电影资料库(IMDB)提供的,包括电影发布年份
4.  -电影类型包括以下类型:
5.  * Action:行动
6.  * Adventure:冒险
7.  * Animation:动画
8.  * Children's:儿童
9.  * Comedy:喜剧
10. * Crime:犯罪
11. * Documentary:纪录片
12. * Drama:戏剧
13. * Fantasy:幻想
14. * Film-Noir:黑色电影
15. * Horror:恐怖
16. * Musical:音乐
17. * Mystery:神秘
18. * Romance:浪漫
19. * Sci-Fi:科幻
20. * Thriller:惊悚
21. * War:战争
22. * Western:西方
23. -由于偶然重复的电影记录或者电影记录测试,一些电影ID和电影名可能不一致。
24. -电影记录大多是GroupLens手工输入的,因此不一定准确。

电影文件movies.dat中摘取的部分记录如下。

1.  1::Toy Story (1995)::Animation|Children's|Comedy
2.  2::Jumanji (1995)::Adventure|Children's|Fantasy
3.  3::Grumpier Old Men (1995)::Comedy|Romance
4.  4::Waiting to Exhale (1995)::Comedy|Drama
5.  5::Father of the Bride Part II (1995)::Comedy
6.  6::Heat (1995)::Action|Crime|Thriller
7.  7::Sabrina (1995)::Comedy|Romance
8.  8::Tom and Huck (1995)::Adventure|Children's
9.  9::Sudden Death (1995)::Action
10. 10::GoldenEye (1995)::Action|Adventure|Thriller

职业文件occupations.dat的格式描述如下。

1.  OccupationID::Occupation
2.  职业ID、职业名
3.  -职业包含如下选择:
4.  * 0:“其他”或未指定
5.  * 1:“学术/教育者”
6.  * 2:“艺术家”
7.  * 3:“文书/行政”
8.  * 4:“高校毕业生”
9.  * 5:“客户服务”
10. * 6:“医生/保健”
11. * 7:“行政/管理”
12. * 8:“农民”
13. * 9:“家庭主妇”
14. * 10:“中小学生”
15. * 11:“律师”
16. * 12:“程序员”
17. * 13:“退休”
18. * 14:“销售/市场营销”
19. * 15:“科学家”
20. * 16:“个体户”
21. * 17:“技术员/工程师”
22. * 18:“商人和工匠”
23. * 19:“失业”
24. * 20:“作家”

从职业文件occupations.dat中摘取部分记录如下。

1.  0::other or not specified
2.  1::academic/educator
3.  2::artist
4.  3::clerical/admin
5.  4::college/grad student
6.  5::customer service
7.  6::doctor/health care
8.  7::executive/managerial
9.  8::farmer
10. 9::homemaker
11. 10::K-12 student
12. 11::lawyer
13. 12::programmer
14. 13::retired
15. 14::sales/marketing
16. 15::scientist
17. 16::self-employed
18. 17::technician/engineer
19. 18::tradesman/craftsman
20. 19::unemployed
21. 20::writer

12.1.3 电影点评系统用户行为分析统计实战

在本节大数据电影点评系统用户行为分析统计实战中,我们需统计用户观看电影和点评电影行为数据的采集、过滤、处理和展示。对于用户行为的数据采集:在生产环境中,企业通常使用Kafka的方式实时收集前端服务器中发送的用户行为日志记录信息;对于用户行为的数据过滤:可以在前端服务器端进行用户行为数据的过滤和格式化,也可以采用Spark SQL的方式进行数据过滤。在大数据电影点评系统用户行为分析统计实战中,基于GroupLens项目组电影推荐系统(MovieLens)已经采集的用户电影观看和点评数据文件,我们直接基于ratings.dat、users.dat、movies.dat、occupations.dat文件进行用户行为实战分析。

用户行为分析统计的数据处理:①一个基本的技巧是,先使用传统的SQL去实现一个数据处理的业务逻辑(自己可以手动模拟一些数据);②在Spark2.x的时候,再一次推荐使用DataSet去实现业务功能,尤其是统计分析功能;③如果想成为专家级别的顶级Spark人才,请使用RDD实现业务功能,为什么?原因很简单,因为使用Spark DataSet方式有一个底层的引擎catalyst,基于DataSet的编程,catalyst的引擎会对我们的代码进行优化,有很多优化的言外之意是你看不到问题到底是怎么来的,假设出错了,优化后的RDD跑在Spark上,打印的错误不是直接基于DataSet产生的错误,DataSet是在内核上的封装,运行的时候是基于RDD的!因此,打印的错误是基于RDD的。 DataSet的优化引擎catalyst涉及Spark底层的代码封装。在DataSet的解析过程中,基于抽象语法树和语法规则的相互配合,引擎catalyst完成了词法分析、未解析的逻辑计划、解析以后的逻辑计划、优化后的逻辑计划、物理计划、可执行的物理计划、物理计划执行、生成RDD等一系列过程。如果使用DataSet出现问题,我们可能不知其所以然。而在业务代码编码中,如果我们直接使用RDD,可以直接基于RDD来排查问题。在本节大数据电影点评系统用户行为分析统计实战中,我们通过RDD的方式直接统计分析用户的电影行为。

用户行为分析统计的数据格式:在生产环境中,强烈建议大家使用Parquet的文件格式。Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月成为Apache顶级项目。Parquet是列式存储格式的一种文件类型,可以适配多种计算框架,是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件。在本节大数据电影点评系统用户行为分析统计实战中,我们研究试验中小规模的用户电影点评数据的分析,专注于大数据Spark RDD的算子实现,这里仍使用GroupLens项目组提供的文本文件格式,不进行Parquet格式的转换。

大数据电影点评系统用户行为分析统计的数据源格式:

1."ratings.dat":UserID::MovieID::Rating::Timestamp
2."users.dat":UserID::Gender::Age::OccupationID::Zip-code
3."movies.dat":MovieID::Title::Genres
4.  "occupations.dat":OccupationID::OccupationName

大数据电影点评系统用户行为分析统计实战,我们使用Spark本地模式进行开发,在IDEA开发环境的SparkApps工程中的src/main/scala目录中新建包com.dt.spark.cores,然后在com.dt.spark.cores下新建Movie_Users_Analyzer_RDD.scala文件。

在Movie_Users_Analyzer_RDD.scala文件中导入电影点评数据。

1.   //设置打印日志的输出级别
2.    Logger.getLogger("org").setLevel(Level.ERROR)
3.
4.    var masterUrl = "local[4]" //默认程序运行在本地Local模式中,主要用于学习和测试
5.    var dataPath = "data/moviedata/medium/"  //数据存放的目录
6.
7.    /**
        *当我们把程序打包运行在集群上的时候,一般都会传入集群的URL信息,这里我们假设
        *如果传入参数,第一个参数只传入Spark集群的URL,第二个参数传入的是数据的地址信息
8.      */
9.
10.   if(args.length > 0) {
11.     masterUrl = args(0)
12.   } else if (args.length > 1) {
13.     dataPath = args(1)
14.   }
15.
16.   /**
        *创建Spark集群上下文sc,在sc中可以进行各种依赖和参数的设置等,大家可以通
        *过SparkSubmit脚本的help去看设置信息
17.     */
18.   val sc = new SparkContext(new SparkConf().setMaster(masterUrl).setAppName
      ("Movie_Users_Analyzer"))
19.
20.   /**
        *读取数据,用什么方式读取数据呢?这里使用的是RDD
21.     */
22.   val usersRDD = sc.textFile(dataPath + "users.dat")
23.   val moviesRDD = sc.textFile(dataPath + "movies.dat")
24.   val occupationsRDD = sc.textFile(dataPath + "occupations.dat")
25.        val ratingsRDD = sc.textFile(dataPath + "ratings.dat")
26.   val ratingsRDD = sc.textFile("data/moviedata/large/" + "ratings.dat")

电影点评系统用户行为分析之一,统计具体某部电影观看的用户信息,如电影ID为1193的用户信息(用户的ID、Age、Gender、Occupation)。为了便于阅读,我们在Spark Driver端collect()获取到RDD的元素集合以后,使用collect().take(2)算子打印输出RDD的两个元素,最后的用户信息的输出结果,我们使用collect().take(20)显示10个元素。

1.   /**
       *电影点评系统用户行为分析之一:分析具体某部电影观看的用户信息,如电影ID为
       *1193的用户信息(用户的ID、Age、Gender、Occupation)
2.     */
3.
4.   val usersBasic: RDD[(String, (String, String, String))] = usersRDD.
     map(_.split("::")).map { user =>
5.     ( //UserID::Gender::Age::OccupationID
6.       user(3),
7.       (user(0), user(1), user(2))
8.     )
9.   }
10.  for (elem <- usersBasic.collect().take(2)) {
11.    println("usersBasicRDD (职业ID,(用户ID,性别,年龄)): " + elem)
12.  }
13.  val occupations: RDD[(String, String)] = occupationsRDD.map(_.split
     ("::")).map(job => (job(0), job(1)))
14.  for (elem <- occupations.collect().take(2)) {
15.    println("occupationsRDD(职业ID,职业名): " + elem)
16.  }
17.  val userInformation: RDD[(String, ((String, String, String), String))]
     = usersBasic.join(occupations)
18.  userInformation.cache()
19.
20.  for (elem <- userInformation.collect().take(2)) {
21.    println("userInformationRDD (职业ID,((用户ID,性别,年龄),职业名)): " + elem)
22.  }
23.
24.  val targetMovie: RDD[(String, String)] = ratingsRDD.map(_.split
     ("::")).map(x => (x(0), x(1))).filter(_._2.equals("1193"))
25.
26.  for (elem <- targetMovie.collect().take(2)) {
27.    println("targetMovie(用户ID,电影ID) : " + elem)
28.  }
29.
30.  val targetUsers: RDD[(String, ((String, String, String), String))] =
     userInformation.map(x => (x._2._1._1, x._2))
31.  for (elem <- targetUsers.collect().take(2)) {
32.    println("targetUsers (用户ID, ((用户ID,性别,年龄), 职业名)): " + elem)
33.  }
34.  println("电影点评系统用户行为分析,统计观看电影ID为1193的电影用户信息:用户
     的ID、性别、年龄、职业名 ")
35.  val userInformationForSpecificMovie: RDD[(String, (String, ((String,
     String, String), String)))] = targetMovie.join(targetUsers)
36.  for (elem <- userInformationForSpecificMovie.collect().take(10)) {
37.    println("userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,
       性别,年龄), 职业名))) : " + elem)
38.  }

在IDEA中运行代码,结果如下。

1.  Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
     properties
2.  [Stage 0:> (0 + 0) / 2]
3.  usersBasicRDD (职业ID,(用户ID,性别,年龄)): (10,(1,F,1))
4.  usersBasicRDD (职业ID,(用户ID,性别,年龄)): (16,(2,M,56))
5.
6.  occupationsRDD(职业ID,职业名): (0,other or not specified)
7.  occupationsRDD(职业ID,职业名): (1,academic/educator)
8.
9.  userInformationRDD (职业ID, ((用户ID,性别,年龄), 职业名)): (4,((25,M,18),
    college/grad student))
10. userInformationRDD (职业ID, ((用户ID,性别,年龄), 职业名)): (4,((38,F,18),
    college/grad student))
11.
12. targetMovie(用户ID,电影ID) : (6,1193)
13. targetMovie(用户ID,电影ID) : (10,1193)
14.
15. targetUsers (用户     ID, ((用户    ID,性别,年龄), 职业名)): (25,((25,M,18),
    college/grad student))
16. targetUsers (用户     ID, ((用户    ID,性别,年龄), 职业名)): (38,((38,F,18),
    college/grad student))
17. 电影点评系统用户行为分析,统计观看电影ID为1193的电影用户信息:用户的ID、性别、年
    龄、职业名
18. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (3638,(1193,((3638,M,25),artist)))
19. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (2060,(1193,((2060,M,1),academic/educator)))
20. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (91,(1193,((91,M,35),executive/managerial)))
21. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (4150,(1193,((4150,M,25),other or not specified)))
22. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (3168,(1193,((3168,F,35),customer service)))
23. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (2596,(1193,((2596,M,50),executive/managerial)))
24. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (2813,(1193,((2813,M,25),writer)))
25. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (5445,(1193,((5445,M,25),other or not specified)))
26. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (3652,(1193,((3652,M,25),programmer)))
27. userInformationForSpecificMovie(用户ID, (电影ID, ((用户ID,性别,年龄), 职
    业名))) : (3418,(1193,((3418,F,18),clerical/admin)))