In Installing and Setting Up a Spark Development Environment on Windows 10, we built a standalone Spark environment on Windows for easy development and debugging. This article builds on that environment and demonstrates how to develop with Spark's Java API. Of course, this is only a simple introductory example.

This article is based on Spark's official example.

With the Spark environment in place, we will use the following software to develop the Spark application:

  1. JDK: Java 8
  2. IDE: IntelliJ IDEA, the Community Edition is sufficient

Create a Maven project

Open IDEA and create a plain Maven project. Once the project has been created, open its pom.xml file.

First, add the Spark dependency:

<dependencies>
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

Because we will use Java 8 features in the code, we also need to configure the build for Java 8:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Write the Java class

Create a class named SimpleApp in the project. It contains a main method, so it can be run directly. Add the following code to the class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {

    public static void main(String[] args) {

        // README.md shipped with the local Spark installation
        String logFile = "file:///d:/Devel/spark-2.4.4-bin-hadoop2.7/README.md";

        SparkConf conf = new SparkConf().setAppName("Simple Application");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the file as an RDD of lines and keep it cached in memory
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        // Count lines containing "a", using an anonymous Function class
        long numAs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) {
                return s.contains("a");
            }
        }).count();

        // Count lines containing "b", using a Java 8 lambda
        long numBs = logData.filter(s -> {
            return s.contains("b");
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

        sc.stop();
    }
}

Let's briefly walk through the program.

SparkConf conf = new SparkConf().setAppName("Simple Application");

JavaSparkContext sc = new JavaSparkContext(conf);

These two lines construct a SparkContext. Given the SparkConf above, JavaSparkContext locates the required Spark installation through the SPARK_HOME environment variable of the current environment and starts a Spark instance.
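If you prefer not to pass the master through a VM option (described in the "Run the program" section below), the master can also be set directly on the SparkConf. The following is a minimal sketch using SparkConf.setMaster; "local[*]" means run locally with one worker thread per CPU core:

// Alternative sketch: set the master in code instead of via -Dspark.master
SparkConf conf = new SparkConf()
        .setAppName("Simple Application")
        .setMaster("local[*]");   // local mode, one worker thread per core
JavaSparkContext sc = new JavaSparkContext(conf);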

JavaRDD<String> logData = sc.textFile(logFile).cache();

This line declares a resilient distributed dataset (RDD) whose content comes from logFile; cache() asks Spark to keep the dataset in memory once it has been computed.
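Other transformations work on the RDD in the same way. As an illustration that is not part of the original program, the same RDD could be used to compute the total number of characters in the file with map and reduce:

// Sketch: map each line to its length, then sum the lengths with reduce.
// Nothing is actually computed until the reduce action runs.
int totalChars = logData.map(String::length)
                        .reduce(Integer::sum);
System.out.println("Total characters: " + totalChars);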

long numAs = logData.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return s.contains("a");
    }
}).count();

These lines perform a simple computation on the RDD. First comes a filter operation, which selects data according to a predicate; that predicate is encapsulated in an implementation of the Function interface.
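If the same predicate is needed in several places, the anonymous class can be pulled out into a named class. This is only a sketch; the class name ContainsFilter is made up for illustration. Spark's Function interface extends Serializable, so instances of such a class can be shipped to executors:

// Hypothetical reusable predicate, equivalent to the anonymous class above
class ContainsFilter implements Function<String, Boolean> {
    private final String needle;

    ContainsFilter(String needle) {
        this.needle = needle;
    }

    public Boolean call(String s) {
        return s.contains(needle);
    }
}

// Usage: long numAs = logData.filter(new ContainsFilter("a")).count();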

long numBs = logData.filter(s -> {
    return s.contains("b");
}).count();

These lines use a Java 8 lambda expression to simplify the code; the functionality is the same as the numAs computation.
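Because the lambda body is a single expression, it can be shortened further; this is purely a stylistic variation:

// Expression lambda: same behavior, no braces or return statement needed
long numBs = logData.filter(s -> s.contains("b")).count();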

Run the program

Choose Run in IDEA; a configuration dialog will pop up. Select SimpleApp as the program to launch, and in the VM options field enter -Dspark.master=local to indicate that the locally installed Spark should be used.
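Alternatively, the application can be run outside the IDE: package it with Maven and launch it with the spark-submit script that ships with Spark. This is only a sketch; the jar name depends on the artifactId and version configured in your pom.xml:

mvn package
spark-submit --class SimpleApp --master local[*] target\simple-app-1.0.jar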

With the configuration in place, run the program. If it runs correctly, you should get output similar to the following:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/12/01 07:52:42 INFO SparkContext: Running Spark version 2.1.0
20/12/01 07:52:43 INFO SecurityManager: Changing view acls to: stu
20/12/01 07:52:43 INFO SecurityManager: Changing modify acls to: stu
20/12/01 07:52:43 INFO SecurityManager: Changing view acls groups to:
20/12/01 07:52:43 INFO SecurityManager: Changing modify acls groups to:
20/12/01 07:52:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(stu); groups with view permissions: Set(); users with modify permissions: Set(stu); groups with modify permissions: Set()
20/12/01 07:52:45 INFO Utils: Successfully started service 'sparkDriver' on port 65032.
20/12/01 07:52:45 INFO SparkEnv: Registering MapOutputTracker
20/12/01 07:52:45 INFO SparkEnv: Registering BlockManagerMaster
20/12/01 07:52:45 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/12/01 07:52:45 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/12/01 07:52:45 INFO DiskBlockManager: Created local directory at C:\Users\stu\AppData\Local\Temp\blockmgr-b4f340e4-2ff9-48dd-ba8f-2b7b44f2e2f4
20/12/01 07:52:45 INFO MemoryStore: MemoryStore started with capacity 871.8 MB
20/12/01 07:52:45 INFO SparkEnv: Registering OutputCommitCoordinator
20/12/01 07:52:45 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/12/01 07:52:45 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.3.73:4040
20/12/01 07:52:46 INFO Executor: Starting executor ID driver on host localhost
20/12/01 07:52:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65046.
20/12/01 07:52:46 INFO NettyBlockTransferService: Server created on 192.168.3.73:65046
20/12/01 07:52:46 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/12/01 07:52:46 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.3.73, 65046, None)
20/12/01 07:52:46 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.3.73:65046 with 871.8 MB RAM, BlockManagerId(driver, 192.168.3.73, 65046, None)
20/12/01 07:52:46 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.3.73, 65046, None)
20/12/01 07:52:46 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.3.73, 65046, None)
20/12/01 07:52:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.1 KB, free 871.7 MB)
20/12/01 07:52:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.3 KB, free 871.7 MB)
20/12/01 07:52:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.3.73:65046 (size: 14.3 KB, free: 871.8 MB)
20/12/01 07:52:47 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.java:18
20/12/01 07:52:47 INFO FileInputFormat: Total input paths to process : 1
20/12/01 07:52:47 INFO SparkContext: Starting job: count at SimpleApp.java:24
20/12/01 07:52:47 INFO DAGScheduler: Got job 0 (count at SimpleApp.java:24) with 1 output partitions
20/12/01 07:52:47 INFO DAGScheduler: Final stage: ResultStage 0 (count at SimpleApp.java:24)
20/12/01 07:52:47 INFO DAGScheduler: Parents of final stage: List()
20/12/01 07:52:47 INFO DAGScheduler: Missing parents: List()
20/12/01 07:52:47 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at filter at SimpleApp.java:20), which has no missing parents
20/12/01 07:52:47 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 871.7 MB)
20/12/01 07:52:47 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 871.7 MB)
20/12/01 07:52:47 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.3.73:65046 (size: 2.0 KB, free: 871.8 MB)
20/12/01 07:52:47 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
20/12/01 07:52:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at filter at SimpleApp.java:20)
20/12/01 07:52:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/12/01 07:52:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5914 bytes)
20/12/01 07:52:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/12/01 07:52:48 INFO HadoopRDD: Input split: file:/c:/Devel/spark-2.4.4-bin-hadoop2.7/README.md:0+3952
20/12/01 07:52:48 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
20/12/01 07:52:48 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
20/12/01 07:52:48 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
20/12/01 07:52:48 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
20/12/01 07:52:48 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
20/12/01 07:52:48 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 11.6 KB, free 871.6 MB)
20/12/01 07:52:48 INFO BlockManagerInfo: Added rdd_1_0 in memory on 192.168.3.73:65046 (size: 11.6 KB, free: 871.8 MB)
20/12/01 07:52:48 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2005 bytes result sent to driver
20/12/01 07:52:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 362 ms on localhost (executor driver) (1/1)
20/12/01 07:52:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/12/01 07:52:48 INFO DAGScheduler: ResultStage 0 (count at SimpleApp.java:24) finished in 0.395 s
20/12/01 07:52:48 INFO DAGScheduler: Job 0 finished: count at SimpleApp.java:24, took 0.559096 s
20/12/01 07:52:48 INFO SparkContext: Starting job: count at SimpleApp.java:30
20/12/01 07:52:48 INFO DAGScheduler: Got job 1 (count at SimpleApp.java:30) with 1 output partitions
20/12/01 07:52:48 INFO DAGScheduler: Final stage: ResultStage 1 (count at SimpleApp.java:30)
20/12/01 07:52:48 INFO DAGScheduler: Parents of final stage: List()
20/12/01 07:52:48 INFO DAGScheduler: Missing parents: List()
20/12/01 07:52:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at filter at SimpleApp.java:26), which has no missing parents
20/12/01 07:52:48 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.4 KB, free 871.6 MB)
20/12/01 07:52:48 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.0 KB, free 871.6 MB)
20/12/01 07:52:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.3.73:65046 (size: 2.0 KB, free: 871.8 MB)
20/12/01 07:52:48 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
20/12/01 07:52:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at filter at SimpleApp.java:26)
20/12/01 07:52:48 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
20/12/01 07:52:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 5914 bytes)
20/12/01 07:52:48 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
20/12/01 07:52:48 INFO BlockManager: Found block rdd_1_0 locally
20/12/01 07:52:48 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1131 bytes result sent to driver
20/12/01 07:52:48 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 17 ms on localhost (executor driver) (1/1)
20/12/01 07:52:48 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/12/01 07:52:48 INFO DAGScheduler: ResultStage 1 (count at SimpleApp.java:30) finished in 0.019 s
20/12/01 07:52:48 INFO DAGScheduler: Job 1 finished: count at SimpleApp.java:30, took 0.039857 s
20/12/01 07:52:48 INFO SparkContext: Starting job: count at SimpleApp.java:34
20/12/01 07:52:48 INFO DAGScheduler: Got job 2 (count at SimpleApp.java:34) with 1 output partitions
20/12/01 07:52:48 INFO DAGScheduler: Final stage: ResultStage 2 (count at SimpleApp.java:34)
20/12/01 07:52:48 INFO DAGScheduler: Parents of final stage: List()
20/12/01 07:52:48 INFO DAGScheduler: Missing parents: List()
20/12/01 07:52:48 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[4] at filter at SimpleApp.java:32), which has no missing parents
20/12/01 07:52:48 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 4.0 KB, free 871.6 MB)
20/12/01 07:52:48 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.3 KB, free 871.6 MB)
20/12/01 07:52:48 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.3.73:65046 (size: 2.3 KB, free: 871.8 MB)
20/12/01 07:52:48 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:996
20/12/01 07:52:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[4] at filter at SimpleApp.java:32)
20/12/01 07:52:48 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
20/12/01 07:52:48 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 5914 bytes)
20/12/01 07:52:48 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
20/12/01 07:52:48 INFO BlockManager: Found block rdd_1_0 locally
20/12/01 07:52:48 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1044 bytes result sent to driver
20/12/01 07:52:48 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 20 ms on localhost (executor driver) (1/1)
20/12/01 07:52:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
20/12/01 07:52:48 INFO DAGScheduler: ResultStage 2 (count at SimpleApp.java:34) finished in 0.022 s
20/12/01 07:52:48 INFO DAGScheduler: Job 2 finished: count at SimpleApp.java:34, took 0.035629 s
Lines with a: 62, lines with b: 31
20/12/01 07:52:48 INFO SparkUI: Stopped Spark web UI at http://192.168.3.73:4040
20/12/01 07:52:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/01 07:52:48 INFO MemoryStore: MemoryStore cleared
20/12/01 07:52:48 INFO BlockManager: BlockManager stopped
20/12/01 07:52:48 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/01 07:52:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/01 07:52:48 INFO SparkContext: Successfully stopped SparkContext
20/12/01 07:52:48 INFO ShutdownHookManager: Shutdown hook called
20/12/01 07:52:48 INFO ShutdownHookManager: Deleting directory C:\Users\stu\AppData\Local\Temp\spark-0d309824-2a28-4547-b2b8-50df4dae1482