Skip to content

Commit c18349b

Browse files
authored
Merge pull request #458 from aramase/load
feat: add filtered watch for reconcile
2 parents cae7e8f + 14480c8 commit c18349b

File tree

14 files changed

+1537
-225
lines changed

14 files changed

+1537
-225
lines changed

cmd/secrets-store-csi-driver/main.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,19 @@ import (
2525
"strings"
2626
"time"
2727

28+
"sigs.k8s.io/secrets-store-csi-driver/pkg/cache"
2829
"sigs.k8s.io/secrets-store-csi-driver/pkg/metrics"
2930
"sigs.k8s.io/secrets-store-csi-driver/pkg/rotation"
3031
"sigs.k8s.io/secrets-store-csi-driver/pkg/version"
3132

33+
"k8s.io/apimachinery/pkg/fields"
3234
"k8s.io/apimachinery/pkg/runtime"
35+
"k8s.io/apimachinery/pkg/runtime/schema"
3336
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3437
json "k8s.io/component-base/logs/json"
3538
"k8s.io/klog/v2"
3639

3740
ctrl "sigs.k8s.io/controller-runtime"
38-
"sigs.k8s.io/controller-runtime/pkg/client"
39-
"sigs.k8s.io/controller-runtime/pkg/client/config"
4041

4142
"sigs.k8s.io/secrets-store-csi-driver/apis/v1alpha1"
4243
"sigs.k8s.io/secrets-store-csi-driver/controllers"
@@ -52,7 +53,6 @@ var (
5253
logFormatJSON = flag.Bool("log-format-json", false, "set log formatter to json")
5354
providerVolumePath = flag.String("provider-volume", "/etc/kubernetes/secrets-store-csi-providers", "Volume path for provider")
5455
// this will be removed in a future release
55-
_ = flag.String("min-provider-version", "", "[DEPRECATED] set minimum supported provider versions with current driver")
5656
metricsAddr = flag.String("metrics-addr", ":8095", "The address the metric endpoint binds to")
5757
// grpcSupportedProviders is a ; separated string that can contain a list of providers. The reason it's a string is to allow scenarios
5858
// where the driver is being used with 2 providers, one which supports grpc and other using binary for provider.
@@ -62,6 +62,11 @@ var (
6262
enableProfile = flag.Bool("enable-pprof", false, "enable pprof profiling")
6363
profilePort = flag.Int("pprof-port", 6065, "port for pprof profiling")
6464

65+
// enable filtered watch for NodePublishSecretRef secrets. The filtering is done on the csi driver label: secrets-store.csi.k8s.io/used=true
66+
// For Kubernetes secrets used to provide credentials for use with the CSI driver, set the label by running: kubectl label secret secrets-store-creds secrets-store.csi.k8s.io/used=true
67+
// This feature flag will be enabled by default after n+2 releases giving time for users to label all their existing credential secrets.
68+
filteredWatchSecret = flag.Bool("filtered-watch-secret", false, "enable filtered watch for NodePublishSecretRef secrets with label secrets-store.csi.k8s.io/used=true")
69+
6570
scheme = runtime.NewScheme()
6671
)
6772

@@ -90,6 +95,9 @@ func main() {
9095
klog.ErrorS(http.ListenAndServe(addr, nil), "unable to start profiling server")
9196
}()
9297
}
98+
if *filteredWatchSecret {
99+
klog.Infof("Filtered watch for nodePublishSecretRef secret based on secrets-store.csi.k8s.io/used=true label enabled")
100+
}
93101

94102
// initialize metrics exporter before creating measurements
95103
err := metrics.InitMetricsExporter()
@@ -100,10 +108,28 @@ func main() {
100108
cfg := ctrl.GetConfigOrDie()
101109
cfg.UserAgent = version.GetUserAgent("controller")
102110

111+
// this enables filtered watch of pods based on the node name
112+
// only pods running on the same node as the csi driver will be cached
113+
fieldSelectorByResource := map[schema.GroupResource]string{
114+
{Group: "", Resource: "pods"}: fields.OneTermEqualSelector("spec.nodeName", *nodeID).String(),
115+
}
116+
labelSelectorByResource := map[schema.GroupResource]string{
117+
// this enables filtered watch of secretproviderclasspodstatuses based on the internal node label
118+
// internal.secrets-store.csi.k8s.io/node-name=<node name> added by csi driver
119+
{Group: v1alpha1.GroupVersion.Group, Resource: "secretproviderclasspodstatuses"}: fmt.Sprintf("%s=%s", v1alpha1.InternalNodeLabel, *nodeID),
120+
// this enables filtered watch of secrets based on the label (secrets-store.csi.k8s.io/managed=true)
121+
// added to the secrets created by the CSI driver
122+
{Group: "", Resource: "secrets"}: fmt.Sprintf("%s=true", controllers.SecretManagedLabel),
123+
}
124+
103125
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
104126
Scheme: scheme,
105127
MetricsBindAddress: *metricsAddr,
106128
LeaderElection: false,
129+
NewCache: cache.Builder(cache.Options{
130+
FieldSelectorByResource: fieldSelectorByResource,
131+
LabelSelectorByResource: labelSelectorByResource,
132+
}),
107133
})
108134
if err != nil {
109135
klog.Fatalf("failed to start manager, error: %+v", err)
@@ -152,23 +178,15 @@ func main() {
152178
}()
153179

154180
if *enableSecretRotation {
155-
rec, err := rotation.NewReconciler(scheme, *providerVolumePath, *nodeID, *rotationPollInterval, providerClients)
181+
rec, err := rotation.NewReconciler(scheme, *providerVolumePath, *nodeID, *rotationPollInterval, providerClients, *filteredWatchSecret)
156182
if err != nil {
157183
klog.Fatalf("failed to initialize rotation reconciler, error: %+v", err)
158184
}
159185
go rec.Run(ctx.Done())
160186
}
161187

162-
ccfg, err := config.GetConfig()
163-
if err != nil {
164-
klog.Fatalf("failed to initialize driver, error getting config: %+v", err)
165-
}
166-
c, err := client.New(ccfg, client.Options{Scheme: scheme, Mapper: nil})
167-
if err != nil {
168-
klog.Fatalf("failed to initialize driver, error creating client: %+v", err)
169-
}
170188
driver := secretsstore.GetDriver()
171-
driver.Run(ctx, *driverName, *nodeID, *endpoint, *providerVolumePath, providerClients, c)
189+
driver.Run(ctx, *driverName, *nodeID, *endpoint, *providerVolumePath, providerClients, mgr.GetClient())
172190
}
173191

174192
// withShutdownSignal returns a copy of the parent context that will close if

controllers/secretproviderclasspodstatus_controller.go

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"time"
2525

2626
"k8s.io/client-go/kubernetes"
27+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
28+
"sigs.k8s.io/controller-runtime/pkg/event"
29+
"sigs.k8s.io/controller-runtime/pkg/predicate"
2730

2831
"sigs.k8s.io/controller-runtime/pkg/manager"
2932

@@ -38,7 +41,6 @@ import (
3841

3942
ctrl "sigs.k8s.io/controller-runtime"
4043
"sigs.k8s.io/controller-runtime/pkg/client"
41-
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4244

4345
corev1 "k8s.io/api/core/v1"
4446
v1 "k8s.io/api/core/v1"
@@ -52,7 +54,8 @@ import (
5254
)
5355

5456
const (
55-
secretManagedLabel = "secrets-store.csi.k8s.io/managed"
57+
SecretManagedLabel = "secrets-store.csi.k8s.io/managed"
58+
SecretUsedLabel = "secrets-store.csi.k8s.io/used"
5659
secretCreationFailedReason = "FailedToCreateSecret"
5760
)
5861

@@ -108,7 +111,7 @@ func (r *SecretProviderClassPodStatusReconciler) Patcher(ctx context.Context) er
108111

109112
spcPodStatusList := &v1alpha1.SecretProviderClassPodStatusList{}
110113
spcMap := make(map[string]v1alpha1.SecretProviderClass)
111-
secretOwnerMap := make(map[types.NamespacedName][]*v1alpha1.SecretProviderClassPodStatus)
114+
secretOwnerMap := make(map[types.NamespacedName][]metav1.OwnerReference)
112115
// get a list of all spc pod status that belong to the node
113116
err := r.reader.List(ctx, spcPodStatusList, r.ListOptionsLabelSelector())
114117
if err != nil {
@@ -119,21 +122,56 @@ func (r *SecretProviderClassPodStatusReconciler) Patcher(ctx context.Context) er
119122
for i := range spcPodStatuses {
120123
spcName := spcPodStatuses[i].Status.SecretProviderClassName
121124
spc := &v1alpha1.SecretProviderClass{}
122-
if val, exists := spcMap[spcPodStatuses[i].Namespace+"/"+spcName]; exists {
125+
namespace := spcPodStatuses[i].Namespace
126+
127+
if val, exists := spcMap[namespace+"/"+spcName]; exists {
123128
spc = &val
124129
} else {
125-
if err := r.reader.Get(ctx, client.ObjectKey{Namespace: spcPodStatuses[i].Namespace, Name: spcName}, spc); err != nil {
130+
if err := r.reader.Get(ctx, client.ObjectKey{Namespace: namespace, Name: spcName}, spc); err != nil {
126131
return fmt.Errorf("failed to get spc %s, err: %+v", spcName, err)
127132
}
128-
spcMap[spcPodStatuses[i].Namespace+"/"+spcName] = *spc
133+
spcMap[namespace+"/"+spcName] = *spc
134+
}
135+
// get the pod and check if the pod has a owner reference
136+
pod := &v1.Pod{}
137+
err = r.reader.Get(ctx, client.ObjectKey{Namespace: namespace, Name: spcPodStatuses[i].Status.PodName}, pod)
138+
if err != nil {
139+
return fmt.Errorf("failed to fetch pod during patching, err: %+v", err)
129140
}
141+
var ownerRefs []metav1.OwnerReference
142+
for _, ownerRef := range pod.GetOwnerReferences() {
143+
ownerRefs = append(ownerRefs, metav1.OwnerReference{
144+
APIVersion: ownerRef.APIVersion,
145+
Kind: ownerRef.Kind,
146+
UID: ownerRef.UID,
147+
Name: ownerRef.Name,
148+
})
149+
}
150+
// If a pod has no owner references, then it's a static pod and
151+
// doesn't belong to a replicaset. In this case, use the spcps as
152+
// owner reference just like we do it today
153+
if len(ownerRefs) == 0 {
154+
// Create a new owner ref.
155+
gvk, err := apiutil.GVKForObject(&spcPodStatuses[i], r.scheme)
156+
if err != nil {
157+
return err
158+
}
159+
ref := metav1.OwnerReference{
160+
APIVersion: gvk.GroupVersion().String(),
161+
Kind: gvk.Kind,
162+
UID: spcPodStatuses[i].GetUID(),
163+
Name: spcPodStatuses[i].GetName(),
164+
}
165+
ownerRefs = append(ownerRefs, ref)
166+
}
167+
130168
for _, secret := range spc.Spec.SecretObjects {
131-
key := types.NamespacedName{Name: secret.SecretName, Namespace: spcPodStatuses[i].Namespace}
169+
key := types.NamespacedName{Name: secret.SecretName, Namespace: namespace}
132170
val, exists := secretOwnerMap[key]
133171
if exists {
134-
secretOwnerMap[key] = append(val, &spcPodStatuses[i])
172+
secretOwnerMap[key] = append(val, ownerRefs...)
135173
} else {
136-
secretOwnerMap[key] = []*v1alpha1.SecretProviderClassPodStatus{&spcPodStatuses[i]}
174+
secretOwnerMap[key] = ownerRefs
137175
}
138176
}
139177
}
@@ -191,22 +229,6 @@ func (r *SecretProviderClassPodStatusReconciler) Reconcile(ctx context.Context,
191229
return ctrl.Result{}, err
192230
}
193231

194-
// reconcile delete
195-
if !spcPodStatus.GetDeletionTimestamp().IsZero() {
196-
klog.InfoS("reconcile complete", "spcps", req.NamespacedName.String())
197-
return ctrl.Result{}, nil
198-
}
199-
200-
node, ok := spcPodStatus.GetLabels()[v1alpha1.InternalNodeLabel]
201-
if !ok {
202-
klog.V(3).InfoS("node label not found, ignoring this spc pod status", "spcps", klog.KObj(spcPodStatus))
203-
return ctrl.Result{}, nil
204-
}
205-
if !strings.EqualFold(node, r.nodeID) {
206-
klog.V(3).InfoS("ignoring as spc pod status belongs diff node", "node", node, "spcps", klog.KObj(spcPodStatus))
207-
return ctrl.Result{}, nil
208-
}
209-
210232
// Obtain the full pod metadata. An object reference is needed for sending
211233
// events and the UID is helpful for validating the SPCPS TargetPath.
212234
pod := &v1.Pod{}
@@ -296,7 +318,7 @@ func (r *SecretProviderClassPodStatusReconciler) Reconcile(ctx context.Context,
296318
// Set secrets-store.csi.k8s.io/managed=true label on the secret that's created and managed
297319
// by the secrets-store-csi-driver. This label will be used to perform a filtered list watch
298320
// only on secrets created and managed by the driver
299-
labelsMap[secretManagedLabel] = "true"
321+
labelsMap[SecretManagedLabel] = "true"
300322

301323
createFn := func() (bool, error) {
302324
if err := r.createK8sSecret(ctx, secretName, req.Namespace, datamap, labelsMap, secretType); err != nil {
@@ -335,9 +357,41 @@ func (r *SecretProviderClassPodStatusReconciler) Reconcile(ctx context.Context,
335357
func (r *SecretProviderClassPodStatusReconciler) SetupWithManager(mgr ctrl.Manager) error {
336358
return ctrl.NewControllerManagedBy(mgr).
337359
For(&v1alpha1.SecretProviderClassPodStatus{}).
360+
WithEventFilter(r.belongsToNodePredicate()).
338361
Complete(r)
339362
}
340363

364+
// belongsToNodePredicate defines predicates for handlers
365+
func (r *SecretProviderClassPodStatusReconciler) belongsToNodePredicate() predicate.Funcs {
366+
return predicate.Funcs{
367+
UpdateFunc: func(e event.UpdateEvent) bool {
368+
return r.processIfBelongsToNode(e.ObjectNew)
369+
},
370+
CreateFunc: func(e event.CreateEvent) bool {
371+
return r.processIfBelongsToNode(e.Object)
372+
},
373+
DeleteFunc: func(e event.DeleteEvent) bool {
374+
return false
375+
},
376+
GenericFunc: func(e event.GenericEvent) bool {
377+
return r.processIfBelongsToNode(e.Object)
378+
},
379+
}
380+
}
381+
382+
// processIfBelongsToNode determines if the secretproviderclasspodstatus belongs to the node based on the
383+
// internal.secrets-store.csi.k8s.io/node-name: <node name> label. If belongs to node, then the spcps is processed.
384+
func (r *SecretProviderClassPodStatusReconciler) processIfBelongsToNode(objMeta metav1.Object) bool {
385+
node, ok := objMeta.GetLabels()[v1alpha1.InternalNodeLabel]
386+
if !ok {
387+
return false
388+
}
389+
if !strings.EqualFold(node, r.nodeID) {
390+
return false
391+
}
392+
return true
393+
}
394+
341395
// createK8sSecret creates K8s secret with data from mounted files
342396
// If a secret with the same name already exists in the namespace of the pod, the error is nil.
343397
func (r *SecretProviderClassPodStatusReconciler) createK8sSecret(ctx context.Context, name, namespace string, datamap map[string][]byte, labelsmap map[string]string, secretType corev1.SecretType) error {
@@ -363,7 +417,7 @@ func (r *SecretProviderClassPodStatusReconciler) createK8sSecret(ctx context.Con
363417
}
364418

365419
// patchSecretWithOwnerRef patches the secret owner reference with the spc pod status
366-
func (r *SecretProviderClassPodStatusReconciler) patchSecretWithOwnerRef(ctx context.Context, name, namespace string, spcPodStatus ...*v1alpha1.SecretProviderClassPodStatus) error {
420+
func (r *SecretProviderClassPodStatusReconciler) patchSecretWithOwnerRef(ctx context.Context, name, namespace string, ownerRefs ...metav1.OwnerReference) error {
367421
secret := &corev1.Secret{}
368422
secretKey := types.NamespacedName{
369423
Namespace: namespace,
@@ -380,23 +434,23 @@ func (r *SecretProviderClassPodStatusReconciler) patchSecretWithOwnerRef(ctx con
380434
patch := client.MergeFromWithOptions(secret.DeepCopy(), client.MergeFromWithOptimisticLock{})
381435
needsPatch := false
382436

437+
secretOwnerRefs := secret.GetOwnerReferences()
383438
secretOwnerMap := make(map[string]types.UID)
384-
for _, or := range secret.GetOwnerReferences() {
439+
for _, or := range secretOwnerRefs {
385440
secretOwnerMap[or.Name] = or.UID
386441
}
387442

388-
for i := range spcPodStatus {
389-
if _, exists := secretOwnerMap[spcPodStatus[i].Name]; exists {
443+
for i := range ownerRefs {
444+
if _, exists := secretOwnerMap[ownerRefs[i].Name]; exists {
390445
continue
391446
}
392447
needsPatch = true
393-
err := controllerutil.SetOwnerReference(spcPodStatus[i], secret, r.scheme)
394-
if err != nil {
395-
return err
396-
}
448+
klog.Infof("Adding %s/%s as owner ref for %s/%s", ownerRefs[i].APIVersion, ownerRefs[i].Name, namespace, name)
449+
secretOwnerRefs = append(secretOwnerRefs, ownerRefs[i])
397450
}
398451

399452
if needsPatch {
453+
secret.SetOwnerReferences(secretOwnerRefs)
400454
return r.writer.Patch(ctx, secret, patch)
401455
}
402456
return nil

0 commit comments

Comments (0)