Skip to content

Commit 39cd360

Browse files
Add maxExecutors configuration for streaming queries (#326) (#328)
* Add maxExecutors configuration for streaming queries
* scala fmt
* update IT

---------

(cherry picked from commit 20b761c)
Signed-off-by: Peng Huo <penghuo@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent cafd79e commit 39cd360

File tree

4 files changed

+29
-0
lines changed

4 files changed

+29
-0
lines changed

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ object FlintJob extends Logging with FlintJobExecutor {
6060
* Without this setup, Spark would not recognize names in the format `my_glue1.default`.
6161
*/
6262
conf.set("spark.sql.defaultCatalog", dataSource)
63+
configDYNMaxExecutors(conf, jobType)
64+
6365
val streamingRunningCount = new AtomicInteger(0)
6466
val jobOperator =
6567
JobOperator(

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ trait FlintJobExecutor {
8585
"org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions")
8686
}
8787

88+
/**
 * Overrides `spark.dynamicAllocation.maxExecutors` with the streaming-specific
 * maximum when the job runs in streaming mode. Non-streaming jobs are left
 * untouched. See https://github.com/opensearch-project/opensearch-spark/issues/324
 * for background.
 */
def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
  val isStreamingJob = "streaming".equalsIgnoreCase(jobType)
  if (isStreamingJob) {
    // Fall back to "10" when no explicit streaming override is configured.
    val streamingMaxExecutors =
      conf.get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10")
    conf.set("spark.dynamicAllocation.maxExecutors", streamingMaxExecutors)
  }
}
100+
88101
def createSparkSession(conf: SparkConf): SparkSession = {
89102
val builder = SparkSession.builder().config(conf)
90103
if (enableHiveSupport) {

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
9191

9292
if (jobType.equalsIgnoreCase("streaming")) {
9393
logInfo(s"""streaming query ${query}""")
94+
configDYNMaxExecutors(conf, jobType)
9495
val streamingRunningCount = new AtomicInteger(0)
9596
val jobOperator =
9697
JobOperator(

spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
9797
|""".stripMargin
9898
assert(FlintJob.isSuperset(input, mapping))
9999
}
100+
101+
test("default streaming query maxExecutors is 10") {
  // With no explicit streaming override configured, the fallback of "10" applies.
  val sparkConf = spark.sparkContext.conf
  FlintJob.configDYNMaxExecutors(sparkConf, jobType = "streaming")
  sparkConf.get("spark.dynamicAllocation.maxExecutors") shouldBe "10"
}
106+
107+
test("override streaming query maxExecutors") {
  // An explicit streaming maxExecutors setting takes precedence over the default.
  val sparkConf = spark.sparkContext.conf
  sparkConf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
  FlintJob.configDYNMaxExecutors(sparkConf, jobType = "streaming")
  sparkConf.get("spark.dynamicAllocation.maxExecutors") shouldBe "30"
}
112+
100113
}

0 commit comments

Comments
 (0)