@@ -479,6 +479,14 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 100, "Allocation batch delay must be greater than 0.1s.")
.createWithDefaultString("1s")

val KUBERNETES_ALLOCATION_MAXIMUM =
ConfigBuilder("spark.kubernetes.allocation.maximum")
.doc("The maximum number of executor pods to try to create during the whole job lifecycle.")
.version("4.1.0")
.intConf
.checkValue(value => value > 0, "Allocation maximum should be a positive integer")
.createWithDefault(Int.MaxValue)

val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
.doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
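For context, a minimal usage sketch of the new setting (not part of this diff; the master URL, app name, and cap value below are placeholders):

import org.apache.spark.SparkConf

// Cap the total number of executor pods the driver will ever attempt to create
// for this application (placeholder value of 500).
val sparkConf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443") // placeholder API server URL
  .setAppName("allocation-maximum-demo")                  // placeholder app name
  .set("spark.kubernetes.allocation.maximum", "500")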
@@ -69,6 +69,8 @@ class ExecutorPodsAllocator(

protected val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)

protected val podAllocationMaximum = conf.get(KUBERNETES_ALLOCATION_MAXIMUM)

protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)

protected val podCreationTimeout = math.max(
@@ -426,6 +428,9 @@ class ExecutorPodsAllocator(
return
}
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
if (newExecutorId >= podAllocationMaximum) {
throw new SparkException(s"Exceeded the pod creation limit: $podAllocationMaximum")
}
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
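To illustrate the guard added above in isolation, a self-contained sketch (the object and method names are hypothetical, and a plain exception stands in for SparkException):

import java.util.concurrent.atomic.AtomicInteger

object AllocationLimitSketch {
  private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
  private val podAllocationMaximum = 3 // stands in for spark.kubernetes.allocation.maximum

  // Mirrors the check above: executor ids increase monotonically, so once an id
  // reaches the configured maximum, every further pod creation attempt fails fast.
  def nextExecutorId(): Int = {
    val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
    if (newExecutorId >= podAllocationMaximum) {
      throw new IllegalStateException(s"Exceeded the pod creation limit: $podAllocationMaximum")
    }
    newExecutorId
  }
}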
@@ -249,6 +249,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.resource(podWithAttachedContainerForId(podAllocationSize + 1))
}

test("SPARK-53907: Support spark.kubernetes.allocation.maximum") {
val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1")
podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)

val m = intercept[SparkException] {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
}.getMessage
assert(m.contains("Exceeded the pod creation limit: 1"))
}

test("Request executors in batches. Allow another batch to be requested if" +
" all pending executors start running.") {
val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()