Skip to content

Commit 53fc44b

Browse files
authored
Merge pull request #136 from hasbro17/make-list-watchers-namespaced
Option to restrict informer cache to a namespace
2 parents b2824f6 + ca13e86 commit 53fc44b

File tree

5 files changed

+106
-12
lines changed

5 files changed

+106
-12
lines changed

pkg/cache/cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ type Options struct {
7979

8080
// Resync is the resync period. Defaults to defaultResyncTime.
8181
Resync *time.Duration
82+
83+
// Namespace restricts the cache's ListWatch to the desired namespace
84+
// Default watches all namespaces
85+
Namespace string
8286
}
8387

8488
var defaultResyncTime = 10 * time.Hour
@@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
8993
if err != nil {
9094
return nil, err
9195
}
92-
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync)
96+
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
9397
return &informerCache{InformersMap: im}, nil
9498
}
9599

pkg/cache/cache_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,35 @@ var _ = Describe("Informer Cache", func() {
195195
Expect(actual.Namespace).To(Equal(testNamespaceOne))
196196
})
197197

198+
It("should be able to restrict cache to a namespace", func() {
199+
By("creating a namespaced cache")
200+
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
201+
Expect(err).NotTo(HaveOccurred())
202+
203+
By("running the cache and waiting for it to sync")
204+
go func() {
205+
defer GinkgoRecover()
206+
Expect(namespacedCache.Start(stop)).To(Succeed())
207+
}()
208+
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())
209+
210+
By("listing pods in all namespaces")
211+
out := &kcorev1.PodList{}
212+
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())
213+
214+
By("verifying the returned pod is from the watched namespace")
215+
Expect(out.Items).NotTo(BeEmpty())
216+
Expect(out.Items).Should(HaveLen(1))
217+
Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne))
218+
219+
By("listing all namespaces - should still be able to get a cluster-scoped resource")
220+
namespaceList := &kcorev1.NamespaceList{}
221+
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())
222+
223+
By("verifying the namespace list is not empty")
224+
Expect(namespaceList.Items).NotTo(BeEmpty())
225+
})
226+
198227
It("should deep copy the object unless told otherwise", func() {
199228
By("retrieving a specific pod from the cache")
200229
out := &kcorev1.Pod{}
@@ -333,6 +362,45 @@ var _ = Describe("Informer Cache", func() {
333362
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
334363
})
335364

365+
It("should be able to restrict cache to a namespace", func() {
366+
By("creating a namespaced cache")
367+
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
368+
Expect(err).NotTo(HaveOccurred())
369+
370+
By("running the cache and waiting for it to sync")
371+
go func() {
372+
defer GinkgoRecover()
373+
Expect(namespacedCache.Start(stop)).To(Succeed())
374+
}()
375+
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())
376+
377+
By("listing pods in all namespaces")
378+
out := &unstructured.UnstructuredList{}
379+
out.SetGroupVersionKind(schema.GroupVersionKind{
380+
Group: "",
381+
Version: "v1",
382+
Kind: "PodList",
383+
})
384+
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())
385+
386+
By("verifying the returned pod is from the watched namespace")
387+
Expect(out.Items).NotTo(BeEmpty())
388+
Expect(out.Items).Should(HaveLen(1))
389+
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))
390+
391+
By("listing all namespaces - should still be able to get a cluster-scoped resource")
392+
namespaceList := &unstructured.UnstructuredList{}
393+
namespaceList.SetGroupVersionKind(schema.GroupVersionKind{
394+
Group: "",
395+
Version: "v1",
396+
Kind: "NamespaceList",
397+
})
398+
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())
399+
400+
By("verifying the namespace list is not empty")
401+
Expect(namespaceList.Items).NotTo(BeEmpty())
402+
})
403+
336404
It("should deep copy the object unless told otherwise", func() {
337405
By("retrieving a specific pod from the cache")
338406
out := &unstructured.Unstructured{}

pkg/cache/internal/deleg_map.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ type InformersMap struct {
4444
func NewInformersMap(config *rest.Config,
4545
scheme *runtime.Scheme,
4646
mapper meta.RESTMapper,
47-
resync time.Duration) *InformersMap {
47+
resync time.Duration,
48+
namespace string) *InformersMap {
4849

4950
return &InformersMap{
50-
structured: newStructuredInformersMap(config, scheme, mapper, resync),
51-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
51+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
52+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
5253

5354
Scheme: scheme,
5455
}
@@ -85,11 +86,11 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma
8586
}
8687

8788
// newStructuredInformersMap creates a new InformersMap for structured objects.
88-
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
89-
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
89+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
90+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
9091
}
9192

9293
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
93-
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
94-
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
94+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
95+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
9596
}

pkg/cache/internal/informers_map.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInforme
4242
func newSpecificInformersMap(config *rest.Config,
4343
scheme *runtime.Scheme,
4444
mapper meta.RESTMapper,
45-
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
45+
resync time.Duration,
46+
namespace string,
47+
createListWatcher createListWatcherFunc) *specificInformersMap {
4648
ip := &specificInformersMap{
4749
config: config,
4850
Scheme: scheme,
@@ -52,6 +54,7 @@ func newSpecificInformersMap(config *rest.Config,
5254
paramCodec: runtime.NewParameterCodec(scheme),
5355
resync: resync,
5456
createListWatcher: createListWatcher,
57+
namespace: namespace,
5558
}
5659
return ip
5760
}
@@ -102,6 +105,10 @@ type specificInformersMap struct {
102105
// and allows for abstracting over the particulars of structured vs
103106
// unstructured objects.
104107
createListWatcher createListWatcherFunc
108+
109+
// namespace is the namespace that all ListWatches are restricted to
110+
// default or empty string means all namespaces
111+
namespace string
105112
}
106113

107114
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
@@ -227,14 +234,16 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
227234
return &cache.ListWatch{
228235
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
229236
res := listObj.DeepCopyObject()
230-
err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
237+
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
238+
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
231239
return res, err
232240
},
233241
// Setup the watch function
234242
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
235243
// Watch needs to be set to true separately
236244
opts.Watch = true
237-
return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
245+
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
246+
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
238247
},
239248
}, nil
240249
}
@@ -254,12 +263,18 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
254263
// Create a new ListWatch for the obj
255264
return &cache.ListWatch{
256265
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
266+
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
267+
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
268+
}
257269
return dynamicClient.Resource(mapping.Resource).List(opts)
258270
},
259271
// Setup the watch function
260272
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
261273
// Watch needs to be set to true separately
262274
opts.Watch = true
275+
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
276+
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
277+
}
263278
return dynamicClient.Resource(mapping.Resource).Watch(opts)
264279
},
265280
}, nil

pkg/manager/manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ type Options struct {
106106
// will use for holding the leader lock.
107107
LeaderElectionID string
108108

109+
// Namespace if specified restricts the manager's cache to watch objects in the desired namespace
110+
// Defaults to all namespaces
111+
// Note: If a namespace is specified then controllers can still Watch for a cluster-scoped resource e.g Node
112+
// For namespaced resources the cache will only hold objects from the desired namespace.
113+
Namespace string
114+
109115
// Dependency injection for testing
110116
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
111117
newClient func(config *rest.Config, options client.Options) (client.Client, error)
@@ -153,7 +159,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
153159
}
154160

155161
// Create the cache for the cached read client and registering informers
156-
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod})
162+
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
157163
if err != nil {
158164
return nil, err
159165
}

0 commit comments

Comments
 (0)