Skip to content

Commit cd065bf

Browse files
committed
Add SelectorsByObject option to cache
Controller-Runtime controllers use a cache to subscribe to events from Kubernetes objects and to read those objects more efficiently by avoiding to call out to the API. This cache is backed by Kubernetes informers. The only way to filter this cache is by namespace and resource type. In cases where a controller is only interested in a small subset of objects (for example all pods on a node), this might end up not being efficient enough. This change increase the "pkg/cache" interface adding the "BuildWithOptins" function and the "Options.SelectorsByObject" option. The SelectorsByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object, the map's value must implement Selector [1] using for example a Set [2] This is the implementation of the design document [3] [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set [3] https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/use-selectors-at-cache.md Signed-off-by: Quique Llorente <ellorent@redhat.com>
1 parent b2c90ab commit cd065bf

File tree

7 files changed

+232
-19
lines changed

7 files changed

+232
-19
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
SHELL:=/usr/bin/env bash
2525
.DEFAULT_GOAL:=help
2626

27+
export WHAT ?= ./...
28+
2729
# Use GOPROXY environment variable if set
2830
GOPROXY := $(shell go env GOPROXY)
2931
ifeq ($(GOPROXY),)

hack/test-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh
2020

2121
header_text "running go test"
2222

23-
go test -race ${MOD_OPT} ./...
23+
go test -race ${MOD_OPT} ${WHAT}
2424

2525
if [[ -n ${ARTIFACTS:-} ]]; then
2626
if grep -Rin '<failure type="Failure">' ${ARTIFACTS}/*; then exit 1; fi

pkg/cache/cache.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ type Informer interface {
8686
HasSynced() bool
8787
}
8888

89+
// SelectorsByObject associate a client.Object's GVK to a field/label selector
90+
type SelectorsByObject map[client.Object]internal.Selector
91+
8992
// Options are the optional arguments for creating a new InformersMap object
9093
type Options struct {
9194
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
@@ -103,6 +106,13 @@ type Options struct {
103106
// Namespace restricts the cache's ListWatch to the desired namespace
104107
// Default watches all namespaces
105108
Namespace string
109+
110+
// SelectorsByObject restricts the cache's ListWatch to the desired
111+
// fields per GVK at the specified object, the map's value must implement
112+
// Selector [1] using for example a Set [2]
113+
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
114+
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
115+
SelectorsByObject SelectorsByObject
106116
}
107117

108118
var defaultResyncTime = 10 * time.Hour
@@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) {
113123
if err != nil {
114124
return nil, err
115125
}
116-
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
126+
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
127+
if err != nil {
128+
return nil, err
129+
}
130+
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK)
117131
return &informerCache{InformersMap: im}, nil
118132
}
119133

134+
// BuilderWithOptions returns a Cache constructor that will build the a cache
135+
// honoring the options argument, this is useful to specify options like
136+
// SelectorsByObject
137+
// WARNING: if SelectorsByObject is specified. filtered out resources are not
138+
// returned.
139+
func BuilderWithOptions(options Options) NewCacheFunc {
140+
return func(config *rest.Config, opts Options) (Cache, error) {
141+
if opts.Scheme == nil {
142+
opts.Scheme = options.Scheme
143+
}
144+
if opts.Mapper == nil {
145+
opts.Mapper = options.Mapper
146+
}
147+
if opts.Resync == nil {
148+
opts.Resync = options.Resync
149+
}
150+
if opts.Namespace == "" {
151+
opts.Namespace = options.Namespace
152+
}
153+
opts.SelectorsByObject = options.SelectorsByObject
154+
return New(config, opts)
155+
}
156+
}
157+
120158
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
121159
// Use the default Kubernetes Scheme if unset
122160
if opts.Scheme == nil {
@@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
139177
}
140178
return opts, nil
141179
}
180+
181+
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
182+
selectorsByGVK := internal.SelectorsByGVK{}
183+
for object, selector := range selectorsByObject {
184+
gvk, err := apiutil.GVKForObject(object, scheme)
185+
if err != nil {
186+
return nil, err
187+
}
188+
selectorsByGVK[gvk] = selector
189+
}
190+
return selectorsByGVK, nil
191+
}

pkg/cache/cache_test.go

Lines changed: 124 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@ import (
2121
"fmt"
2222

2323
. "github.com/onsi/ginkgo"
24+
. "github.com/onsi/ginkgo/extensions/table"
2425
. "github.com/onsi/gomega"
26+
2527
kcorev1 "k8s.io/api/core/v1"
2628
"k8s.io/apimachinery/pkg/api/errors"
2729
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2830
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31+
"k8s.io/apimachinery/pkg/fields"
32+
"k8s.io/apimachinery/pkg/labels"
2933
"k8s.io/apimachinery/pkg/runtime/schema"
3034
kscheme "k8s.io/client-go/kubernetes/scheme"
3135
"k8s.io/client-go/rest"
@@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3"
4246

4347
// TODO(community): Pull these helper functions into testenv.
4448
// Restart policy is included to allow indexing on that field.
45-
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
49+
func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object {
4650
three := int64(3)
51+
if labels == nil {
52+
labels = map[string]string{}
53+
}
54+
labels["test-label"] = name
4755
pod := &kcorev1.Pod{
4856
ObjectMeta: kmetav1.ObjectMeta{
4957
Name: name,
5058
Namespace: namespace,
51-
Labels: map[string]string{
52-
"test-label": name,
53-
},
59+
Labels: labels,
5460
},
5561
Spec: kcorev1.PodSpec{
5662
Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}},
@@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie
6571
return pod
6672
}
6773

74+
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
75+
return createPodWithLabels(name, namespace, restartPolicy, nil)
76+
}
77+
6878
func deletePod(pod client.Object) {
6979
cl, err := client.New(cfg, client.Options{})
7080
Expect(err).NotTo(HaveOccurred())
@@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
110120
// Includes restart policy since these objects are indexed on this field.
111121
knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever)
112122
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
113-
knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure)
114-
knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever)
123+
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
124+
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
115125
podGVK := schema.GroupVersionKind{
116126
Kind: "Pod",
117127
Version: "v1",
@@ -284,6 +294,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
284294
Expect(err).To(HaveOccurred())
285295
Expect(errors.IsTimeout(err)).To(BeTrue())
286296
})
297+
287298
})
288299
Context("with unstructured objects", func() {
289300
It("should be able to list objects that haven't been watched previously", func() {
@@ -709,6 +720,113 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
709720
Expect(err).To(HaveOccurred())
710721
})
711722
})
723+
type selectorsTestCase struct {
724+
fieldSelectors map[string]string
725+
labelSelectors map[string]string
726+
expectedPods []string
727+
}
728+
DescribeTable(" and cache with selectors", func(tc selectorsTestCase) {
729+
By("creating the cache")
730+
builder := cache.BuilderWithOptions(
731+
cache.Options{
732+
SelectorsByObject: cache.SelectorsByObject{
733+
&kcorev1.Pod{}: {
734+
Label: labels.Set(tc.labelSelectors).AsSelector(),
735+
Field: fields.Set(tc.fieldSelectors).AsSelector(),
736+
},
737+
},
738+
},
739+
)
740+
informer, err := builder(cfg, cache.Options{})
741+
Expect(err).NotTo(HaveOccurred())
742+
743+
By("running the cache and waiting for it to sync")
744+
go func() {
745+
defer GinkgoRecover()
746+
Expect(informer.Start(informerCacheCtx)).To(Succeed())
747+
}()
748+
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
749+
750+
By("Checking with structured")
751+
obtainedStructuredPodList := kcorev1.PodList{}
752+
Expect(informer.List(context.Background(), &obtainedStructuredPodList)).To(Succeed())
753+
Expect(obtainedStructuredPodList.Items).Should(WithTransform(func(pods []kcorev1.Pod) []string {
754+
obtainedPodNames := []string{}
755+
for _, pod := range pods {
756+
obtainedPodNames = append(obtainedPodNames, pod.Name)
757+
}
758+
return obtainedPodNames
759+
}, ConsistOf(tc.expectedPods)))
760+
761+
By("Checking with unstructured")
762+
obtainedUnstructuredPodList := unstructured.UnstructuredList{}
763+
obtainedUnstructuredPodList.SetGroupVersionKind(schema.GroupVersionKind{
764+
Group: "",
765+
Version: "v1",
766+
Kind: "PodList",
767+
})
768+
err = informer.List(context.Background(), &obtainedUnstructuredPodList)
769+
Expect(err).To(Succeed())
770+
Expect(obtainedUnstructuredPodList.Items).Should(WithTransform(func(pods []unstructured.Unstructured) []string {
771+
obtainedPodNames := []string{}
772+
for _, pod := range pods {
773+
obtainedPodNames = append(obtainedPodNames, pod.GetName())
774+
}
775+
return obtainedPodNames
776+
}, ConsistOf(tc.expectedPods)))
777+
778+
By("Checking with metadata")
779+
obtainedMetadataPodList := kmetav1.PartialObjectMetadataList{}
780+
obtainedMetadataPodList.SetGroupVersionKind(schema.GroupVersionKind{
781+
Group: "",
782+
Version: "v1",
783+
Kind: "PodList",
784+
})
785+
err = informer.List(context.Background(), &obtainedMetadataPodList)
786+
Expect(err).To(Succeed())
787+
Expect(obtainedMetadataPodList.Items).Should(WithTransform(func(pods []kmetav1.PartialObjectMetadata) []string {
788+
obtainedPodNames := []string{}
789+
for _, pod := range pods {
790+
obtainedPodNames = append(obtainedPodNames, pod.Name)
791+
}
792+
return obtainedPodNames
793+
}, ConsistOf(tc.expectedPods)))
794+
},
795+
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
796+
fieldSelectors: map[string]string{},
797+
labelSelectors: map[string]string{},
798+
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
799+
}),
800+
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
801+
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
802+
expectedPods: []string{"test-pod-2"},
803+
}),
804+
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
805+
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
806+
expectedPods: []string{"test-pod-2", "test-pod-3"},
807+
}),
808+
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
809+
labelSelectors: map[string]string{"test-label": "test-pod-4"},
810+
expectedPods: []string{"test-pod-4"},
811+
}),
812+
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
813+
labelSelectors: map[string]string{"common-label": "common"},
814+
expectedPods: []string{"test-pod-3", "test-pod-4"},
815+
}),
816+
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
817+
labelSelectors: map[string]string{"common-label": "common"},
818+
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
819+
expectedPods: []string{"test-pod-3"},
820+
}),
821+
Entry("when label does not match it does not has to inform", selectorsTestCase{
822+
labelSelectors: map[string]string{"new-label": "new"},
823+
expectedPods: []string{},
824+
}),
825+
Entry("when field does not match it does not has to inform", selectorsTestCase{
826+
fieldSelectors: map[string]string{"metadata.namespace": "new"},
827+
expectedPods: []string{},
828+
}),
829+
)
712830
})
713831
Describe("as an Informer", func() {
714832
Context("with structured objects", func() {
@@ -789,7 +907,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
789907
Eventually(out).Should(Receive(Equal(pod)))
790908
close(done)
791909
})
792-
793910
It("should be able to index an object field then retrieve objects by that field", func() {
794911
By("creating the cache")
795912
informer, err := cache.New(cfg, cache.Options{})

pkg/cache/internal/deleg_map.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
4949
scheme *runtime.Scheme,
5050
mapper meta.RESTMapper,
5151
resync time.Duration,
52-
namespace string) *InformersMap {
52+
namespace string,
53+
selectors SelectorsByGVK,
54+
) *InformersMap {
5355

5456
return &InformersMap{
55-
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
56-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
57-
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
57+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
58+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
59+
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),
5860

5961
Scheme: scheme,
6062
}
@@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
105107
}
106108

107109
// newStructuredInformersMap creates a new InformersMap for structured objects.
108-
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
109-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
110+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
111+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
112+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
110113
}
111114

112115
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
113-
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
114-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
116+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
117+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
118+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
115119
}
116120

117121
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
118-
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
119-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
122+
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
123+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
124+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
120125
}

0 commit comments

Comments
 (0)