Commit 22f41ed

Add run --hadoop
1 parent f011d89 commit 22f41ed

9 files changed: +293 -8 lines changed


modules/cli-options/src/main/scala/scala/cli/commands/RunOptions.scala

Lines changed: 4 additions & 0 deletions
@@ -34,6 +34,10 @@ final case class RunOptions(
   @ExtraName("sparkStandalone")
     standaloneSpark: Option[Boolean] = None,
   @Group("Run")
+  @HelpMessage("Run as a Hadoop job, using the \"hadoop jar\" command")
+  @ExtraName("hadoop")
+    hadoopJar: Boolean = false,
+  @Group("Run")
   @HelpMessage("Print the command that would have been run (one argument per line), rather than running it")
     command: Boolean = false,
   @Group("Run")

modules/cli/src/main/scala/scala/cli/commands/Package.scala

Lines changed: 1 addition & 1 deletion
@@ -760,7 +760,7 @@ object Package extends ScalaCommand[PackageOptions] {
     providedFiles
   }

-  private def assembly(
+  def assembly(
     build: Build.Successful,
     destPath: os.Path,
     mainClassOpt: Option[String],

modules/cli/src/main/scala/scala/cli/commands/Run.scala

Lines changed: 23 additions & 7 deletions
@@ -17,7 +17,7 @@ import scala.cli.internal.ProcUtil
 import scala.util.Properties
 import scala.cli.config.{ConfigDb, Keys}
 import scala.cli.commands.util.CommonOps.SharedDirectoriesOptionsOps
-import scala.cli.commands.util.RunSpark
+import scala.cli.commands.util.{RunHadoop, RunSpark}

 object Run extends ScalaCommand[RunOptions] {
   override def group = "Main"
@@ -29,6 +29,8 @@ object Run extends ScalaCommand[RunOptions] {
       RunMode.StandaloneSparkSubmit
     else if (options.sparkSubmit.getOrElse(false))
       RunMode.SparkSubmit
+    else if (options.hadoopJar)
+      RunMode.HadoopJar
     else
       RunMode.Default

@@ -61,24 +63,26 @@
         sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine),
         jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse {
           runMode(options) match {
-            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => Some("8")
-            case RunMode.Default => None
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
+              Some("8")
+            case RunMode.Default => None
           }
         }
       ),
       internalDependencies = baseOptions.internalDependencies.copy(
         addRunnerDependencyOpt = baseOptions.internalDependencies.addRunnerDependencyOpt.orElse {
           runMode(options) match {
-            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => Some(false)
-            case RunMode.Default => None
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
+              Some(false)
+            case RunMode.Default => None
           }
         }
       ),
       internal = baseOptions.internal.copy(
         keepResolution = baseOptions.internal.keepResolution || {
           runMode(options) match {
-            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit => true
-            case RunMode.Default => false
+            case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => true
+            case RunMode.Default => false
           }
         }
       ),
@@ -423,6 +427,18 @@
             scratchDirOpt
           )
         }
+      case RunMode.HadoopJar =>
+        value {
+          RunHadoop.run(
+            build,
+            mainClass,
+            args,
+            logger,
+            allowExecve,
+            showCommand,
+            scratchDirOpt
+          )
+        }
     }
   }
 }
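
Note on the value threaded through the last hunk: `RunHadoop.run` (shown in full further down) returns an `Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]]` — the inner `Left` carries the command to print under `--command`, the inner `Right` the spawned process plus an optional cleanup hook. A minimal sketch of how a caller could unpack such a value; the helper name and the error/exit handling below are illustrative, not what `Run.scala` actually does:

```scala
import scala.build.{Build, Logger}
import scala.cli.commands.util.RunHadoop

object HadoopRunSketch {
  // Sketch only: helper name, error handling and exit handling are illustrative.
  def handleHadoopRun(
    build: Build.Successful,
    mainClass: String,
    args: Seq[String],
    logger: Logger,
    allowExecve: Boolean,
    showCommand: Boolean,
    scratchDirOpt: Option[os.Path]
  ): Unit =
    RunHadoop.run(build, mainClass, args, logger, allowExecve, showCommand, scratchDirOpt) match {
      case Left(buildException) =>
        throw buildException                     // hypothetical: surface the build error
      case Right(Left(commandToPrint)) =>
        commandToPrint.foreach(println)          // --command: one argument per line
      case Right(Right((process, cleanupOpt))) =>
        val retCode = process.waitFor()          // wait for the `hadoop jar` process
        cleanupOpt.foreach(cleanup => cleanup()) // e.g. delete the temporary assembly
        sys.exit(retCode)
    }
}
```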

modules/cli/src/main/scala/scala/cli/commands/packaging/Spark.scala

Lines changed: 6 additions & 0 deletions
@@ -23,4 +23,10 @@ object Spark {

   def sparkModules: Seq[AnyModule] =
     names.map(name => mod"org.apache.spark::spark-$name")
+
+  def hadoopModules: Seq[AnyModule] =
+    Seq(
+      // TODO Add more for Hadoop 2, maybe for 3 too
+      mod"org.apache.hadoop:hadoop-client-api"
+    )
 }

modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala

Lines changed: 1 addition & 0 deletions
@@ -6,4 +6,5 @@ object RunMode {
   case object Default extends RunMode
   case object SparkSubmit extends RunMode
   case object StandaloneSparkSubmit extends RunMode
+  case object HadoopJar extends RunMode
 }
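
For context, the whole run-mode ADT after this change looks roughly as follows; only the case objects appear in the hunk above, so the `sealed` parent declaration here is an assumption:

```scala
package scala.cli.commands.run

// Assumed shape of the run-mode ADT; only the case objects are visible in the diff above.
sealed abstract class RunMode extends Product with Serializable

object RunMode {
  case object Default               extends RunMode
  case object SparkSubmit           extends RunMode
  case object StandaloneSparkSubmit extends RunMode
  case object HadoopJar             extends RunMode // new in this commit
}
```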
modules/cli/src/main/scala/scala/cli/commands/util/RunHadoop.scala (new file)

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
package scala.cli.commands.util

import scala.build.EitherCps.{either, value}
import scala.build.{Build, Logger}
import scala.build.errors.BuildException
import scala.build.internal.Runner
import scala.cli.commands.{Package => PackageCmd}
import scala.cli.commands.packaging.Spark

object RunHadoop {

  def run(
    build: Build.Successful,
    mainClass: String,
    args: Seq[String],
    logger: Logger,
    allowExecve: Boolean,
    showCommand: Boolean,
    scratchDirOpt: Option[os.Path]
  ): Either[BuildException, Either[Seq[String], (Process, Option[() => Unit])]] = either {

    // FIXME Get Spark.hadoopModules via provided settings?
    val providedModules = Spark.hadoopModules
    scratchDirOpt.foreach(os.makeDir.all(_))
    val assembly = os.temp(
      dir = scratchDirOpt.orNull,
      prefix = "hadoop-job",
      suffix = ".jar",
      deleteOnExit = scratchDirOpt.isEmpty
    )
    value {
      PackageCmd.assembly(
        build,
        assembly,
        // "hadoop jar" doesn't accept a main class as its second argument if the jar passed
        // as its first argument already has a main class in its manifest…
        None,
        providedModules,
        withPreamble = false,
        () => (),
        logger
      )
    }

    val javaOpts = build.options.javaOptions.javaOpts.toSeq.map(_.value.value)
    val extraEnv =
      if (javaOpts.isEmpty) Map[String, String]()
      else
        Map(
          "HADOOP_CLIENT_OPTS" -> javaOpts.mkString(" ") // no escaping…
        )
    val hadoopJarCommand = Seq("hadoop", "jar")
    val finalCommand =
      hadoopJarCommand ++ Seq(assembly.toString, mainClass) ++ args
    if (showCommand)
      Left(Runner.envCommand(extraEnv) ++ finalCommand)
    else {
      val proc =
        if (allowExecve)
          Runner.maybeExec("hadoop", finalCommand, logger, extraEnv = extraEnv)
        else
          Runner.run(finalCommand, logger, extraEnv = extraEnv)
      Right((
        proc,
        if (scratchDirOpt.isEmpty) Some(() => os.remove(assembly, checkExists = true))
        else None
      ))
    }

  }

}
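
To make the wiring above concrete, here is a tiny, standalone sketch of the environment and command that `RunHadoop` ends up assembling. The jar path, main class, Java options, and job arguments are made-up example values, not taken from the commit:

```scala
// Standalone illustration of the command shape; all literal values are hypothetical.
object HadoopCommandSketch {
  def main(args: Array[String]): Unit = {
    val assemblyPath = "/tmp/hadoop-job1234567890.jar" // temporary jar built by Package.assembly
    val mainClass    = "foo.WordCount"
    val jobArgs      = Seq("input", "output")
    val javaOpts     = Seq("-Xmx2g") // e.g. from Java options passed to Scala CLI

    // Java options travel through the environment rather than the command line
    val extraEnv =
      if (javaOpts.isEmpty) Map.empty[String, String]
      else Map("HADOOP_CLIENT_OPTS" -> javaOpts.mkString(" "))

    // `hadoop jar <assembly> <main class> <args…>`; the main class is passed explicitly
    // because the assembly's manifest deliberately declares none
    val finalCommand = Seq("hadoop", "jar", assemblyPath, mainClass) ++ jobArgs

    extraEnv.foreach { case (k, v) => println(s"$k=$v") }
    finalCommand.foreach(println) // one argument per line, like the --command output
  }
}
```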
modules/integration/src/test/scala/scala/cli/integration/HadoopTests.scala (new file)

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@
package scala.cli.integration

import com.eed3si9n.expecty.Expecty.expect

class HadoopTests extends munit.FunSuite {

  protected lazy val extraOptions: Seq[String] = TestUtil.extraOptions

  test("simple map-reduce") {
    val inputs = TestInputs(
      os.rel / "WordCount.java" ->
        """//> using lib "org.apache.hadoop:hadoop-client-api:3.3.3"
          |
          |// from https://hadoop.apache.org/docs/r3.3.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
          |
          |package foo;
          |
          |import java.io.IOException;
          |import java.util.StringTokenizer;
          |
          |import org.apache.hadoop.conf.Configuration;
          |import org.apache.hadoop.fs.Path;
          |import org.apache.hadoop.io.IntWritable;
          |import org.apache.hadoop.io.Text;
          |import org.apache.hadoop.mapreduce.Job;
          |import org.apache.hadoop.mapreduce.Mapper;
          |import org.apache.hadoop.mapreduce.Reducer;
          |import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          |import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
          |
          |public class WordCount {
          |
          |  public static class TokenizerMapper
          |       extends Mapper<Object, Text, Text, IntWritable>{
          |
          |    private final static IntWritable one = new IntWritable(1);
          |    private Text word = new Text();
          |
          |    public void map(Object key, Text value, Context context
          |                    ) throws IOException, InterruptedException {
          |      StringTokenizer itr = new StringTokenizer(value.toString());
          |      while (itr.hasMoreTokens()) {
          |        word.set(itr.nextToken());
          |        context.write(word, one);
          |      }
          |    }
          |  }
          |
          |  public static class IntSumReducer
          |       extends Reducer<Text,IntWritable,Text,IntWritable> {
          |    private IntWritable result = new IntWritable();
          |
          |    public void reduce(Text key, Iterable<IntWritable> values,
          |                       Context context
          |                       ) throws IOException, InterruptedException {
          |      int sum = 0;
          |      for (IntWritable val : values) {
          |        sum += val.get();
          |      }
          |      result.set(sum);
          |      context.write(key, result);
          |    }
          |  }
          |
          |  public static void main(String[] args) throws Exception {
          |    Configuration conf = new Configuration();
          |    Job job = Job.getInstance(conf, "word count");
          |    job.setJarByClass(WordCount.class);
          |    job.setMapperClass(TokenizerMapper.class);
          |    job.setCombinerClass(IntSumReducer.class);
          |    job.setReducerClass(IntSumReducer.class);
          |    job.setOutputKeyClass(Text.class);
          |    job.setOutputValueClass(IntWritable.class);
          |    FileInputFormat.addInputPath(job, new Path(args[0]));
          |    FileOutputFormat.setOutputPath(job, new Path(args[1]));
          |    System.exit(job.waitForCompletion(true) ? 0 : 1);
          |  }
          |}
          |""".stripMargin
    )
    inputs.fromRoot { root =>
      val res = os.proc(
        TestUtil.cli,
        "run",
        TestUtil.extraOptions,
        ".",
        "--hadoop",
        "--command",
        "--scratch-dir",
        "tmp",
        "--",
        "foo"
      )
        .call(cwd = root)
      val command = res.out.lines()
      pprint.err.log(command)
      expect(command.take(2) == Seq("hadoop", "jar"))
      expect(command.takeRight(2) == Seq("foo.WordCount", "foo"))
    }
  }
}

website/docs/reference/cli-options.md

Lines changed: 6 additions & 0 deletions
@@ -1482,6 +1482,12 @@ Aliases: `--spark-standalone`

 Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI

+#### `--hadoop-jar`
+
+Aliases: `--hadoop`
+
+Run as a Hadoop job, using the "hadoop jar" command
+
 #### `--command`

 Print the command that would have been run (one argument per line), rather than running it

website/src/pages/spark.md

Lines changed: 80 additions & 0 deletions
@@ -73,3 +73,83 @@ distribution. For that to work, it downloads Spark JARs, and calls the main clas
 ```bash
 scala-cli run --spark-standalone SparkJob.scala # same example as above
 ```
+
+## Running Hadoop jobs
+
+The `run` sub-command can run Hadoop jobs, by calling the `hadoop jar` command under the hood:
+
+<ChainedSnippets>
+
+```java title=WordCount.java
+//> using lib "org.apache.hadoop:hadoop-client-api:3.3.3"
+
+// from https://hadoop.apache.org/docs/r3.3.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class WordCount {
+
+  public static class TokenizerMapper
+       extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+                    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducer
+       extends Reducer<Text,IntWritable,Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+                       Context context
+                       ) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = Job.getInstance(conf, "word count");
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
+```
+
+```bash
+scala-cli run --hadoop WordCount.java
+```
+
+</ChainedSnippets>
