
Commit de8ed99

Merge pull request #1129 from alexarchambault/run-spark-hadoop
Add --spark / --standalone-spark / --hadoop to the run command
2 parents 2cf64ad + 22f41ed commit de8ed99
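
The new flags map onto the RunOptions fields added below; going by the commit title, a typical invocation (source file names are illustrative) would look like:

    scala-cli run --spark SparkJob.scala
    scala-cli run --standalone-spark SparkJob.scala
    scala-cli run --hadoop HadoopJob.scala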

14 files changed, +726 −63 lines changed

modules/build/src/main/scala/scala/build/internal/Runner.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ object Runner {
     }
   }

-  private def envCommand(env: Map[String, String]): Seq[String] =
+  def envCommand(env: Map[String, String]): Seq[String] =
     env.toVector.sortBy(_._1).map {
       case (k, v) =>
         s"$k=$v"

modules/cli-options/src/main/scala/scala/cli/commands/RunOptions.scala

Lines changed: 13 additions & 0 deletions
@@ -25,6 +25,19 @@ final case class RunOptions(
   @Recurse
     mainClass: MainClassOptions = MainClassOptions(),
   @Group("Run")
+  @Hidden
+  @HelpMessage("Run as a Spark job, using the spark-submit command")
+  @ExtraName("spark")
+    sparkSubmit: Option[Boolean] = None,
+  @Group("Run")
+  @HelpMessage("Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI")
+  @ExtraName("sparkStandalone")
+    standaloneSpark: Option[Boolean] = None,
+  @Group("Run")
+  @HelpMessage("Run as a Hadoop job, using the \"hadoop jar\" command")
+  @ExtraName("hadoop")
+    hadoopJar: Boolean = false,
+  @Group("Run")
   @HelpMessage("Print the command that would have been run (one argument per line), rather than running it")
     command: Boolean = false,
   @Group("Run")

modules/cli/src/main/scala/scala/cli/commands/Package.scala

Lines changed: 2 additions & 2 deletions
@@ -719,7 +719,7 @@ object Package extends ScalaCommand[PackageOptions] {
    * provided modules, and might have been bumped by other modules. This is strictly a subset of
    * the whole dependency graph.
    */
-  private def providedFiles(
+  def providedFiles(
     build: Build.Successful,
     provided: Seq[dependency.AnyModule],
     logger: Logger
@@ -760,7 +760,7 @@
       providedFiles
     }

-  private def assembly(
+  def assembly(
     build: Build.Successful,
     destPath: os.Path,
     mainClassOpt: Option[String],
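
Like envCommand above, these two helpers are opened up so the new run modes can reuse them: RunHadoop below calls PackageCmd.assembly directly, and providedFiles is presumably used by the RunSpark counterpart not shown in this excerpt.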

modules/cli/src/main/scala/scala/cli/commands/Run.scala

Lines changed: 114 additions & 29 deletions
@@ -10,18 +10,30 @@ import scala.build.internal.{Constants, Runner, ScalaJsLinkerConfig}
 import scala.build.options.{BuildOptions, JavaOpt, Platform}
 import scala.build.{Build, BuildThreads, Inputs, Logger, Positioned}
 import scala.cli.CurrentParams
+import scala.cli.commands.run.RunMode
 import scala.cli.commands.util.MainClassOptionsUtil._
 import scala.cli.commands.util.SharedOptionsUtil._
 import scala.cli.internal.ProcUtil
 import scala.util.Properties
 import scala.cli.config.{ConfigDb, Keys}
 import scala.cli.commands.util.CommonOps.SharedDirectoriesOptionsOps
+import scala.cli.commands.util.{RunHadoop, RunSpark}

 object Run extends ScalaCommand[RunOptions] {
   override def group = "Main"

   override def sharedOptions(options: RunOptions): Option[SharedOptions] = Some(options.shared)

+  private def runMode(options: RunOptions): RunMode =
+    if (options.standaloneSpark.getOrElse(false) && !options.sparkSubmit.contains(false))
+      RunMode.StandaloneSparkSubmit
+    else if (options.sparkSubmit.getOrElse(false))
+      RunMode.SparkSubmit
+    else if (options.hadoopJar)
+      RunMode.HadoopJar
+    else
+      RunMode.Default
+
   private def scratchDirOpt(options: RunOptions): Option[os.Path] =
     options.scratchDir
       .filter(_.trim.nonEmpty)
@@ -48,7 +60,31 @@
       javaOptions = baseOptions.javaOptions.copy(
         javaOpts =
           baseOptions.javaOptions.javaOpts ++
-            sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine)
+            sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine),
+        jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse {
+          runMode(options) match {
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
+              Some("8")
+            case RunMode.Default => None
+          }
+        }
+      ),
+      internalDependencies = baseOptions.internalDependencies.copy(
+        addRunnerDependencyOpt = baseOptions.internalDependencies.addRunnerDependencyOpt.orElse {
+          runMode(options) match {
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
+              Some(false)
+            case RunMode.Default => None
+          }
+        }
+      ),
+      internal = baseOptions.internal.copy(
+        keepResolution = baseOptions.internal.keepResolution || {
+          runMode(options) match {
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => true
+            case RunMode.Default => false
+          }
+        }
       ),
       notForBloopOptions = baseOptions.notForBloopOptions.copy(
         runWithManifest = options.useManifest
@@ -76,6 +112,7 @@
   def maybeRun(
     build: Build.Successful,
     allowTerminate: Boolean,
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
   ): Either[BuildException, Option[(Process, CompletableFuture[_])]] = either {
@@ -95,15 +132,17 @@
         allowExecve = allowTerminate,
         jvmRunner = build.artifacts.hasJvmRunner,
         potentialMainClasses,
+        runMode,
         showCommand,
         scratchDirOpt
       )
     }

     processOrCommand match {
-      case Right(process) =>
+      case Right((process, onExitOpt)) =>
         val onExitProcess = process.onExit().thenApply { p1 =>
           val retCode = p1.exitValue()
+          onExitOpt.foreach(_())
           if (retCode != 0)
             if (allowTerminate)
               sys.exit(retCode)
@@ -170,6 +209,7 @@
           val maybeProcess = maybeRun(
             s,
             allowTerminate = false,
+            runMode = runMode(options),
             showCommand = options.command,
             scratchDirOpt = scratchDirOpt(options)
           )
@@ -206,6 +246,7 @@
       val res = maybeRun(
         s,
         allowTerminate = true,
+        runMode = runMode(options),
         showCommand = options.command,
         scratchDirOpt = scratchDirOpt(options)
       )
@@ -226,9 +267,10 @@
     allowExecve: Boolean,
     jvmRunner: Boolean,
     potentialMainClasses: Seq[String],
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
-  ): Either[BuildException, Either[Seq[String], Process]] = either {
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {

     val mainClassOpt = build.options.mainClass.filter(_.nonEmpty) // trim it too?
       .orElse {
@@ -250,6 +292,7 @@
       finalArgs,
       logger,
       allowExecve,
+      runMode,
       showCommand,
       scratchDirOpt
     )
@@ -262,9 +305,10 @@
     args: Seq[String],
     logger: Logger,
     allowExecve: Boolean,
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
-  ): Either[BuildException, Either[Seq[String], Process]] = either {
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {

     build.options.platform.value match {
       case Platform.JS =>
@@ -308,7 +352,7 @@
             esModule = esModule
           )
           process.onExit().thenApply(_ => if (os.exists(jsDest)) os.remove(jsDest))
-          Right(process)
+          Right((process, None))
         }
       }
       value(res)
@@ -327,33 +371,74 @@
             logger,
             allowExecve = allowExecve
           )
-          Right(proc)
+          Right((proc, None))
         }
       }
       case Platform.JVM =>
-        if (showCommand) {
-          val command = Runner.jvmCommand(
-            build.options.javaHome().value.javaCommand,
-            build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
-            build.fullClassPath,
-            mainClass,
-            args,
-            useManifest = build.options.notForBloopOptions.runWithManifest
-          )
-          Left(command)
-        }
-        else {
-          val proc = Runner.runJvm(
-            build.options.javaHome().value.javaCommand,
-            build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
-            build.fullClassPath,
-            mainClass,
-            args,
-            logger,
-            allowExecve = allowExecve,
-            useManifest = build.options.notForBloopOptions.runWithManifest
-          )
-          Right(proc)
+        runMode match {
+          case RunMode.Default =>
+            if (showCommand) {
+              val command = Runner.jvmCommand(
+                build.options.javaHome().value.javaCommand,
+                build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
+                build.fullClassPath,
+                mainClass,
+                args,
+                useManifest = build.options.notForBloopOptions.runWithManifest,
+                scratchDirOpt = scratchDirOpt
+              )
+              Left(command)
+            }
+            else {
+              val proc = Runner.runJvm(
+                build.options.javaHome().value.javaCommand,
+                build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
+                build.fullClassPath,
+                mainClass,
+                args,
+                logger,
+                allowExecve = allowExecve,
+                useManifest = build.options.notForBloopOptions.runWithManifest,
+                scratchDirOpt = scratchDirOpt
+              )
+              Right((proc, None))
+            }
+          case RunMode.SparkSubmit =>
+            value {
+              RunSpark.run(
+                build,
+                mainClass,
+                args,
+                logger,
+                allowExecve,
+                showCommand,
+                scratchDirOpt
+              )
+            }
+          case RunMode.StandaloneSparkSubmit =>
+            value {
+              RunSpark.runStandalone(
+                build,
+                mainClass,
+                args,
+                logger,
+                allowExecve,
+                showCommand,
+                scratchDirOpt
+              )
+            }
+          case RunMode.HadoopJar =>
+            value {
+              RunHadoop.run(
+                build,
+                mainClass,
+                args,
+                logger,
+                allowExecve,
+                showCommand,
+                scratchDirOpt
+              )
            }
        }
    }
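
One subtlety in the runMode helper above: --standalone-spark takes precedence unless spark-submit is explicitly disabled, and all three new modes pin the default JVM to 8 and drop the Scala CLI runner dependency. Tracing the if/else chain (a sketch, values hypothetical):

    // standaloneSpark = Some(true), sparkSubmit = None        => RunMode.StandaloneSparkSubmit
    // standaloneSpark = Some(true), sparkSubmit = Some(true)  => RunMode.StandaloneSparkSubmit
    // standaloneSpark = None,       sparkSubmit = Some(true)  => RunMode.SparkSubmit
    // standaloneSpark = Some(true), sparkSubmit = Some(false) => RunMode.Default (standalone explicitly vetoed)
    // hadoopJar = true, no Spark flags                        => RunMode.HadoopJar
    // no flags set                                            => RunMode.Default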

modules/cli/src/main/scala/scala/cli/commands/packaging/Spark.scala

Lines changed: 6 additions & 0 deletions
@@ -23,4 +23,10 @@ object Spark {

   def sparkModules: Seq[AnyModule] =
     names.map(name => mod"org.apache.spark::spark-$name")
+
+  def hadoopModules: Seq[AnyModule] =
+    Seq(
+      // TODO Add more for Hadoop 2, maybe for 3 too
+      mod"org.apache.hadoop:hadoop-client-api"
+    )
 }
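
Note the colon count in the new module: in this dependency syntax, :: marks a Scala module whose artifact name gets the Scala binary version appended at resolution time, while a single : denotes a plain Java module. A sketch of the distinction:

    mod"org.apache.spark::spark-core"        // resolves to spark-core_2.13 (or similar)
    mod"org.apache.hadoop:hadoop-client-api" // plain Java module, no suffix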

modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+package scala.cli.commands.run
+
+sealed abstract class RunMode extends Product with Serializable
+
+object RunMode {
+  case object Default extends RunMode
+  case object SparkSubmit extends RunMode
+  case object StandaloneSparkSubmit extends RunMode
+  case object HadoopJar extends RunMode
+}
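
Because RunMode is sealed, the compiler can check matches over it for exhaustiveness, which the dispatch in Run.scala above relies on. A minimal illustrative sketch (label is a hypothetical helper, not part of this PR):

    import scala.cli.commands.run.RunMode

    // The compiler warns if a RunMode case were missing here.
    def label(mode: RunMode): String = mode match {
      case RunMode.Default               => "default JVM run"
      case RunMode.SparkSubmit           => "spark-submit"
      case RunMode.StandaloneSparkSubmit => "standalone spark-submit"
      case RunMode.HadoopJar             => "hadoop jar"
    }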

modules/cli/src/main/scala/scala/cli/commands/util/RunHadoop.scala

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+package scala.cli.commands.util
+
+import scala.build.EitherCps.{either, value}
+import scala.build.{Build, Logger}
+import scala.build.errors.BuildException
+import scala.build.internal.Runner
+import scala.cli.commands.{Package => PackageCmd}
+import scala.cli.commands.packaging.Spark
+
+object RunHadoop {
+
+  def run(
+    build: Build.Successful,
+    mainClass: String,
+    args: Seq[String],
+    logger: Logger,
+    allowExecve: Boolean,
+    showCommand: Boolean,
+    scratchDirOpt: Option[os.Path]
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {
+
+    // FIXME Get Spark.hadoopModules via provided settings?
+    val providedModules = Spark.hadoopModules
+    scratchDirOpt.foreach(os.makeDir.all(_))
+    val assembly = os.temp(
+      dir = scratchDirOpt.orNull,
+      prefix = "hadoop-job",
+      suffix = ".jar",
+      deleteOnExit = scratchDirOpt.isEmpty
+    )
+    value {
+      PackageCmd.assembly(
+        build,
+        assembly,
+        // "hadoop jar" doesn't accept a main class as second argument if the jar passed as first argument has a main class in its manifest…
+        None,
+        providedModules,
+        withPreamble = false,
+        () => (),
+        logger
+      )
+    }
+
+    val javaOpts = build.options.javaOptions.javaOpts.toSeq.map(_.value.value)
+    val extraEnv =
+      if (javaOpts.isEmpty) Map[String, String]()
+      else
+        Map(
+          "HADOOP_CLIENT_OPTS" -> javaOpts.mkString(" ") // no escaping…
+        )
+    val hadoopJarCommand = Seq("hadoop", "jar")
+    val finalCommand =
+      hadoopJarCommand ++ Seq(assembly.toString, mainClass) ++ args
+    if (showCommand)
+      Left(Runner.envCommand(extraEnv) ++ finalCommand)
+    else {
+      val proc =
+        if (allowExecve)
+          Runner.maybeExec("hadoop", finalCommand, logger, extraEnv = extraEnv)
+        else
+          Runner.run(finalCommand, logger, extraEnv = extraEnv)
+      Right((
+        proc,
+        if (scratchDirOpt.isEmpty) Some(() => os.remove(assembly, checkExists = true))
+        else None
+      ))
+    }
+
+  }
+}
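
Putting RunHadoop.run together: for a build whose Java options contain -Xmx2g, a --hadoop --command run should print roughly the following, one argument per line (the jar path and main class are illustrative):

    HADOOP_CLIENT_OPTS=-Xmx2g
    hadoop
    jar
    /tmp/hadoop-job123456789.jar
    mypackage.MyJob
    arg1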
