Skip to content

[IN PROGRESS] Support getting pod state using Informers/Listers #51396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,17 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("direct")

// Developer API: comma-separated class names of custom snapshot sources. When unset,
// the built-in watch + polling sources are used instead.
val KUBERNETES_EXECUTOR_POD_SNAPSHOT_SOURCES =
  ConfigBuilder("spark.kubernetes.executor.pod.snapshotSources")
    .doc("Class names of pod snapshot sources implementing " +
      "ExecutorPodsCustomSnapshotSource. This is a developer API. Comma separated. " +
      // Fixed: the original concatenation was missing a space and rendered as
      // "ExecutorPodsWatchSnapshotSourceand ExecutorPodsPollingSnapshotSource".
      "If not specified, the default snapshot sources, ExecutorPodsWatchSnapshotSource " +
      "and ExecutorPodsPollingSnapshotSource are used.")
    .version("3.1.1")
    .stringConf
    .toSequence
    .createWithDefault(Nil)

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down Expand Up @@ -515,6 +526,25 @@ private[spark] object Config extends Logging {
.createWithDefault(true)


// How often the lister-based snapshot source reads the informer's local cache.
val KUBERNETES_EXECUTOR_LISTER_POLLING_INTERVAL =
  ConfigBuilder("spark.kubernetes.executor.listerPollingInterval")
    .doc("Interval between polls against the Kubernetes informer lister to inspect the " +
      "state of executors.")
    .version("3.1.1")
    .timeConf(TimeUnit.MILLISECONDS)
    // Dropped the pointless s"" interpolator (no interpolations in the message).
    .checkValue(interval => interval > 0,
      "informer lister polling interval must be a positive time value.")
    .createWithDefaultString("30s")

// How often the shared informer performs a full resync of its cache with the API server.
val KUBERNETES_EXECUTOR_INFORMER_RESYNC_INTERVAL =
  ConfigBuilder("spark.kubernetes.executor.informerResyncInterval")
    .doc("Interval between informer cache resync")
    .version("3.1.1")
    .timeConf(TimeUnit.MILLISECONDS)
    // Dropped the pointless s"" interpolator (no interpolations in the message).
    .checkValue(interval => interval > 0,
      "informer resync interval must be a positive time value.")
    .createWithDefaultString("10m")

val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.annotation.{DeveloperApi, Unstable}

/**
 * :: DeveloperApi ::
 * Plugin point for user-supplied executor pod snapshot sources, configured via
 * `spark.kubernetes.executor.pod.snapshotSources`. Implementations receive their
 * dependencies through `init` before the source is started.
 */
@Unstable
@DeveloperApi
trait ExecutorPodsCustomSnapshotSource extends ExecutorPodsSnapshotSource {
  // Supplies configuration, the Kubernetes client and the snapshot store the
  // implementation should publish pod state into.
  // NOTE(review): the call ordering (init before start) is inferred from the sibling
  // built-in sources — confirm against the scheduler backend that wires these up.
  def init(sparkConf: SparkConf, kubernetesClient: KubernetesClient,
    snapshotStore: ExecutorPodsSnapshotsStore): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf

/**
 * Base class for snapshot sources that consume executor pod state from a shared
 * Kubernetes informer (via event handlers or listers) instead of polling the API
 * server directly.
 */
abstract class ExecutorPodsInformerCustomSnapshotSource extends ExecutorPodsSnapshotSource {
  // Supplies dependencies before start(); the InformerManager owns the shared informer
  // that implementations register handlers on or read listers from.
  // NOTE(review): unlike ExecutorPodsCustomSnapshotSource this is an abstract class
  // without @Unstable/@DeveloperApi annotations — confirm the asymmetry is intended.
  def init(sparkConf: SparkConf, kubernetesClient: KubernetesClient,
    snapshotStore: ExecutorPodsSnapshotsStore,
    informerManager: InformerManager): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.ResourceEventHandler

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
 * Snapshot source that registers an event handler on the shared Kubernetes informer and
 * forwards every pod add/update/delete event into the executor pods snapshot store.
 */
class ExecutorPodsInformerSnapshotSource extends ExecutorPodsInformerCustomSnapshotSource {

  // Removed the unused `client` field the original stored but never read.
  private var store: ExecutorPodsSnapshotsStore = _
  private var im: InformerManager = _

  // `override` added for consistency with ExecutorPodsListerSnapshotSource, which also
  // implements the abstract init declared on the parent.
  override def init(conf: SparkConf, kubernetesClient: KubernetesClient,
      snapshotsStore: ExecutorPodsSnapshotsStore,
      informerManager: InformerManager): Unit = {
    store = snapshotsStore
    im = informerManager
    // Register for pod lifecycle callbacks; events flow straight into the store.
    informerManager.getInformer.addEventHandler(new ExecutorPodsInformer)
  }

  override def start(applicationId: String): Unit = {
    im.startInformer()
  }

  override def stop(): Unit = {
    // Best-effort shutdown: log but do not propagate non-fatal failures.
    Utils.tryLogNonFatalError {
      im.stopInformer()
    }
  }

  // Bridges fabric8 informer callbacks to snapshot-store updates. Deletes are also
  // published via updatePod so the store sees the pod's terminal state.
  private class ExecutorPodsInformer extends ResourceEventHandler[Pod] {
    override def onAdd(pod: Pod): Unit = {
      logInfo(s"Received add executor pod event for pod named ${pod.getMetadata.getName}")
      store.updatePod(pod)
    }

    override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
      logInfo(s"Received update executor pod event for pod named ${newPod.getMetadata.getName}")
      store.updatePod(newPod)
    }

    override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
      logInfo(s"Received delete executor pod update for pod named ${pod.getMetadata.getName}")
      store.updatePod(pod)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters.asScalaBufferConverter

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.cache.Lister

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_LISTER_POLLING_INTERVAL
import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_EXECUTOR_ROLE, SPARK_ROLE_LABEL}
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * Snapshot source that periodically replaces the snapshot store's view of executor pods
 * with the contents of the shared informer's local cache (read through a Lister),
 * avoiding direct polls against the Kubernetes API server.
 */
class ExecutorPodsListerSnapshotSource extends ExecutorPodsInformerCustomSnapshotSource {

  private var conf: SparkConf = _
  private var store: ExecutorPodsSnapshotsStore = _
  private var appId: String = _
  private var client: KubernetesClient = _
  private var pollingExecutor: ScheduledExecutorService = _
  private var pollingFuture: Future[_] = _
  private var im: InformerManager = _

  override def init(sparkConf: SparkConf, kubernetesClient: KubernetesClient,
      snapshotStore: ExecutorPodsSnapshotsStore,
      informerManager: InformerManager): Unit = {
    // Fixed: the original assigned `im = informerManager` twice, and logged a debug
    // message interpolating `appId` before start() had ever set it (always null).
    conf = sparkConf
    client = kubernetesClient
    store = snapshotStore
    im = informerManager
    // Reuses an executor injected by the test-only overload, if any.
    pollingExecutor = createExecutorService()
  }

  // for testing: allows injecting a deterministic ScheduledExecutorService.
  def init(sparkConf: SparkConf, kubernetesClient: KubernetesClient,
      snapshotStore: ExecutorPodsSnapshotsStore,
      informerManager: InformerManager,
      pollingExecutor: ScheduledExecutorService): Unit = {
    this.pollingExecutor = pollingExecutor
    this.init(sparkConf, kubernetesClient, snapshotStore, informerManager)
  }

  // Lazily creates the single daemon polling thread unless one was already injected.
  private def createExecutorService(): ScheduledExecutorService = {
    if (pollingExecutor == null) {
      pollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
        "kubernetes-executor-pod-lister-sync")
    }
    pollingExecutor
  }

  override def start(applicationId: String): Unit = {
    appId = applicationId
    // Moved here from init() so appId is set when logged.
    logDebug(s"Starting lister for pods with labels $SPARK_APP_ID_LABEL=$appId," +
      s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
    im.initInformer()
    im.startInformer()
    val pollingInterval = conf.get(KUBERNETES_EXECUTOR_LISTER_POLLING_INTERVAL)
    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
      new PollRunnable(
        new Lister(im.getInformer.getIndexer, client.getNamespace)),
      pollingInterval,
      pollingInterval,
      TimeUnit.MILLISECONDS)
  }

  override def stop(): Unit = {
    if (pollingFuture != null) {
      pollingFuture.cancel(true)
      pollingFuture = null
    }
    ThreadUtils.shutdown(pollingExecutor)
    im.stopInformer()
  }

  // Periodic task: copies the informer cache contents into the snapshot store as a
  // full replacement snapshot; non-fatal errors are logged and the schedule continues.
  private class PollRunnable(lister: Lister[Pod]) extends Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      store.replaceSnapshot(lister.list().asScala)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ThreadUtils, Utils}

/**
Expand All @@ -43,7 +42,7 @@ class ExecutorPodsPollingSnapshotSource(
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
pollingExecutor: ScheduledExecutorService) extends Logging {
pollingExecutor: ScheduledExecutorService) extends ExecutorPodsSnapshotSource {

private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.internal.Logging

/**
 * Common lifecycle contract for components that feed executor pod state into the
 * snapshot store (watch-, polling-, informer- and lister-based sources).
 */
abstract class ExecutorPodsSnapshotSource extends Logging {
  // Begins producing snapshots for the given Spark application id.
  def start(applicationId: String): Unit
  // Stops producing snapshots; implementations release resources such as threads,
  // watch connections or informers here.
  def stop(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
Expand All @@ -42,7 +41,7 @@ import org.apache.spark.util.Utils
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient,
conf: SparkConf) extends Logging {
conf: SparkConf) extends ExecutorPodsSnapshotSource {

private var watchConnection: Closeable = _
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
Expand Down
Loading