Skip to content

Commit 62db1d6

Browse files
authored
Merge pull request #340 from liyinan926/update-fix
Fixed SparkApplication update in updateAndExportMetrics
2 parents 7c4d9be + aa53cf4 commit 62db1d6

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

pkg/controller/sparkapplication/controller.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (c *Controller) onUpdate(oldObj, newObj interface{}) {
183183
// and end up in an inconsistent state.
184184
if !reflect.DeepEqual(oldApp.Spec, newApp.Spec) {
185185
// Force-set the application status to Invalidating which handles clean-up and application re-run.
186-
if _, err := c.updateSparkApplicationStatusWithRetries(newApp, func(status *v1alpha1.SparkApplicationStatus) {
186+
if _, err := c.updateApplicationStatusWithRetries(newApp, func(status *v1alpha1.SparkApplicationStatus) {
187187
status.AppState.State = v1alpha1.InvalidatingState
188188
}); err != nil {
189189
c.recorder.Eventf(
@@ -511,7 +511,7 @@ func (c *Controller) syncSparkApplication(key string) error {
511511

512512
if appToUpdate != nil {
513513
glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status)
514-
err = c.updateAppAndExportMetrics(app, appToUpdate)
514+
err = c.updateApplicationAndExportMetrics(app, appToUpdate)
515515
if err != nil {
516516
glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
517517
return err
@@ -612,7 +612,7 @@ func (c *Controller) submitSparkApplication(app *v1alpha1.SparkApplication) *v1a
612612
return app
613613
}
614614

615-
func (c *Controller) updateSparkApplicationStatusWithRetries(
615+
func (c *Controller) updateApplicationStatusWithRetries(
616616
original *v1alpha1.SparkApplication,
617617
updateFunc func(status *v1alpha1.SparkApplicationStatus)) (*v1alpha1.SparkApplication, error) {
618618
toUpdate := original.DeepCopy()
@@ -649,19 +649,31 @@ func (c *Controller) updateSparkApplicationStatusWithRetries(
649649
return toUpdate, nil
650650
}
651651

652-
func (c *Controller) updateAppAndExportMetrics(oldApp, newApp *v1alpha1.SparkApplication) error {
652+
func (c *Controller) updateApplicationAndExportMetrics(oldApp, newApp *v1alpha1.SparkApplication) error {
653653
// Skip update if nothing changed.
654654
if reflect.DeepEqual(oldApp, newApp) {
655655
return nil
656656
}
657657

658-
app, err := c.updateSparkApplicationStatusWithRetries(oldApp, func(status *v1alpha1.SparkApplicationStatus) {
659-
*status = newApp.Status
660-
})
658+
updatedApp := newApp
659+
var err error
660+
for i := 0; i < maximumUpdateRetries; i++ {
661+
updatedApp, err = c.crdClient.SparkoperatorV1alpha1().SparkApplications(newApp.Namespace).Update(updatedApp)
662+
if err == nil {
663+
break
664+
}
665+
updatedApp, err = c.crdClient.SparkoperatorV1alpha1().SparkApplications(newApp.Namespace).Get(newApp.Name, metav1.GetOptions{})
666+
if err == nil {
667+
updatedApp.Finalizers = newApp.Finalizers
668+
updatedApp.Status = newApp.Status
669+
}
670+
}
671+
661672
// Export metrics if the update was successful.
662-
if err == nil && c.metrics != nil {
663-
c.metrics.exportMetrics(oldApp, app)
673+
if updatedApp != nil && c.metrics != nil {
674+
c.metrics.exportMetrics(oldApp, updatedApp)
664675
}
676+
665677
return err
666678
}
667679

0 commit comments

Comments
 (0)