diff --git a/charts/fleet-crd/templates/crds.yaml b/charts/fleet-crd/templates/crds.yaml index 38aa2b8a24..043354c021 100644 --- a/charts/fleet-crd/templates/crds.yaml +++ b/charts/fleet-crd/templates/crds.yaml @@ -6475,7 +6475,6 @@ spec: commit: description: Commit is the Git commit hash from the last git job run. - nullable: true type: string conditions: description: 'Conditions is a list of Wrangler conditions that describe @@ -6890,6 +6889,10 @@ spec: spec.forceSyncGeneration is set format: int64 type: integer + webhookCommit: + description: WebhookCommit is the latest Git commit hash received + from a webhook + type: string type: object type: object served: true diff --git a/e2e/single-cluster/gitrepo_test.go b/e2e/single-cluster/gitrepo_test.go index fb82655b6b..14102801f7 100644 --- a/e2e/single-cluster/gitrepo_test.go +++ b/e2e/single-cluster/gitrepo_test.go @@ -302,6 +302,7 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup" By("updating the gitrepo's status") expectedStatus := fleet.GitRepoStatus{ Commit: commit, + WebhookCommit: commit, ReadyClusters: 1, DesiredReadyClusters: 1, GitJobStatus: "Current", @@ -425,6 +426,7 @@ func (matcher *gitRepoStatusMatcher) Match(actual interface{}) (success bool, er } return got.Commit == want.Commit && + got.WebhookCommit == want.WebhookCommit && got.ReadyClusters == want.ReadyClusters && got.DesiredReadyClusters == want.DesiredReadyClusters && got.GitJobStatus == want.GitJobStatus && diff --git a/integrationtests/gitjob/controller/controller_test.go b/integrationtests/gitjob/controller/controller_test.go index 8e63ca50da..66aa626e10 100644 --- a/integrationtests/gitjob/controller/controller_test.go +++ b/integrationtests/gitjob/controller/controller_test.go @@ -173,9 +173,10 @@ var _ = Describe("GitJob controller", func() { g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue()) }).Should(Succeed()) - // it should log 2 events + // it should log 3 events // first one is to log the new commit from the poller // second one is to inform that the job was created + // third one is to inform that the job was deleted because it succeeded Eventually(func(g Gomega) { events, _ := k8sClientSet.CoreV1().Events(gitRepo.Namespace).List(context.TODO(), metav1.ListOptions{ @@ -183,7 +184,7 @@ var _ = Describe("GitJob controller", func() { TypeMeta: metav1.TypeMeta{Kind: "GitRepo"}, }) g.Expect(events).ToNot(BeNil()) - g.Expect(len(events.Items)).To(Equal(2)) + g.Expect(len(events.Items)).To(Equal(3)) g.Expect(events.Items[0].Reason).To(Equal("GotNewCommit")) g.Expect(events.Items[0].Message).To(Equal("9ca3a0ad308ed8bffa6602572e2a1343af9c3d2e")) g.Expect(events.Items[0].Type).To(Equal("Normal")) @@ -192,7 +193,17 @@ var _ = Describe("GitJob controller", func() { g.Expect(events.Items[1].Message).To(Equal("GitJob was created")) g.Expect(events.Items[1].Type).To(Equal("Normal")) g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller")) + g.Expect(events.Items[2].Reason).To(Equal("JobDeleted")) + g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded")) + g.Expect(events.Items[2].Type).To(Equal("Normal")) + g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller")) }).Should(Succeed()) + + // job should not be present + Consistently(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }, 10*time.Second, 1*time.Second).Should(BeTrue()) }) }) @@ -312,13 +323,6 @@ var _ = Describe("GitJob controller", func() { g.Expect(checkCondition(&gitRepo, "Ready", corev1.ConditionTrue, "")).To(BeTrue()) g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue()) }).Should(Succeed()) - - By("verifying that the job is deleted if Spec.Generation changed") - Expect(simulateIncreaseGitRepoGeneration(gitRepo)).ToNot(HaveOccurred()) - Eventually(func() bool { - jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) - return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)) - }).Should(BeTrue()) }) }) }) @@ -373,9 +377,51 @@ var _ = Describe("GitJob controller", func() { jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) }).Should(Not(HaveOccurred())) + // simulate job was successful + Eventually(func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + // We could be checking this when the job is still not created + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + job.Status.Succeeded = 1 + job.Status.Conditions = []batchv1.JobCondition{ + { + Type: "Complete", + Status: "True", + }, + } + return k8sClient.Status().Update(ctx, &job) + }).Should(Not(HaveOccurred())) + // wait until the job has finished + Eventually(func() bool { + jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }).Should(BeTrue()) + // store the generation value to compare against later generationValue = gitRepo.Spec.ForceSyncGeneration Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred()) + // simulate job was successful + Eventually(func() error { + jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + // We could be checking this when the job is still not created + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + job.Status.Succeeded = 1 + job.Status.Conditions = []batchv1.JobCondition{ + { + Type: "Complete", + Status: "True", + }, + } + return k8sClient.Status().Update(ctx, &job) + }).Should(Not(HaveOccurred())) + // wait until the job has finished + Eventually(func() bool { + jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }).Should(BeTrue()) }) BeforeEach(func() { expectedCommit = commit @@ -414,7 +460,7 @@ var _ = Describe("GitJob controller", func() { g.Expect(events.Items[1].Type).To(Equal("Normal")) g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller")) g.Expect(events.Items[2].Reason).To(Equal("JobDeleted")) - g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because of ForceUpdateGeneration")) + g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded")) g.Expect(events.Items[2].Type).To(Equal("Normal")) g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller")) }).Should(Succeed()) @@ -446,6 +492,26 @@ var _ = Describe("GitJob controller", func() { jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) }).Should(Not(HaveOccurred())) + // simulate job was successful + Eventually(func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + // We could be checking this when the job is still not created + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + job.Status.Succeeded = 1 + job.Status.Conditions = []batchv1.JobCondition{ + { + Type: "Complete", + Status: "True", + }, + } + return k8sClient.Status().Update(ctx, &job) + }).Should(Not(HaveOccurred())) + // wait until the job has finished + Eventually(func() bool { + jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }).Should(BeTrue()) // change a gitrepo field, this will change the Generation field. This simulates changing fleet apply parameters. Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index c64b275483..19a1db9c97 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -102,7 +102,6 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { ), ), ). - Owns(&batchv1.Job{}). Watches( // Fan out from bundle to gitrepo &v1alpha1.Bundle{}, @@ -190,12 +189,17 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } else if repoPolled && oldCommit != gitrepo.Status.Commit { r.Recorder.Event(gitrepo, fleetevent.Normal, "GotNewCommit", gitrepo.Status.Commit) } + + // check for webhook commit + if gitrepo.Status.WebhookCommit != "" && gitrepo.Status.WebhookCommit != gitrepo.Status.Commit { + gitrepo.Status.Commit = gitrepo.Status.WebhookCommit + } // From this point onwards we have to take into account if the poller // task was executed. // If so, we need to return a Result with EnqueueAfter set. res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled) - if err != nil { + if err != nil || res.Requeue { return res, err } @@ -261,30 +265,26 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, } } - if gitrepo.Status.Commit != "" { + if r.shouldCreateJob(gitrepo, oldCommit) { + r.updateGenerationValuesIfNeeded(gitrepo) if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil { r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error()) return result(repoPolled, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) } - logger.V(1).Info("Creating Git job resources") - if err := r.createJobRBAC(ctx, gitrepo); err != nil { - return result(repoPolled, gitrepo), fmt.Errorf("failed to create RBAC resources for git job: %w", err) - } - if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil { - return result(repoPolled, gitrepo), fmt.Errorf("failed to create targets config map for git job: %w", err) - } - if err := r.createCABundleSecret(ctx, gitrepo); err != nil { - return result(repoPolled, gitrepo), fmt.Errorf("failed to create cabundle secret for git job: %w", err) - } - if err := r.createJob(ctx, gitrepo); err != nil { - return result(repoPolled, gitrepo), fmt.Errorf("error creating git job: %w", err) + if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil { + return result(repoPolled, gitrepo), err } - r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created") } - } else if gitrepo.Status.Commit != "" { - if err = r.deleteJobIfNeeded(ctx, gitrepo, &job); err != nil { + } else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit { + err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job) + if err != nil { return result(repoPolled, gitrepo), fmt.Errorf("error deleting git job: %w", err) } + // job was deleted and we need to recreate it + // Requeue so the reconciler creates the job again + if recreateGitJob { + return reconcile.Result{Requeue: true}, nil + } } gitrepo.Status.ObservedGeneration = gitrepo.Generation @@ -330,6 +330,37 @@ func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logge return nil } +// shouldCreateJob checks if the conditions to create a new job are met. +// It checks for all the conditions so, in case more than one is met, it sets all the +// values related in one single reconciler loop +func (r *GitJobReconciler) shouldCreateJob(gitrepo *v1alpha1.GitRepo, oldCommit string) bool { + if gitrepo.Status.Commit != "" && gitrepo.Status.Commit != oldCommit { + return true + } + + if gitrepo.Spec.ForceSyncGeneration != gitrepo.Status.UpdateGeneration { + return true + } + + // k8s Jobs are immutable. Recreate the job if the GitRepo Spec has changed. + // Avoid deleting the job twice + if generationChanged(gitrepo) { + return true + } + + return false +} + +func (r *GitJobReconciler) updateGenerationValuesIfNeeded(gitrepo *v1alpha1.GitRepo) { + if gitrepo.Spec.ForceSyncGeneration != gitrepo.Status.UpdateGeneration { + gitrepo.Status.UpdateGeneration = gitrepo.Spec.ForceSyncGeneration + } + + if generationChanged(gitrepo) { + gitrepo.Status.ObservedGeneration = gitrepo.Generation + } +} + func (r *GitJobReconciler) addGitRepoFinalizer(ctx context.Context, nsName types.NamespacedName) error { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { gitrepo := &v1alpha1.GitRepo{} @@ -442,38 +473,60 @@ func (r *GitJobReconciler) createJob(ctx context.Context, gitRepo *v1alpha1.GitR return r.Create(ctx, job) } -func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) error { +func (r *GitJobReconciler) createJobAndResources(ctx context.Context, gitrepo *v1alpha1.GitRepo, logger logr.Logger) error { + logger.V(1).Info("Creating Git job resources") + if err := r.createJobRBAC(ctx, gitrepo); err != nil { + return fmt.Errorf("failed to create RBAC resources for git job: %w", err) + } + if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil { + return fmt.Errorf("failed to create targets config map for git job: %w", err) + } + if err := r.createCABundleSecret(ctx, gitrepo); err != nil { + return fmt.Errorf("failed to create cabundle secret for git job: %w", err) + } + if err := r.createJob(ctx, gitrepo); err != nil { + return fmt.Errorf("error creating git job: %w", err) + } + r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created") + return nil +} + +func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) (error, bool) { logger := log.FromContext(ctx) - jobDeleted := false - jobDeletedMessage := "" - // if force delete is set, delete the job to make sure a new job is created + + // the following cases imply that the job is still running but we need to stop it and + // create a new one if gitRepo.Spec.ForceSyncGeneration != gitRepo.Status.UpdateGeneration { - gitRepo.Status.UpdateGeneration = gitRepo.Spec.ForceSyncGeneration - jobDeletedMessage = "job deletion triggered because of ForceUpdateGeneration" + jobDeletedMessage := "job deletion triggered because of ForceUpdateGeneration" logger.Info(jobDeletedMessage) if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { - return err + return err, true } - jobDeleted = true + return nil, true } // k8s Jobs are immutable. Recreate the job if the GitRepo Spec has changed. // Avoid deleting the job twice - if !jobDeleted && generationChanged(gitRepo) { - jobDeletedMessage = "job deletion triggered because of generation change" - gitRepo.Status.ObservedGeneration = gitRepo.Generation + if generationChanged(gitRepo) { + jobDeletedMessage := "job deletion triggered because of generation change" logger.Info(jobDeletedMessage) if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { - return err + return err, true } - jobDeleted = true + return nil, true } - if jobDeleted { + // check if the job finished and was successful + if job.Status.Succeeded == 1 { + jobDeletedMessage := "job deletion triggered because job succeeded" + logger.Info(jobDeletedMessage) + if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { + return err, false + } r.Recorder.Event(gitRepo, fleetevent.Normal, "JobDeleted", jobDeletedMessage) } - return nil + return nil, false } func generationChanged(r *v1alpha1.GitRepo) bool { diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go index 74467ac5f1..4d3ad43856 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go @@ -168,8 +168,11 @@ type GitRepoStatus struct { // Update generation is the force update generation if spec.forceSyncGeneration is set UpdateGeneration int64 `json:"updateGeneration,omitempty"` // Commit is the Git commit hash from the last git job run. - // +nullable + // +optional Commit string `json:"commit,omitempty"` + // WebhookCommit is the latest Git commit hash received from a webhook + // +optional + WebhookCommit string `json:"webhookCommit,omitempty"` // ReadyClusters is the lowest number of clusters that are ready over // all the bundles of this GitRepo. // +optional diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 4894a36974..414d3d2189 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -222,7 +222,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } - if gitrepo.Status.Commit != revision && revision != "" { + if gitrepo.Status.WebhookCommit != revision && revision != "" { if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { var gitRepoFromCluster v1alpha1.GitRepo err := w.client.Get( @@ -235,7 +235,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) { if err != nil { return err } - gitRepoFromCluster.Status.Commit = revision + gitRepoFromCluster.Status.WebhookCommit = revision // if PollingInterval is not set and webhook is configured, set it to 1 hour if gitrepo.Spec.PollingInterval == nil { gitRepoFromCluster.Spec.PollingInterval = &metav1.Duration{ diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index 80b8f3fc07..f8db5e1249 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -95,8 +95,8 @@ func TestAzureDevopsWebhook(t *testing.T) { if err != nil { t.Errorf("unexpected err %v", err) } - if updatedGitRepo.Status.Commit != commit { - t.Errorf("expected commit %v, but got %v", commit, updatedGitRepo.Status.Commit) + if updatedGitRepo.Status.WebhookCommit != commit { + t.Errorf("expected webhook commit %v, but got %v", commit, updatedGitRepo.Status.WebhookCommit) } } @@ -141,8 +141,8 @@ func TestAzureDevopsWebhookWithSSHURL(t *testing.T) { if err != nil { t.Errorf("unexpected err %v", err) } - if updatedGitRepo.Status.Commit != commit { - t.Errorf("expected commit %v, but got %v", commit, updatedGitRepo.Status.Commit) + if updatedGitRepo.Status.WebhookCommit != commit { + t.Errorf("expected webhook commit %v, but got %v", commit, updatedGitRepo.Status.WebhookCommit) } } @@ -387,8 +387,8 @@ func TestGitHubRightSecretAndCommitUpdated(t *testing.T) { statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( func(ctx context.Context, repo *v1alpha1.GitRepo, opts ...interface{}) { // check that the commit is the expected one - if repo.Status.Commit != expectedCommit { - t.Errorf("expecting girepo commit %s, got %s", expectedCommit, repo.Status.Commit) + if repo.Status.WebhookCommit != expectedCommit { + t.Errorf("expecting girepo webhook commit %s, got %s", expectedCommit, repo.Status.WebhookCommit) } }, ).Times(1)