Commit f011d89

Add run --spark-standalone
1 parent d5f8434 commit f011d89

7 files changed: +134 -9 lines changed

7 files changed

+134
-9
lines changed

modules/cli-options/src/main/scala/scala/cli/commands/RunOptions.scala

Lines changed: 4 additions & 0 deletions

@@ -30,6 +30,10 @@ final case class RunOptions(
   @ExtraName("spark")
     sparkSubmit: Option[Boolean] = None,
   @Group("Run")
+  @HelpMessage("Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI")
+  @ExtraName("sparkStandalone")
+    standaloneSpark: Option[Boolean] = None,
+  @Group("Run")
   @HelpMessage("Print the command that would have been run (one argument per line), rather than running it")
     command: Boolean = false,
   @Group("Run")

modules/cli/src/main/scala/scala/cli/commands/Run.scala

Lines changed: 21 additions & 7 deletions

@@ -25,7 +25,9 @@ object Run extends ScalaCommand[RunOptions] {
   override def sharedOptions(options: RunOptions): Option[SharedOptions] = Some(options.shared)
 
   private def runMode(options: RunOptions): RunMode =
-    if (options.sparkSubmit.getOrElse(false))
+    if (options.standaloneSpark.getOrElse(false) && !options.sparkSubmit.contains(false))
+      RunMode.StandaloneSparkSubmit
+    else if (options.sparkSubmit.getOrElse(false))
       RunMode.SparkSubmit
     else
       RunMode.Default

@@ -59,24 +61,24 @@
             sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine),
         jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse {
           runMode(options) match {
-            case RunMode.SparkSubmit => Some("8")
-            case RunMode.Default     => None
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => Some("8")
+            case RunMode.Default                                     => None
           }
         }
       ),
       internalDependencies = baseOptions.internalDependencies.copy(
         addRunnerDependencyOpt = baseOptions.internalDependencies.addRunnerDependencyOpt.orElse {
           runMode(options) match {
-            case RunMode.SparkSubmit => Some(false)
-            case RunMode.Default     => None
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => Some(false)
+            case RunMode.Default                                     => None
           }
         }
       ),
       internal = baseOptions.internal.copy(
         keepResolution = baseOptions.internal.keepResolution || {
           runMode(options) match {
-            case RunMode.SparkSubmit => true
-            case RunMode.Default     => false
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => true
+            case RunMode.Default                                     => false
           }
         }
       ),

@@ -409,6 +411,18 @@
             scratchDirOpt
           )
         }
+      case RunMode.StandaloneSparkSubmit =>
+        value {
+          RunSpark.runStandalone(
+            build,
+            mainClass,
+            args,
+            logger,
+            allowExecve,
+            showCommand,
+            scratchDirOpt
+          )
+        }
     }
   }
 }
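
The interplay of the two flags above is worth spelling out: `--spark-standalone` selects the new standalone mode unless `--spark` was explicitly set to `false`, `--spark` on its own keeps the existing `spark-submit` mode, and with neither flag the run stays a plain JVM run. Below is a minimal, self-contained sketch of that decision; it mirrors the `runMode` logic from the diff above with simplified names, and is illustrative only, not part of the commit:

```scala
sealed trait RunMode
object RunMode {
  case object Default               extends RunMode
  case object SparkSubmit           extends RunMode
  case object StandaloneSparkSubmit extends RunMode
}

object RunModeSketch {
  // Mirrors the runMode decision in Run.scala above (names simplified).
  def runMode(standaloneSpark: Option[Boolean], sparkSubmit: Option[Boolean]): RunMode =
    if (standaloneSpark.getOrElse(false) && !sparkSubmit.contains(false))
      RunMode.StandaloneSparkSubmit
    else if (sparkSubmit.getOrElse(false))
      RunMode.SparkSubmit
    else
      RunMode.Default

  def main(args: Array[String]): Unit = {
    assert(runMode(Some(true), None) == RunMode.StandaloneSparkSubmit)  // --spark-standalone
    assert(runMode(Some(true), Some(false)) == RunMode.Default)         // --spark-standalone --spark=false
    assert(runMode(None, Some(true)) == RunMode.SparkSubmit)            // --spark
    assert(runMode(None, None) == RunMode.Default)                      // no flags
  }
}
```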

modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala

Lines changed: 3 additions & 2 deletions

@@ -3,6 +3,7 @@ package scala.cli.commands.run
 sealed abstract class RunMode extends Product with Serializable
 
 object RunMode {
-  case object Default     extends RunMode
-  case object SparkSubmit extends RunMode
+  case object Default               extends RunMode
+  case object SparkSubmit           extends RunMode
+  case object StandaloneSparkSubmit extends RunMode
 }

modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala

Lines changed: 70 additions & 0 deletions

@@ -73,4 +73,74 @@ object RunSpark {
     }
   }
 
+  def runStandalone(
+    build: Build.Successful,
+    mainClass: String,
+    args: Seq[String],
+    logger: Logger,
+    allowExecve: Boolean,
+    showCommand: Boolean,
+    scratchDirOpt: Option[os.Path]
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {
+
+    // FIXME Get Spark.sparkModules via provided settings?
+    val providedModules = Spark.sparkModules
+    val sparkClassPath = value(PackageCmd.providedFiles(build, providedModules, logger))
+
+    scratchDirOpt.foreach(os.makeDir.all(_))
+    val library = os.temp(
+      Library.libraryJar(build),
+      dir = scratchDirOpt.orNull,
+      deleteOnExit = scratchDirOpt.isEmpty,
+      prefix = "spark-job",
+      suffix = ".jar"
+    )
+
+    val finalMainClass = "org.apache.spark.deploy.SparkSubmit"
+    val depCp = build.dependencyClassPath.filterNot(sparkClassPath.toSet)
+    val javaHomeInfo = build.options.javaHome().value
+    val javaOpts = build.options.javaOptions.javaOpts.toSeq.map(_.value.value)
+    val jarsArgs =
+      if (depCp.isEmpty) Nil
+      else Seq("--jars", depCp.mkString(","))
+    val finalArgs =
+      Seq("--class", mainClass) ++
+        jarsArgs ++
+        javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++
+        Seq(library.toString) ++
+        args
+    val envUpdates = javaHomeInfo.envUpdates(sys.env)
+    if (showCommand) {
+      val command = Runner.jvmCommand(
+        javaHomeInfo.javaCommand,
+        javaOpts,
+        sparkClassPath,
+        finalMainClass,
+        finalArgs,
+        extraEnv = envUpdates,
+        useManifest = build.options.notForBloopOptions.runWithManifest,
+        scratchDirOpt = scratchDirOpt
+      )
+      Left(command)
+    }
+    else {
+      val proc = Runner.runJvm(
+        javaHomeInfo.javaCommand,
+        javaOpts,
+        sparkClassPath,
+        finalMainClass,
+        finalArgs,
+        logger,
+        allowExecve = allowExecve,
+        extraEnv = envUpdates,
+        useManifest = build.options.notForBloopOptions.runWithManifest,
+        scratchDirOpt = scratchDirOpt
+      )
+      Right((
+        proc,
+        if (scratchDirOpt.isEmpty) Some(() => os.remove(library, checkExists = true))
+        else None
+      ))
+    }
+  }
 }
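
In short, the standalone path packages the build output into a temporary JAR, puts the downloaded Spark modules on a dedicated class path, and then invokes `org.apache.spark.deploy.SparkSubmit` in-process instead of shelling out to a `spark-submit` script. The sketch below restates how the argument list handed to `SparkSubmit` comes together; the object name, paths, main class, and job arguments are made-up placeholders, not values produced by this commit:

```scala
// Illustrative only: restates the shape of `finalArgs` in RunSpark.runStandalone,
// with hypothetical values standing in for the real build outputs.
object SparkSubmitArgsSketch {
  def main(args: Array[String]): Unit = {
    val mainClass  = "sparkjob.SparkJob"              // hypothetical user main class
    val libraryJar = "/tmp/spark-job123456.jar"       // temp JAR built from the user's sources
    val depCp      = Seq("/home/user/libs/extra.jar") // dependencies not provided by Spark
    val javaOpts   = Seq("-Xmx2g")                    // user Java options, if any
    val userArgs   = Seq("--input", "data.csv")       // arguments meant for the job itself

    val jarsArgs =
      if (depCp.isEmpty) Nil
      else Seq("--jars", depCp.mkString(","))

    val finalArgs =
      Seq("--class", mainClass) ++
        jarsArgs ++
        javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++
        Seq(libraryJar) ++
        userArgs

    // This list is then passed to org.apache.spark.deploy.SparkSubmit, which is run
    // on the downloaded Spark class path (via Runner.runJvm in the diff above).
    println(finalArgs.mkString(" "))
  }
}
```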

modules/integration/src/test/scala/scala/cli/integration/SparkTests212.scala

Lines changed: 20 additions & 0 deletions

@@ -154,6 +154,18 @@ class SparkTests212 extends SparkTestDefinitions {
       expect(output.contains(expectedOutput))
     }
 
+  def simpleRunStandaloneSparkJobTest(spark: Spark): Unit =
+    simpleJobInputs(spark).fromRoot { root =>
+      val res = os.proc(TestUtil.cli, "run", extraOptions, "--spark-standalone", "--jvm", "8", ".")
+        .call(cwd = root)
+
+      val expectedOutput = "Result: 55"
+
+      val output = res.out.trim().linesIterator.toVector
+
+      expect(output.contains(expectedOutput))
+    }
+
   test("package spark 2.4") {
     simplePackageSparkJobTest(spark24)
   }

@@ -174,4 +186,12 @@
     simpleRunSparkJobTest(spark30, usePath = true)
   }
 
+  test("run spark 2.4 standalone") {
+    simpleRunStandaloneSparkJobTest(spark24)
+  }
+
+  test("run spark 3.0 standalone") {
+    simpleRunStandaloneSparkJobTest(spark30)
+  }
+
 }
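
The tests reuse `simpleJobInputs`, which is defined elsewhere in `SparkTestDefinitions` and is not part of this diff; all that is visible here is that the job is expected to print `Result: 55`, i.e. the sum of 1 to 10. Purely as a hypothetical reconstruction (not the actual test input), a Spark job producing that output could look roughly like this:

```scala
// Hypothetical sketch of a job printing "Result: 55"; the real test sources live in
// simpleJobInputs and are not shown in this commit.
import org.apache.spark.sql.SparkSession

object SparkJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkJob")
      .getOrCreate()
    // Sum 1..10 on the Spark context: 55.
    val result = spark.sparkContext.parallelize(1 to 10).reduce(_ + _)
    println(s"Result: $result")
    spark.stop()
  }
}
```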

website/docs/reference/cli-options.md

Lines changed: 6 additions & 0 deletions

@@ -1476,6 +1476,12 @@ Aliases: `--spark`
 
 Run as a Spark job, using the spark-submit command
 
+#### `--standalone-spark`
+
+Aliases: `--spark-standalone`
+
+Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI
+
 #### `--command`
 
 Print the command that would have been run (one argument per line), rather than running it

website/src/pages/spark.md

Lines changed: 10 additions & 0 deletions

@@ -63,3 +63,13 @@ scala-cli run --spark SparkJob.scala # same example as above
 Note that this requires either
 - `spark-submit` to be in available in `PATH`
 - `SPARK_HOME` to be set in the environment
+
+## Running Spark jobs in a standalone way
+
+The `run` sub-command can not only run Spark jobs, but it can also work without a Spark
+distribution. For that to work, it downloads Spark JARs, and calls the main class of
+`spark-submit` itself via these JARs:
+
+```bash
+scala-cli run --spark-standalone SparkJob.scala # same example as above
+```
