@@ -38,6 +38,7 @@ import (
38
38
v1 "k8s.io/client-go/listers/core/v1"
39
39
"k8s.io/client-go/tools/cache"
40
40
"k8s.io/client-go/tools/record"
41
+ "k8s.io/client-go/util/retry"
41
42
"k8s.io/client-go/util/workqueue"
42
43
43
44
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
@@ -56,7 +57,6 @@ const (
56
57
podAlreadyExistsErrorCode = "code=409"
57
58
queueTokenRefillRate = 50
58
59
queueTokenBucketSize = 500
59
- maximumUpdateRetries = 3
60
60
)
61
61
62
62
var (
@@ -704,46 +704,47 @@ func (c *Controller) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (boo
704
704
if c .batchSchedulerMgr == nil || app .Spec .BatchScheduler == nil || * app .Spec .BatchScheduler == "" {
705
705
return false , nil
706
706
}
707
- if scheduler , err := c .batchSchedulerMgr .GetScheduler (* app .Spec .BatchScheduler ); err != nil {
708
- glog .Errorf ("failed to get batch scheduler from name %s" , * app .Spec .BatchScheduler )
707
+
708
+ scheduler , err := c .batchSchedulerMgr .GetScheduler (* app .Spec .BatchScheduler )
709
+ if err != nil {
710
+ glog .Errorf ("failed to get batch scheduler for name %s" , * app .Spec .BatchScheduler )
709
711
return false , nil
710
- } else {
711
- return scheduler .ShouldSchedule (app ), scheduler
712
712
}
713
+ return scheduler .ShouldSchedule (app ), scheduler
713
714
}
714
715
715
716
func (c * Controller ) updateApplicationStatusWithRetries (
716
717
original * v1beta2.SparkApplication ,
717
718
updateFunc func (status * v1beta2.SparkApplicationStatus )) (* v1beta2.SparkApplication , error ) {
718
719
toUpdate := original .DeepCopy ()
719
-
720
- var lastUpdateErr error
721
- for i := 0 ; i < maximumUpdateRetries ; i ++ {
720
+ updateErr := wait .ExponentialBackoff (retry .DefaultBackoff , func () (ok bool , err error ) {
722
721
updateFunc (& toUpdate .Status )
723
722
if equality .Semantic .DeepEqual (original .Status , toUpdate .Status ) {
724
- return toUpdate , nil
723
+ return true , nil
725
724
}
726
- _ , err := c .crdClient .SparkoperatorV1beta2 ().SparkApplications (toUpdate .Namespace ).Update (toUpdate )
725
+
726
+ toUpdate , err = c .crdClient .SparkoperatorV1beta2 ().SparkApplications (original .Namespace ).Update (toUpdate )
727
727
if err == nil {
728
- return toUpdate , nil
728
+ return true , nil
729
+ }
730
+ if ! errors .IsConflict (err ) {
731
+ return false , err
729
732
}
730
733
731
- lastUpdateErr = err
732
-
733
- // Failed to update to the API server.
734
- // Get the latest version from the API server first and re-apply the update.
735
- name := toUpdate .Name
736
- toUpdate , err = c .crdClient .SparkoperatorV1beta2 ().SparkApplications (toUpdate .Namespace ).Get (name ,
737
- metav1.GetOptions {})
734
+ // There was a conflict updating the SparkApplication, fetch the latest version from the API server.
735
+ toUpdate , err = c .crdClient .SparkoperatorV1beta2 ().SparkApplications (original .Namespace ).Get (original .Name , metav1.GetOptions {})
738
736
if err != nil {
739
- glog .Errorf ("failed to get SparkApplication %s/%s: %v" , original .Namespace , name , err )
740
- return nil , err
737
+ glog .Errorf ("failed to get SparkApplication %s/%s: %v" , original .Namespace , original . Name , err )
738
+ return false , err
741
739
}
742
- }
743
740
744
- if lastUpdateErr != nil {
745
- glog .Errorf ("failed to update SparkApplication %s/%s: %v" , original .Namespace , original .Name , lastUpdateErr )
746
- return nil , lastUpdateErr
741
+ // Retry with the latest version.
742
+ return false , nil
743
+ })
744
+
745
+ if updateErr != nil {
746
+ glog .Errorf ("failed to update SparkApplication %s/%s: %v" , original .Namespace , original .Name , updateErr )
747
+ return nil , updateErr
747
748
}
748
749
749
750
return toUpdate , nil
0 commit comments