Skip to content

Commit 4477c71

Browse files
authored
Merge pull request #1435 from qinqon/add-cache-filter-option-labels-and-fields
✨ Add SelectorsByObject option to cache
2 parents b2c90ab + cd065bf commit 4477c71

File tree

7 files changed

+232
-19
lines changed

7 files changed

+232
-19
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
SHELL:=/usr/bin/env bash
2525
.DEFAULT_GOAL:=help
2626

27+
export WHAT ?= ./...
28+
2729
# Use GOPROXY environment variable if set
2830
GOPROXY := $(shell go env GOPROXY)
2931
ifeq ($(GOPROXY),)

hack/test-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh
2020

2121
header_text "running go test"
2222

23-
go test -race ${MOD_OPT} ./...
23+
go test -race ${MOD_OPT} ${WHAT}
2424

2525
if [[ -n ${ARTIFACTS:-} ]]; then
2626
if grep -Rin '<failure type="Failure">' ${ARTIFACTS}/*; then exit 1; fi

pkg/cache/cache.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ type Informer interface {
8686
HasSynced() bool
8787
}
8888

89+
// SelectorsByObject associate a client.Object's GVK to a field/label selector
90+
type SelectorsByObject map[client.Object]internal.Selector
91+
8992
// Options are the optional arguments for creating a new InformersMap object
9093
type Options struct {
9194
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
@@ -103,6 +106,13 @@ type Options struct {
103106
// Namespace restricts the cache's ListWatch to the desired namespace
104107
// Default watches all namespaces
105108
Namespace string
109+
110+
// SelectorsByObject restricts the cache's ListWatch to the desired
111+
// fields per GVK at the specified object, the map's value must implement
112+
// Selector [1] using for example a Set [2]
113+
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
114+
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
115+
SelectorsByObject SelectorsByObject
106116
}
107117

108118
var defaultResyncTime = 10 * time.Hour
@@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) {
113123
if err != nil {
114124
return nil, err
115125
}
116-
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
126+
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
127+
if err != nil {
128+
return nil, err
129+
}
130+
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK)
117131
return &informerCache{InformersMap: im}, nil
118132
}
119133

134+
// BuilderWithOptions returns a Cache constructor that will build the a cache
135+
// honoring the options argument, this is useful to specify options like
136+
// SelectorsByObject
137+
// WARNING: if SelectorsByObject is specified. filtered out resources are not
138+
// returned.
139+
func BuilderWithOptions(options Options) NewCacheFunc {
140+
return func(config *rest.Config, opts Options) (Cache, error) {
141+
if opts.Scheme == nil {
142+
opts.Scheme = options.Scheme
143+
}
144+
if opts.Mapper == nil {
145+
opts.Mapper = options.Mapper
146+
}
147+
if opts.Resync == nil {
148+
opts.Resync = options.Resync
149+
}
150+
if opts.Namespace == "" {
151+
opts.Namespace = options.Namespace
152+
}
153+
opts.SelectorsByObject = options.SelectorsByObject
154+
return New(config, opts)
155+
}
156+
}
157+
120158
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
121159
// Use the default Kubernetes Scheme if unset
122160
if opts.Scheme == nil {
@@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
139177
}
140178
return opts, nil
141179
}
180+
181+
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
182+
selectorsByGVK := internal.SelectorsByGVK{}
183+
for object, selector := range selectorsByObject {
184+
gvk, err := apiutil.GVKForObject(object, scheme)
185+
if err != nil {
186+
return nil, err
187+
}
188+
selectorsByGVK[gvk] = selector
189+
}
190+
return selectorsByGVK, nil
191+
}

pkg/cache/cache_test.go

Lines changed: 124 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@ import (
2121
"fmt"
2222

2323
. "github.com/onsi/ginkgo"
24+
. "github.com/onsi/ginkgo/extensions/table"
2425
. "github.com/onsi/gomega"
26+
2527
kcorev1 "k8s.io/api/core/v1"
2628
"k8s.io/apimachinery/pkg/api/errors"
2729
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2830
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31+
"k8s.io/apimachinery/pkg/fields"
32+
"k8s.io/apimachinery/pkg/labels"
2933
"k8s.io/apimachinery/pkg/runtime/schema"
3034
kscheme "k8s.io/client-go/kubernetes/scheme"
3135
"k8s.io/client-go/rest"
@@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3"
4246

4347
// TODO(community): Pull these helper functions into testenv.
4448
// Restart policy is included to allow indexing on that field.
45-
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
49+
func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object {
4650
three := int64(3)
51+
if labels == nil {
52+
labels = map[string]string{}
53+
}
54+
labels["test-label"] = name
4755
pod := &kcorev1.Pod{
4856
ObjectMeta: kmetav1.ObjectMeta{
4957
Name: name,
5058
Namespace: namespace,
51-
Labels: map[string]string{
52-
"test-label": name,
53-
},
59+
Labels: labels,
5460
},
5561
Spec: kcorev1.PodSpec{
5662
Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}},
@@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie
6571
return pod
6672
}
6773

74+
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
75+
return createPodWithLabels(name, namespace, restartPolicy, nil)
76+
}
77+
6878
func deletePod(pod client.Object) {
6979
cl, err := client.New(cfg, client.Options{})
7080
Expect(err).NotTo(HaveOccurred())
@@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
110120
// Includes restart policy since these objects are indexed on this field.
111121
knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever)
112122
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
113-
knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure)
114-
knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever)
123+
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
124+
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
115125
podGVK := schema.GroupVersionKind{
116126
Kind: "Pod",
117127
Version: "v1",
@@ -284,6 +294,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
284294
Expect(err).To(HaveOccurred())
285295
Expect(errors.IsTimeout(err)).To(BeTrue())
286296
})
297+
287298
})
288299
Context("with unstructured objects", func() {
289300
It("should be able to list objects that haven't been watched previously", func() {
@@ -709,6 +720,113 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
709720
Expect(err).To(HaveOccurred())
710721
})
711722
})
723+
type selectorsTestCase struct {
724+
fieldSelectors map[string]string
725+
labelSelectors map[string]string
726+
expectedPods []string
727+
}
728+
DescribeTable(" and cache with selectors", func(tc selectorsTestCase) {
729+
By("creating the cache")
730+
builder := cache.BuilderWithOptions(
731+
cache.Options{
732+
SelectorsByObject: cache.SelectorsByObject{
733+
&kcorev1.Pod{}: {
734+
Label: labels.Set(tc.labelSelectors).AsSelector(),
735+
Field: fields.Set(tc.fieldSelectors).AsSelector(),
736+
},
737+
},
738+
},
739+
)
740+
informer, err := builder(cfg, cache.Options{})
741+
Expect(err).NotTo(HaveOccurred())
742+
743+
By("running the cache and waiting for it to sync")
744+
go func() {
745+
defer GinkgoRecover()
746+
Expect(informer.Start(informerCacheCtx)).To(Succeed())
747+
}()
748+
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
749+
750+
By("Checking with structured")
751+
obtainedStructuredPodList := kcorev1.PodList{}
752+
Expect(informer.List(context.Background(), &obtainedStructuredPodList)).To(Succeed())
753+
Expect(obtainedStructuredPodList.Items).Should(WithTransform(func(pods []kcorev1.Pod) []string {
754+
obtainedPodNames := []string{}
755+
for _, pod := range pods {
756+
obtainedPodNames = append(obtainedPodNames, pod.Name)
757+
}
758+
return obtainedPodNames
759+
}, ConsistOf(tc.expectedPods)))
760+
761+
By("Checking with unstructured")
762+
obtainedUnstructuredPodList := unstructured.UnstructuredList{}
763+
obtainedUnstructuredPodList.SetGroupVersionKind(schema.GroupVersionKind{
764+
Group: "",
765+
Version: "v1",
766+
Kind: "PodList",
767+
})
768+
err = informer.List(context.Background(), &obtainedUnstructuredPodList)
769+
Expect(err).To(Succeed())
770+
Expect(obtainedUnstructuredPodList.Items).Should(WithTransform(func(pods []unstructured.Unstructured) []string {
771+
obtainedPodNames := []string{}
772+
for _, pod := range pods {
773+
obtainedPodNames = append(obtainedPodNames, pod.GetName())
774+
}
775+
return obtainedPodNames
776+
}, ConsistOf(tc.expectedPods)))
777+
778+
By("Checking with metadata")
779+
obtainedMetadataPodList := kmetav1.PartialObjectMetadataList{}
780+
obtainedMetadataPodList.SetGroupVersionKind(schema.GroupVersionKind{
781+
Group: "",
782+
Version: "v1",
783+
Kind: "PodList",
784+
})
785+
err = informer.List(context.Background(), &obtainedMetadataPodList)
786+
Expect(err).To(Succeed())
787+
Expect(obtainedMetadataPodList.Items).Should(WithTransform(func(pods []kmetav1.PartialObjectMetadata) []string {
788+
obtainedPodNames := []string{}
789+
for _, pod := range pods {
790+
obtainedPodNames = append(obtainedPodNames, pod.Name)
791+
}
792+
return obtainedPodNames
793+
}, ConsistOf(tc.expectedPods)))
794+
},
795+
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
796+
fieldSelectors: map[string]string{},
797+
labelSelectors: map[string]string{},
798+
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
799+
}),
800+
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
801+
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
802+
expectedPods: []string{"test-pod-2"},
803+
}),
804+
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
805+
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
806+
expectedPods: []string{"test-pod-2", "test-pod-3"},
807+
}),
808+
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
809+
labelSelectors: map[string]string{"test-label": "test-pod-4"},
810+
expectedPods: []string{"test-pod-4"},
811+
}),
812+
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
813+
labelSelectors: map[string]string{"common-label": "common"},
814+
expectedPods: []string{"test-pod-3", "test-pod-4"},
815+
}),
816+
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
817+
labelSelectors: map[string]string{"common-label": "common"},
818+
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
819+
expectedPods: []string{"test-pod-3"},
820+
}),
821+
Entry("when label does not match it does not has to inform", selectorsTestCase{
822+
labelSelectors: map[string]string{"new-label": "new"},
823+
expectedPods: []string{},
824+
}),
825+
Entry("when field does not match it does not has to inform", selectorsTestCase{
826+
fieldSelectors: map[string]string{"metadata.namespace": "new"},
827+
expectedPods: []string{},
828+
}),
829+
)
712830
})
713831
Describe("as an Informer", func() {
714832
Context("with structured objects", func() {
@@ -789,7 +907,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
789907
Eventually(out).Should(Receive(Equal(pod)))
790908
close(done)
791909
})
792-
793910
It("should be able to index an object field then retrieve objects by that field", func() {
794911
By("creating the cache")
795912
informer, err := cache.New(cfg, cache.Options{})

pkg/cache/internal/deleg_map.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
4949
scheme *runtime.Scheme,
5050
mapper meta.RESTMapper,
5151
resync time.Duration,
52-
namespace string) *InformersMap {
52+
namespace string,
53+
selectors SelectorsByGVK,
54+
) *InformersMap {
5355

5456
return &InformersMap{
55-
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
56-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
57-
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
57+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
58+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
59+
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),
5860

5961
Scheme: scheme,
6062
}
@@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
105107
}
106108

107109
// newStructuredInformersMap creates a new InformersMap for structured objects.
108-
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
109-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
110+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
111+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
112+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
110113
}
111114

112115
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
113-
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
114-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
116+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
117+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
118+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
115119
}
116120

117121
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
118-
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
119-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
122+
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
123+
namespace string, selectors SelectorsByGVK) *specificInformersMap {
124+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
120125
}

0 commit comments

Comments
 (0)