
Commit d5f8434

Add run --spark
1 parent: 12500d3

10 files changed: +291 -60 lines

modules/build/src/main/scala/scala/build/internal/Runner.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ object Runner {
       }
     }
 
-  private def envCommand(env: Map[String, String]): Seq[String] =
+  def envCommand(env: Map[String, String]): Seq[String] =
     env.toVector.sortBy(_._1).map {
       case (k, v) =>
         s"$k=$v"

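Note on the visibility change above: envCommand is made public so that the new spark-submit code path can print environment-variable assignments ahead of the command when --command is requested. A minimal sketch of its behaviour, with a made-up JAVA_HOME value:

    // Renders env-var updates as KEY=value tokens, sorted by key,
    // so they can be prefixed to a printed command line.
    val env = Map("PATH" -> "/opt/jdk8/bin", "JAVA_HOME" -> "/opt/jdk8")
    Runner.envCommand(env) // Seq("JAVA_HOME=/opt/jdk8", "PATH=/opt/jdk8/bin")
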
modules/cli-options/src/main/scala/scala/cli/commands/RunOptions.scala

Lines changed: 5 additions & 0 deletions
@@ -25,6 +25,11 @@ final case class RunOptions(
   @Recurse
     mainClass: MainClassOptions = MainClassOptions(),
   @Group("Run")
+  @Hidden
+  @HelpMessage("Run as a Spark job, using the spark-submit command")
+  @ExtraName("spark")
+    sparkSubmit: Option[Boolean] = None,
+  @Group("Run")
   @HelpMessage("Print the command that would have been run (one argument per line), rather than running it")
     command: Boolean = false,
   @Group("Run")

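The new sparkSubmit option is hidden from the default help output but usable on the command line as --spark (the extra name); the primary name derived from the field would presumably be --spark-submit. A hypothetical invocation, with SparkJob.scala standing in for the sources and spark-submit expected via SPARK_HOME or on the PATH:

    scala-cli run --spark SparkJob.scala -- input.csv
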
modules/cli/src/main/scala/scala/cli/commands/Package.scala

Lines changed: 1 addition & 1 deletion
@@ -719,7 +719,7 @@ object Package extends ScalaCommand[PackageOptions] {
     * provided modules, and might have been bumped by other modules. This is strictly a subset of
     * the whole dependency graph.
     */
-  private def providedFiles(
+  def providedFiles(
     build: Build.Successful,
     provided: Seq[dependency.AnyModule],
     logger: Logger

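providedFiles loses its private modifier so that the run command can reuse it: for Spark jobs, jars that the Spark distribution already provides have to stay off the --jars list handed to spark-submit. A rough sketch of that use, mirroring RunSpark.scala below (it runs inside an either block, hence value):

    // Resolve the files pulled in by the provided (Spark) modules, then drop them
    // from the dependency class path that will be forwarded via --jars.
    val provided = value(PackageCmd.providedFiles(build, Spark.sparkModules, logger)).toSet
    val depCp    = build.dependencyClassPath.filterNot(provided)
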
modules/cli/src/main/scala/scala/cli/commands/Run.scala

Lines changed: 84 additions & 29 deletions
@@ -10,18 +10,26 @@ import scala.build.internal.{Constants, Runner, ScalaJsLinkerConfig}
 import scala.build.options.{BuildOptions, JavaOpt, Platform}
 import scala.build.{Build, BuildThreads, Inputs, Logger, Positioned}
 import scala.cli.CurrentParams
+import scala.cli.commands.run.RunMode
 import scala.cli.commands.util.MainClassOptionsUtil._
 import scala.cli.commands.util.SharedOptionsUtil._
 import scala.cli.internal.ProcUtil
 import scala.util.Properties
 import scala.cli.config.{ConfigDb, Keys}
 import scala.cli.commands.util.CommonOps.SharedDirectoriesOptionsOps
+import scala.cli.commands.util.RunSpark
 
 object Run extends ScalaCommand[RunOptions] {
   override def group = "Main"
 
   override def sharedOptions(options: RunOptions): Option[SharedOptions] = Some(options.shared)
 
+  private def runMode(options: RunOptions): RunMode =
+    if (options.sparkSubmit.getOrElse(false))
+      RunMode.SparkSubmit
+    else
+      RunMode.Default
+
   private def scratchDirOpt(options: RunOptions): Option[os.Path] =
     options.scratchDir
       .filter(_.trim.nonEmpty)
@@ -48,7 +56,29 @@ object Run extends ScalaCommand[RunOptions] {
       javaOptions = baseOptions.javaOptions.copy(
         javaOpts =
           baseOptions.javaOptions.javaOpts ++
-            sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine)
+            sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine),
+        jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse {
+          runMode(options) match {
+            case RunMode.SparkSubmit => Some("8")
+            case RunMode.Default => None
+          }
+        }
+      ),
+      internalDependencies = baseOptions.internalDependencies.copy(
+        addRunnerDependencyOpt = baseOptions.internalDependencies.addRunnerDependencyOpt.orElse {
+          runMode(options) match {
+            case RunMode.SparkSubmit => Some(false)
+            case RunMode.Default => None
+          }
+        }
+      ),
+      internal = baseOptions.internal.copy(
+        keepResolution = baseOptions.internal.keepResolution || {
+          runMode(options) match {
+            case RunMode.SparkSubmit => true
+            case RunMode.Default => false
+          }
+        }
       ),
       notForBloopOptions = baseOptions.notForBloopOptions.copy(
         runWithManifest = options.useManifest
@@ -76,6 +106,7 @@ object Run extends ScalaCommand[RunOptions] {
   def maybeRun(
     build: Build.Successful,
     allowTerminate: Boolean,
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
   ): Either[BuildException, Option[(Process, CompletableFuture[_])]] = either {
@@ -95,15 +126,17 @@
         allowExecve = allowTerminate,
         jvmRunner = build.artifacts.hasJvmRunner,
         potentialMainClasses,
+        runMode,
         showCommand,
         scratchDirOpt
       )
     }
 
     processOrCommand match {
-      case Right(process) =>
+      case Right((process, onExitOpt)) =>
         val onExitProcess = process.onExit().thenApply { p1 =>
           val retCode = p1.exitValue()
+          onExitOpt.foreach(_())
           if (retCode != 0)
             if (allowTerminate)
               sys.exit(retCode)
@@ -170,6 +203,7 @@
       val maybeProcess = maybeRun(
         s,
         allowTerminate = false,
+        runMode = runMode(options),
         showCommand = options.command,
         scratchDirOpt = scratchDirOpt(options)
       )
@@ -206,6 +240,7 @@
         val res = maybeRun(
           s,
           allowTerminate = true,
+          runMode = runMode(options),
          showCommand = options.command,
          scratchDirOpt = scratchDirOpt(options)
        )
@@ -226,9 +261,10 @@
     allowExecve: Boolean,
     jvmRunner: Boolean,
     potentialMainClasses: Seq[String],
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
-  ): Either[BuildException, Either[Seq[String], Process]] = either {
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {
 
     val mainClassOpt = build.options.mainClass.filter(_.nonEmpty) // trim it too?
       .orElse {
@@ -250,6 +286,7 @@
       finalArgs,
       logger,
       allowExecve,
+      runMode,
       showCommand,
       scratchDirOpt
     )
@@ -262,9 +299,10 @@
     args: Seq[String],
     logger: Logger,
     allowExecve: Boolean,
+    runMode: RunMode,
     showCommand: Boolean,
     scratchDirOpt: Option[os.Path]
-  ): Either[BuildException, Either[Seq[String], Process]] = either {
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {
 
     build.options.platform.value match {
       case Platform.JS =>
@@ -308,7 +346,7 @@
             esModule = esModule
           )
           process.onExit().thenApply(_ => if (os.exists(jsDest)) os.remove(jsDest))
-          Right(process)
+          Right((process, None))
         }
       }
       value(res)
@@ -327,33 +365,50 @@
            logger,
            allowExecve = allowExecve
          )
-          Right(proc)
+          Right((proc, None))
        }
      }
      case Platform.JVM =>
-        if (showCommand) {
-          val command = Runner.jvmCommand(
-            build.options.javaHome().value.javaCommand,
-            build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
-            build.fullClassPath,
-            mainClass,
-            args,
-            useManifest = build.options.notForBloopOptions.runWithManifest
-          )
-          Left(command)
-        }
-        else {
-          val proc = Runner.runJvm(
-            build.options.javaHome().value.javaCommand,
-            build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
-            build.fullClassPath,
-            mainClass,
-            args,
-            logger,
-            allowExecve = allowExecve,
-            useManifest = build.options.notForBloopOptions.runWithManifest
-          )
-          Right(proc)
+        runMode match {
+          case RunMode.Default =>
+            if (showCommand) {
+              val command = Runner.jvmCommand(
+                build.options.javaHome().value.javaCommand,
+                build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
+                build.fullClassPath,
+                mainClass,
+                args,
+                useManifest = build.options.notForBloopOptions.runWithManifest,
+                scratchDirOpt = scratchDirOpt
+              )
+              Left(command)
+            }
+            else {
+              val proc = Runner.runJvm(
+                build.options.javaHome().value.javaCommand,
+                build.options.javaOptions.javaOpts.toSeq.map(_.value.value),
+                build.fullClassPath,
+                mainClass,
+                args,
+                logger,
+                allowExecve = allowExecve,
+                useManifest = build.options.notForBloopOptions.runWithManifest,
+                scratchDirOpt = scratchDirOpt
+              )
+              Right((proc, None))
+            }
+          case RunMode.SparkSubmit =>
+            value {
+              RunSpark.run(
+                build,
+                mainClass,
+                args,
+                logger,
+                allowExecve,
+                showCommand,
+                scratchDirOpt
+              )
+            }
        }
     }
   }

modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala (new file)

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+package scala.cli.commands.run
+
+sealed abstract class RunMode extends Product with Serializable
+
+object RunMode {
+  case object Default extends RunMode
+  case object SparkSubmit extends RunMode
+}

modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala (new file)

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+package scala.cli.commands.util
+
+import scala.build.EitherCps.{either, value}
+import scala.build.{Build, Logger}
+import scala.build.errors.BuildException
+import scala.build.internal.Runner
+import scala.cli.commands.{Package => PackageCmd}
+import scala.cli.commands.packaging.Spark
+import scala.cli.commands.run.RunMode
+import scala.cli.packaging.Library
+import scala.util.Properties
+
+object RunSpark {
+
+  def run(
+    build: Build.Successful,
+    mainClass: String,
+    args: Seq[String],
+    logger: Logger,
+    allowExecve: Boolean,
+    showCommand: Boolean,
+    scratchDirOpt: Option[os.Path]
+  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {
+
+    // FIXME Get Spark.sparkModules via provided settings?
+    val providedModules = Spark.sparkModules
+    val providedFiles =
+      value(PackageCmd.providedFiles(build, providedModules, logger)).toSet
+    val depCp = build.dependencyClassPath.filterNot(providedFiles)
+    val javaHomeInfo = build.options.javaHome().value
+    val javaOpts = build.options.javaOptions.javaOpts.toSeq.map(_.value.value)
+    val ext = if (Properties.isWin) ".cmd" else ""
+    val submitCommand: String =
+      Option(System.getenv("SPARK_HOME"))
+        .map(os.Path(_, os.pwd))
+        .map(_ / "bin" / s"spark-submit$ext")
+        .filter(os.exists(_))
+        .map(_.toString)
+        .getOrElse(s"spark-submit$ext")
+    val jarsArgs =
+      if (depCp.isEmpty) Nil
+      else Seq("--jars", depCp.mkString(","))
+
+    scratchDirOpt.foreach(os.makeDir.all(_))
+    val library = os.temp(
+      Library.libraryJar(build),
+      dir = scratchDirOpt.orNull,
+      deleteOnExit = scratchDirOpt.isEmpty,
+      prefix = "spark-job",
+      suffix = ".jar"
+    )
+
+    val finalCommand =
+      Seq(submitCommand, "--class", mainClass) ++
+        jarsArgs ++
+        javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++
+        Seq(library.toString) ++
+        args
+    val envUpdates = javaHomeInfo.envUpdates(sys.env)
+    if (showCommand)
+      Left(Runner.envCommand(envUpdates) ++ finalCommand)
+    else {
+      val proc =
+        if (allowExecve)
+          Runner.maybeExec("spark-submit", finalCommand, logger, extraEnv = envUpdates)
+        else
+          Runner.run(finalCommand, logger, extraEnv = envUpdates)
+      Right((
+        proc,
+        if (scratchDirOpt.isEmpty) Some(() => os.remove(library, checkExists = true))
+        else None
+      ))
+    }
+  }
+
+}

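For illustration, the command assembled by RunSpark.run comes out roughly as follows; every concrete value here (main class, jar paths, the driver option) is made up:

    // Hypothetical shape of finalCommand for a build with one non-provided dependency:
    val userArgs = Seq("input.csv")             // arguments passed through to the job
    val command = Seq(
      "spark-submit",                           // or $SPARK_HOME/bin/spark-submit when that file exists
      "--class", "foo.SparkJob",                // the build's main class
      "--jars", "/path/to/dep.jar",             // dependency jars Spark does not already provide
      "--driver-java-options", "-Dfoo=bar",     // one pair per Java option
      "/tmp/spark-job123.jar"                   // temporary library jar holding the build output
    ) ++ userArgs

The second element of the returned tuple is a cleanup callback: when no scratch directory was given, it deletes the temporary library jar, and Run.scala invokes it from the process's onExit handler.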