
Commit 97d4b14

Merge pull request #366 from liyinan926/ssa-fix
Fixed the controller logic for ScheduledSparkApplications
2 parents 8a84d6f + b867cc6 commit 97d4b14

6 files changed: +175 -214 lines changed

manifest/spark-operator-with-metrics.yaml

Lines changed: 0 additions & 1 deletion
@@ -47,7 +47,6 @@ spec:
       - name: sparkoperator
         image: gcr.io/spark-operator/spark-operator:v2.4.0-v1alpha1-latest
         imagePullPolicy: Always
-        command: ["/usr/bin/spark-operator"]
         ports:
         - containerPort: 10254
         args:

manifest/spark-operator-with-webhook.yaml

Lines changed: 1 addition & 2 deletions
@@ -51,8 +51,7 @@ spec:
         - name: webhook-certs
           mountPath: /etc/webhook-certs
         ports:
-        - containerPort: 8080
-        command: ["/usr/bin/spark-operator"]
+        - containerPort: 8080
         args:
         - -logtostderr
         - -enable-webhook=true

pkg/config/constants.go

Lines changed: 2 additions & 2 deletions
@@ -61,14 +61,14 @@ const (
 	AffinityAnnotation = LabelAnnotationPrefix + "affinity"
 	// SparkAppNameLabel is the name of the label for the SparkApplication object name.
 	SparkAppNameLabel = LabelAnnotationPrefix + "app-name"
+	// ScheduledSparkAppNameLabel is the name of the label for the ScheduledSparkApplication object name.
+	ScheduledSparkAppNameLabel = LabelAnnotationPrefix + "scheduled-app-name"
 	// LaunchedBySparkOperatorLabel is a label on Spark pods launched through the Spark Operator.
 	LaunchedBySparkOperatorLabel = LabelAnnotationPrefix + "launched-by-spark-operator"
-
 	// SparkApplicationSelectorLabel is the AppID set by the spark-distribution on the driver/executors Pods.
 	SparkApplicationSelectorLabel = "spark-app-selector"
 	// SparkRoleLabel is the driver/executor label set by the operator/spark-distribution on the driver/executors Pods.
 	SparkRoleLabel = "spark-role"
-
 	// TolerationsAnnotationPrefix is the prefix of annotations that specify a Toleration.
 	TolerationsAnnotationPrefix = LabelAnnotationPrefix + "tolerations."
 )
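
The new ScheduledSparkAppNameLabel is what lets the controller find every SparkApplication spawned by a given ScheduledSparkApplication without tracking run names in the status object. A minimal sketch of a selector built from it, assuming LabelAnnotationPrefix expands to "sparkoperator.k8s.io/" and using a made-up application name:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// Assumed expansion of ScheduledSparkAppNameLabel; the real value is
// LabelAnnotationPrefix + "scheduled-app-name" as defined above.
const scheduledSparkAppNameLabel = "sparkoperator.k8s.io/scheduled-app-name"

// selectorForScheduledApp builds an equality selector that matches every
// SparkApplication the controller created for the named parent object.
func selectorForScheduledApp(name string) labels.Selector {
	return labels.Set{scheduledSparkAppNameLabel: name}.AsSelector()
}

func main() {
	fmt.Println(selectorForScheduledApp("spark-pi-scheduled"))
	// Prints: sparkoperator.k8s.io/scheduled-app-name=spark-pi-scheduled
}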

pkg/controller/scheduledsparkapplication/controller.go

Lines changed: 78 additions & 97 deletions
@@ -19,14 +19,15 @@ package scheduledsparkapplication
 import (
 	"fmt"
 	"reflect"
+	"sort"
 	"time"

 	"github.com/golang/glog"
 	"github.com/robfig/cron"

 	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/clock"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -41,6 +42,7 @@ import (
 	crdscheme "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/scheme"
 	crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
 	crdlisters "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/listers/sparkoperator.k8s.io/v1alpha1"
+	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
 )

 var (
@@ -176,38 +178,26 @@ func (c *Controller) syncScheduledSparkApplication(key string) error {
 			status.NextRun = metav1.NewTime(nextRunTime)
 		}
 		if nextRunTime.Before(now) {
-			// The next run is due. Check if this is the first run of the application.
-			if status.LastRunName == "" {
-				// This is the first run of the application.
-				if err = c.startNextRun(app, status, schedule); err != nil {
-					return err
-				}
-			} else {
-				// Check if the condition for starting the next run is satisfied.
-				ok, err := c.shouldStartNextRun(app)
-				if err != nil {
-					return err
-				}
-				if ok {
-					// Start the next run if the condition is satisfied.
-					if err = c.startNextRun(app, status, schedule); err != nil {
-						return err
-					}
-				} else {
-					// Otherwise, check and update past runs.
-					if err = c.checkAndUpdatePastRuns(app, status); err != nil {
-						return err
-					}
-				}
+			// Check if the condition for starting the next run is satisfied.
+			ok, err := c.shouldStartNextRun(app)
+			if err != nil {
+				return err
 			}
-		} else {
-			// The next run is not due yet, check and update past runs.
-			if status.LastRunName != "" {
-				if err = c.checkAndUpdatePastRuns(app, status); err != nil {
+			if ok {
+				glog.Infof("Next run of ScheduledSparkApplication %s/%s is due, creating a new SparkApplication instance", app.Namespace, app.Name)
+				name, err := c.startNextRun(app, now)
+				if err != nil {
 					return err
 				}
+				status.LastRun = metav1.NewTime(now)
+				status.NextRun = metav1.NewTime(schedule.Next(status.LastRun.Time))
+				status.LastRunName = name
 			}
 		}
+
+		if err = c.checkAndUpdatePastRuns(app, status); err != nil {
+			return err
+		}
 	}

 	return c.updateScheduledSparkApplicationStatus(app, status)
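
The reworked sync path decides whether a run is due purely from nextRunTime.Before(now) and advances status.NextRun with schedule.Next. A standalone sketch of that arithmetic with github.com/robfig/cron; parsing of app.Spec.Schedule happens elsewhere in the controller, and the schedule string and timestamps below are made up:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// Hypothetical schedule: every 10 minutes (standard 5-field cron format).
	schedule, err := cron.ParseStandard("*/10 * * * *")
	if err != nil {
		panic(err)
	}

	lastRun := time.Date(2019, 1, 1, 12, 3, 0, 0, time.UTC) // stand-in for status.LastRun
	now := time.Date(2019, 1, 1, 12, 15, 0, 0, time.UTC)    // stand-in for c.clock.Now()

	// Same computation as schedule.Next(status.LastRun.Time) in the controller.
	nextRun := schedule.Next(lastRun)
	fmt.Println("next run:", nextRun)                // 2019-01-01 12:10:00 +0000 UTC
	fmt.Println("due now?", nextRun.Before(now))     // true, so a new SparkApplication would be created
}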
@@ -257,7 +247,11 @@ func (c *Controller) createSparkApplication(
 		Name: scheduledApp.Name,
 		UID:  scheduledApp.UID,
 	})
-	app.ObjectMeta.SetLabels(scheduledApp.GetLabels())
+	app.ObjectMeta.Labels = make(map[string]string)
+	for key, value := range scheduledApp.Labels {
+		app.ObjectMeta.Labels[key] = value
+	}
+	app.ObjectMeta.Labels[config.ScheduledSparkAppNameLabel] = scheduledApp.Name
 	_, err := c.crdClient.SparkoperatorV1alpha1().SparkApplications(scheduledApp.Namespace).Create(app)
 	if err != nil {
 		return "", err
@@ -266,69 +260,52 @@ func (c *Controller) createSparkApplication(
 }

 func (c *Controller) shouldStartNextRun(app *v1alpha1.ScheduledSparkApplication) (bool, error) {
+	sortedApps, err := c.listSparkApplications(app)
+	if err != nil {
+		return false, err
+	}
+	if len(sortedApps) == 0 {
+		return true, nil
+	}
+
+	// The last run (most recently started) is the first one in the sorted slice.
+	lastRun := sortedApps[0]
 	switch app.Spec.ConcurrencyPolicy {
 	case v1alpha1.ConcurrencyAllow:
 		return true, nil
 	case v1alpha1.ConcurrencyForbid:
-		finished, _, err := c.hasLastRunFinished(app.Namespace, app.Status.LastRunName)
-		if err != nil {
-			return false, err
-		}
-		return finished, nil
+		return c.hasLastRunFinished(lastRun), nil
 	case v1alpha1.ConcurrencyReplace:
-		if err := c.killLastRunIfNotFinished(app.Namespace, app.Status.LastRunName); err != nil {
+		if err := c.killLastRunIfNotFinished(lastRun); err != nil {
 			return false, err
 		}
 		return true, nil
 	}
 	return true, nil
 }

-func (c *Controller) startNextRun(
-	app *v1alpha1.ScheduledSparkApplication,
-	status *v1alpha1.ScheduledSparkApplicationStatus,
-	schedule cron.Schedule) error {
-	glog.Infof("Next run of ScheduledSparkApplication %s/%s is due, creating a new SparkApplication instance", app.Namespace, app.Name)
-	now := c.clock.Now()
+func (c *Controller) startNextRun(app *v1alpha1.ScheduledSparkApplication, now time.Time) (string, error) {
 	name, err := c.createSparkApplication(app, now)
 	if err != nil {
 		glog.Errorf("failed to create a SparkApplication instance for ScheduledSparkApplication %s/%s: %v", app.Namespace, app.Name, err)
-		return err
+		return "", err
 	}
-
-	status.LastRun = metav1.NewTime(now)
-	status.NextRun = metav1.NewTime(schedule.Next(status.LastRun.Time))
-	status.LastRunName = name
-	return nil
+	return name, nil
 }

-func (c *Controller) hasLastRunFinished(
-	namespace string,
-	lastRunName string) (bool, *v1alpha1.SparkApplication, error) {
-	app, err := c.saLister.SparkApplications(namespace).Get(lastRunName)
-	if err != nil {
-		// The SparkApplication of the last run may have been deleted already (e.g., manually by the user).
-		if errors.IsNotFound(err) {
-			return true, nil, nil
-		}
-		return false, nil, err
-	}
-
+func (c *Controller) hasLastRunFinished(app *v1alpha1.SparkApplication) bool {
 	return app.Status.AppState.State == v1alpha1.CompletedState ||
-		app.Status.AppState.State == v1alpha1.FailedState, app, nil
+		app.Status.AppState.State == v1alpha1.FailedState
 }

-func (c *Controller) killLastRunIfNotFinished(namespace string, lastRunName string) error {
-	finished, app, err := c.hasLastRunFinished(namespace, lastRunName)
-	if err != nil {
-		return err
-	}
-	if app == nil || finished {
+func (c *Controller) killLastRunIfNotFinished(app *v1alpha1.SparkApplication) error {
+	finished := c.hasLastRunFinished(app)
+	if finished {
 		return nil
 	}

 	// Delete the SparkApplication object of the last run.
-	if err = c.crdClient.SparkoperatorV1alpha1().SparkApplications(namespace).Delete(lastRunName,
+	if err := c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(app.Name,
 		metav1.NewDeleteOptions(0)); err != nil {
 		return err
 	}
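
shouldStartNextRun now inspects the most recent child (sortedApps[0]) directly instead of resolving status.LastRunName through the lister, which is why the errors.IsNotFound special cases disappear. The sparkApps sort type it depends on is defined elsewhere in the package and is not part of this diff; the following is a hypothetical reconstruction of a newest-first ordering, shown only to make the sortedApps[0] invariant concrete (it assumes the package's existing v1alpha1 import):

// Hypothetical reconstruction, for illustration only: the real definition of
// sparkApps is not in this diff. The invariant the controller relies on is
// simply "index 0 is the most recently started run"; one way to get that is
// to sort by creation timestamp, newest first.
type sparkApps []*v1alpha1.SparkApplication

func (s sparkApps) Len() int      { return len(s) }
func (s sparkApps) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sparkApps) Less(i, j int) bool {
	// Newest first, so sortedApps[0] is the latest run.
	return s[i].CreationTimestamp.After(s[j].CreationTimestamp.Time)
}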
@@ -339,34 +316,29 @@ func (c *Controller) killLastRunIfNotFinished(namespace string, lastRunName string) error {
 func (c *Controller) checkAndUpdatePastRuns(
 	app *v1alpha1.ScheduledSparkApplication,
 	status *v1alpha1.ScheduledSparkApplicationStatus) error {
-	lastRunName := status.LastRunName
-	lastRunApp, err := c.saLister.SparkApplications(app.Namespace).Get(lastRunName)
+	sortedApps, err := c.listSparkApplications(app)
 	if err != nil {
-		// The SparkApplication of the last run may have been deleted already (e.g., manually by the user).
-		if errors.IsNotFound(err) {
-			return nil
-		}
 		return err
 	}

-	var toDelete []string
-	if lastRunApp.Status.AppState.State == v1alpha1.CompletedState {
-		limit := 1
-		if app.Spec.SuccessfulRunHistoryLimit != nil {
-			limit = int(*app.Spec.SuccessfulRunHistoryLimit)
-		}
-		status.PastSuccessfulRunNames, toDelete = recordPastRuns(status.PastSuccessfulRunNames, lastRunName, limit)
-	} else if lastRunApp.Status.AppState.State == v1alpha1.FailedState {
-		limit := 1
-		if app.Spec.FailedRunHistoryLimit != nil {
-			limit = int(*app.Spec.FailedRunHistoryLimit)
+	var completedRuns []string
+	var failedRuns []string
+	for _, a := range sortedApps {
+		if a.Status.AppState.State == v1alpha1.CompletedState {
+			completedRuns = append(completedRuns, a.Name)
+		} else if a.Status.AppState.State == v1alpha1.FailedState {
+			failedRuns = append(failedRuns, a.Name)
 		}
-		status.PastFailedRunNames, toDelete = recordPastRuns(status.PastFailedRunNames, lastRunName, limit)
 	}

+	var toDelete []string
+	status.PastSuccessfulRunNames, toDelete = bookkeepPastRuns(completedRuns, app.Spec.SuccessfulRunHistoryLimit)
 	for _, name := range toDelete {
-		c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name,
-			metav1.NewDeleteOptions(0))
+		c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name, metav1.NewDeleteOptions(0))
+	}
+	status.PastFailedRunNames, toDelete = bookkeepPastRuns(failedRuns, app.Spec.FailedRunHistoryLimit)
+	for _, name := range toDelete {
+		c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name, metav1.NewDeleteOptions(0))
 	}

 	return nil
@@ -400,19 +372,28 @@ func (c *Controller) updateScheduledSparkApplicationStatus(
 	})
 }

-func recordPastRuns(names []string, newName string, limit int) (updatedNames []string, toDelete []string) {
-	if len(names) > 0 && names[0] == newName {
-		// The name has already been recorded.
-		return names, nil
+func (c *Controller) listSparkApplications(app *v1alpha1.ScheduledSparkApplication) (sparkApps, error) {
+	set := labels.Set{config.ScheduledSparkAppNameLabel: app.Name}
+	apps, err := c.saLister.SparkApplications(app.Namespace).List(set.AsSelector())
+	if err != nil {
+		return nil, fmt.Errorf("failed to list SparkApplications: %v", err)
 	}
+	sortedApps := sparkApps(apps)
+	sort.Sort(sortedApps)
+	return sortedApps, nil
+}

-	rest := names
-	if len(names) >= limit {
-		rest = names[:limit-1]
-		toDelete = names[limit-1:]
+func bookkeepPastRuns(names []string, runLimit *int32) (toKeep []string, toDelete []string) {
+	limit := 1
+	if runLimit != nil {
+		limit = int(*runLimit)
+	}
+
+	if len(names) <= limit {
+		return names, nil
 	}
-	// Pre-append the name of the latest run.
-	updatedNames = append([]string{newName}, rest...)
+	toKeep = names[:limit]
+	toDelete = names[limit:]
 	return
 }