Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 81 additions & 37 deletions pkg/operator/target_config_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"path/filepath"
"reflect"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -43,9 +44,11 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerror "k8s.io/apimachinery/pkg/util/errors"
Expand Down Expand Up @@ -126,7 +129,7 @@ func NewTargetConfigReconciler(
apiRegistrationClient: apiRegistrationClient,
}

_, err := operatorClientInformer.Informer().AddEventHandler(c.eventHandler(queueItem{kind: "kueue"}))
_, err := operatorClientInformer.Informer().AddEventHandler(c.eventHandlerWithStatusFilter(queueItem{kind: "kueue"}))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,28 +168,31 @@ func NewTargetConfigReconciler(
return nil, err
}

return factory.New().WithInformers(
return factory.New().
// for the operator changes
kueueClient.Informer(),
// CRD informer
crdInformer.Apiextensions().V1().CustomResourceDefinitions().Informer(),
// for the deployment and its configmap, secret, daemonsets.
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Apps().V1().Deployments().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().ConfigMaps().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().Secrets().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().ServiceAccounts().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().Services().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().ClusterRoleBindings().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().ClusterRoles().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().RoleBindings().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().Roles().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Networking().V1().NetworkPolicies().Informer(),
kubeInformer.Flowcontrol().V1().FlowSchemas().Informer(),
kubeInformer.Flowcontrol().V1().PriorityLevelConfigurations().Informer(),
apiregistrationInformer.Apiregistration().V1().APIServices().Informer(),
).ResyncEvery(5*time.Minute).
WithInformersQueueKeyFunc(func(runtime.Object) string {
return "kueue-operator"
}, kueueClient.Informer()).
WithInformers(
// CRD informer
crdInformer.Apiextensions().V1().CustomResourceDefinitions().Informer(),
// for the deployment and its configmap, secret, daemonsets.
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we have informers for the operator namespace for validating or mutating webhooks?

I thought they were cluster wide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is supposed to have informers for all the objects being created by the operator to use as an eventer. We might be able to remove the cluster wide ones though.

kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Apps().V1().Deployments().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().ConfigMaps().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().Secrets().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().ServiceAccounts().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Core().V1().Services().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().ClusterRoleBindings().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().ClusterRoles().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().RoleBindings().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Rbac().V1().Roles().Informer(),
kubeInformersForNamespaces.InformersFor(c.operatorNamespace).Networking().V1().NetworkPolicies().Informer(),
kubeInformer.Flowcontrol().V1().FlowSchemas().Informer(),
kubeInformer.Flowcontrol().V1().PriorityLevelConfigurations().Informer(),
apiregistrationInformer.Apiregistration().V1().APIServices().Informer(),
).ResyncEvery(5*time.Minute).
WithSync(c.sync).
ToController("KueueOperator", c.eventRecorder), nil
}
Expand Down Expand Up @@ -463,26 +469,33 @@ func (c TargetConfigReconciler) sync(ctx context.Context, syncCtx factory.SyncCo
specAnnotations["servicemonitor/"+serviceMonitor.GetName()] = serviceMonitor.GetResourceVersion()
}

deployment, _, err := c.manageDeployment(kueue, specAnnotations, ownerReference)
deployment, updated, err := c.manageDeployment(kueue, specAnnotations, ownerReference)
if err != nil {
klog.Error("unable to manage deployment")
return err
}

_, _, err = v1helpers.UpdateStatus(c.ctx, c.kueueClient, func(status *operatorv1.OperatorStatus) error {
resourcemerge.SetDeploymentGeneration(&status.Generations, deployment)
if deployment != nil {
status.ReadyReplicas = deployment.Status.ReadyReplicas
status.Conditions = c.buildOperatorConditions(deployment)
if updated {
_, _, err = v1helpers.UpdateStatus(c.ctx, c.kueueClient, func(status *operatorv1.OperatorStatus) error {
*status = *c.generateStatus(kueue, deployment)
return nil
})
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}

// generateStatus builds the operator status to publish for kueue.
//
// It starts from a deep copy of the status currently recorded on kueue,
// records the deployment generation, and sets ObservedGeneration to kueue's
// current generation. ReadyReplicas and Conditions are derived from the
// deployment and are therefore only refreshed when deployment is non-nil;
// with a nil deployment the copied values are returned unchanged instead of
// dereferencing a nil pointer.
func (c *TargetConfigReconciler) generateStatus(kueue *kueuev1.Kueue, deployment *appsv1.Deployment) *operatorv1.OperatorStatus {
	newStatus := kueue.Status.OperatorStatus.DeepCopy()
	resourcemerge.SetDeploymentGeneration(&newStatus.Generations, deployment)
	newStatus.ObservedGeneration = kueue.GetGeneration()

	// Nothing deployment-specific can be reported without a deployment.
	if deployment == nil {
		return newStatus
	}

	newStatus.ReadyReplicas = deployment.Status.ReadyReplicas
	newStatus.Conditions = c.buildOperatorConditions(deployment)
	return newStatus
}

func (c *TargetConfigReconciler) buildOperatorConditions(deployment *appsv1.Deployment) []operatorv1.OperatorCondition {
desired := int32(1)
if deployment.Spec.Replicas != nil {
Expand Down Expand Up @@ -1446,12 +1459,43 @@ func (c *TargetConfigReconciler) manageServiceMonitor(ctx context.Context, kueue
return resourceapply.ApplyServiceMonitor(ctx, c.dynamicClient, c.eventRecorder, required)
}

// eventHandler queues the operator to check spec and status
func (c *TargetConfigReconciler) eventHandler(item queueItem) cache.ResourceEventHandler {
// onlyStatusChanged reports whether an update changed nothing but .status:
// the generation is the same on both objects, their Spec is deeply equal,
// and their Status differs.
//
// It returns false in every other case, including when the metadata of
// either object cannot be accessed or either object is not a *kueuev1.Kueue,
// so a caller that uses the result to drop events never drops one it could
// not classify.
func (c *TargetConfigReconciler) onlyStatusChanged(oldObj, newObj any) bool {
	oldMeta, err := meta.Accessor(oldObj)
	if err != nil {
		return false
	}
	newMeta, err := meta.Accessor(newObj)
	if err != nil {
		return false
	}

	// If generation changed, spec changed → not status-only.
	if oldMeta.GetGeneration() != newMeta.GetGeneration() {
		return false
	}

	// Checked assertions: an unexpected type is treated as "not status-only"
	// rather than panicking inside the informer callback.
	oldCR, ok := oldObj.(*kueuev1.Kueue)
	if !ok {
		return false
	}
	newCR, ok := newObj.(*kueuev1.Kueue)
	if !ok {
		return false
	}

	// Spec identical and status different → status-only update.
	return reflect.DeepEqual(oldCR.Spec, newCR.Spec) &&
		!reflect.DeepEqual(oldCR.Status, newCR.Status)
}

// eventHandlerWithStatusFilter returns an event handler that adds item to
// the work queue on every add and delete event, and on every update event
// except those for which onlyStatusChanged reports a status-only change.
func (c *TargetConfigReconciler) eventHandlerWithStatusFilter(item queueItem) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(item) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(item) },
DeleteFunc: func(obj interface{}) { c.queue.Add(item) },
AddFunc: func(obj any) {
c.queue.Add(item)
},
UpdateFunc: func(old, new any) {
// NOTE(review): despite its name, statusChanged is true only when the
// update is status-only; such updates are dropped without queueing —
// presumably so status writes do not re-trigger a sync; confirm.
statusChanged := c.onlyStatusChanged(old, new)
if statusChanged {
return
}
c.queue.Add(item)
},
DeleteFunc: func(obj any) {
c.queue.Add(item)
},
}
}

Expand Down