Skip to content

Commit 5a54638

Browse files
committed
Add index support to providers
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
1 parent 223b19b commit 5a54638

File tree

7 files changed

+169
-43
lines changed

7 files changed

+169
-43
lines changed

pkg/manager/manager.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"k8s.io/client-go/rest"
2727

28+
"sigs.k8s.io/controller-runtime/pkg/client"
2829
"sigs.k8s.io/controller-runtime/pkg/cluster"
2930
"sigs.k8s.io/controller-runtime/pkg/config"
3031
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -103,6 +104,10 @@ type Manager interface {
103104
// GetProvider returns the multicluster provider, or nil if it is not set.
104105
GetProvider() multicluster.Provider
105106

107+
// GetFieldIndexer returns a client.FieldIndexer that adds indexes to the
108+
// multicluster provider (if set) and the local manager.
109+
GetFieldIndexer() client.FieldIndexer
110+
106111
multicluster.Aware
107112
}
108113

@@ -215,13 +220,34 @@ func (m *mcManager) GetManager(ctx context.Context, clusterName string) (manager
215220
}, nil
216221
}
217222

223+
type fieldIndexerFunc func(context.Context, client.Object, string, client.IndexerFunc) error
224+
225+
func (f fieldIndexerFunc) IndexField(ctx context.Context, obj client.Object, fieldName string, indexerFunc client.IndexerFunc) error {
226+
return f(ctx, obj, fieldName, indexerFunc)
227+
}
228+
229+
// GetFieldIndexer returns a client.FieldIndexer that adds indexes to the
230+
// multicluster provider (if set) and to the local cluster if not.
231+
func (m *mcManager) GetFieldIndexer() client.FieldIndexer {
232+
return fieldIndexerFunc(func(ctx context.Context, obj client.Object, fieldName string, indexerFunc client.IndexerFunc) error {
233+
if m.provider != nil {
234+
if err := m.provider.IndexField(ctx, obj, fieldName, indexerFunc); err != nil {
235+
return fmt.Errorf("failed to index field %q on multi-cluster provider: %w", fieldName, err)
236+
}
237+
return nil
238+
}
239+
return m.Manager.GetFieldIndexer().IndexField(ctx, obj, fieldName, indexerFunc)
240+
})
241+
}
242+
218243
var _ manager.Manager = &scopedManager{}
219244

220245
type scopedManager struct {
221246
Manager
222247
cluster.Cluster
223248
}
224249

250+
// Add adds a Runnable to the manager.
225251
func (p *scopedManager) Add(r manager.Runnable) error {
226252
return p.Manager.GetLocalManager().Add(r)
227253
}
@@ -230,3 +256,8 @@ func (p *scopedManager) Add(r manager.Runnable) error {
230256
func (p *scopedManager) Start(ctx context.Context) error {
231257
return p.Manager.GetLocalManager().Start(ctx)
232258
}
259+
260+
// GetFieldIndexer returns the field indexer.
261+
func (p *scopedManager) GetFieldIndexer() client.FieldIndexer {
262+
return p.Cluster.GetFieldIndexer()
263+
}

pkg/multicluster/multicluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package multicluster
1919
import (
2020
"context"
2121

22+
"sigs.k8s.io/controller-runtime/pkg/client"
2223
"sigs.k8s.io/controller-runtime/pkg/cluster"
2324
)
2425

@@ -55,4 +56,8 @@ type Provider interface {
5556
// If no cluster is known to the provider under the given cluster name,
5657
// an error should be returned.
5758
Get(ctx context.Context, clusterName string) (cluster.Cluster, error)
59+
60+
// IndexField indexes the given object by the given field on all engaged
61+
// clusters, current and future.
62+
IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error
5863
}

providers/cluster-api/provider.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ func New(localMgr manager.Manager, opts Options) (*Provider, error) {
9494
return p, nil
9595
}
9696

97+
type index struct {
98+
object client.Object
99+
field string
100+
extractValue client.IndexerFunc
101+
}
102+
97103
// Provider is a cluster Provider that works with Cluster-API.
98104
type Provider struct {
99105
opts Options
@@ -104,6 +110,7 @@ type Provider struct {
104110
mcMgr mcmanager.Manager
105111
clusters map[string]cluster.Cluster
106112
cancelFns map[string]context.CancelFunc
113+
indexers []index
107114
}
108115

109116
// Get returns the cluster with the given name, if it is known.
@@ -189,6 +196,11 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
189196
if err != nil {
190197
return reconcile.Result{}, fmt.Errorf("failed to create cluster: %w", err)
191198
}
199+
for _, idx := range p.indexers {
200+
if err := cl.GetCache().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
201+
return reconcile.Result{}, fmt.Errorf("failed to index field %q: %w", idx.field, err)
202+
}
203+
}
192204
clusterCtx, cancel := context.WithCancel(ctx)
193205
go func() {
194206
if err := cl.Start(clusterCtx); err != nil {
@@ -201,13 +213,13 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
201213
return reconcile.Result{}, fmt.Errorf("failed to sync cache")
202214
}
203215

204-
// remember
216+
// remember.
205217
p.clusters[key] = cl
206218
p.cancelFns[key] = cancel
207219

208220
p.log.Info("Added new cluster")
209221

210-
// engage manager
222+
// engage manager.
211223
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
212224
log.Error(err, "failed to engage manager")
213225
delete(p.clusters, key)
@@ -217,3 +229,25 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
217229

218230
return reconcile.Result{}, nil
219231
}
232+
233+
// IndexField indexes a field on all clusters, existing and future.
234+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
235+
p.lock.Lock()
236+
defer p.lock.Unlock()
237+
238+
// save for future clusters.
239+
p.indexers = append(p.indexers, index{
240+
object: obj,
241+
field: field,
242+
extractValue: extractValue,
243+
})
244+
245+
// apply to existing clusters.
246+
for name, cl := range p.clusters {
247+
if err := cl.GetCache().IndexField(ctx, obj, field, extractValue); err != nil {
248+
return fmt.Errorf("failed to index field %q on cluster %q: %w", field, name, err)
249+
}
250+
}
251+
252+
return nil
253+
}

providers/kind/provider.go

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ import (
2424
"time"
2525

2626
"github.com/go-logr/logr"
27-
2827
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
2928
"github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster"
3029
"k8s.io/apimachinery/pkg/util/sets"
3130
"k8s.io/apimachinery/pkg/util/wait"
3231
"k8s.io/client-go/tools/clientcmd"
32+
"sigs.k8s.io/controller-runtime/pkg/client"
3333
"sigs.k8s.io/controller-runtime/pkg/cluster"
3434
"sigs.k8s.io/controller-runtime/pkg/log"
3535
kind "sigs.k8s.io/kind/pkg/cluster"
@@ -46,29 +46,36 @@ func New() *Provider {
4646
}
4747
}
4848

49+
type index struct {
50+
object client.Object
51+
field string
52+
extractValue client.IndexerFunc
53+
}
54+
4955
// Provider is a cluster Provider that works with a local Kind instance.
5056
type Provider struct {
5157
opts []cluster.Option
5258
log logr.Logger
5359
lock sync.RWMutex
5460
clusters map[string]cluster.Cluster
5561
cancelFns map[string]context.CancelFunc
62+
indexers []index
5663
}
5764

5865
// Get returns the cluster with the given name, if it is known.
59-
func (k *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
60-
k.lock.RLock()
61-
defer k.lock.RUnlock()
62-
if cl, ok := k.clusters[clusterName]; ok {
66+
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
67+
p.lock.RLock()
68+
defer p.lock.RUnlock()
69+
if cl, ok := p.clusters[clusterName]; ok {
6370
return cl, nil
6471
}
6572

6673
return nil, fmt.Errorf("cluster %s not found", clusterName)
6774
}
6875

6976
// Run starts the provider and blocks.
70-
func (k *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
71-
k.log.Info("Starting kind cluster provider")
77+
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
78+
p.log.Info("Starting kind cluster provider")
7279

7380
provider := kind.NewProvider()
7481

@@ -80,41 +87,46 @@ func (k *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
8087
return wait.PollUntilContextCancel(ctx, time.Second*2, true, func(ctx context.Context) (done bool, err error) {
8188
list, err := provider.List()
8289
if err != nil {
83-
k.log.Info("failed to list kind clusters", "error", err)
90+
p.log.Info("failed to list kind clusters", "error", err)
8491
return false, nil // keep going
8592
}
8693

8794
// start new clusters
8895
for _, clusterName := range list {
89-
log := k.log.WithValues("cluster", clusterName)
96+
log := p.log.WithValues("cluster", clusterName)
9097

9198
// skip?
9299
if !strings.HasPrefix(clusterName, "fleet-") {
93100
continue
94101
}
95-
k.lock.RLock()
96-
if _, ok := k.clusters[clusterName]; ok {
97-
k.lock.RUnlock()
102+
p.lock.RLock()
103+
if _, ok := p.clusters[clusterName]; ok {
104+
p.lock.RUnlock()
98105
continue
99106
}
100-
k.lock.RUnlock()
107+
p.lock.RUnlock()
101108

102109
// create a new cluster
103110
kubeconfig, err := provider.KubeConfig(clusterName, false)
104111
if err != nil {
105-
k.log.Info("failed to get kind kubeconfig", "error", err)
112+
p.log.Info("failed to get kind kubeconfig", "error", err)
106113
return false, nil // keep going
107114
}
108115
cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfig))
109116
if err != nil {
110-
k.log.Info("failed to create rest config", "error", err)
117+
p.log.Info("failed to create rest config", "error", err)
111118
return false, nil // keep going
112119
}
113-
cl, err := cluster.New(cfg, k.opts...)
120+
cl, err := cluster.New(cfg, p.opts...)
114121
if err != nil {
115-
k.log.Info("failed to create cluster", "error", err)
122+
p.log.Info("failed to create cluster", "error", err)
116123
return false, nil // keep going
117124
}
125+
for _, idx := range p.indexers {
126+
if err := cl.GetCache().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
127+
return false, fmt.Errorf("failed to index field %q: %w", idx.field, err)
128+
}
129+
}
118130
clusterCtx, cancel := context.WithCancel(ctx)
119131
go func() {
120132
if err := cl.Start(clusterCtx); err != nil {
@@ -129,47 +141,69 @@ func (k *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
129141
}
130142

131143
// remember
132-
k.lock.Lock()
133-
k.clusters[clusterName] = cl
134-
k.cancelFns[clusterName] = cancel
135-
k.lock.Unlock()
144+
p.lock.Lock()
145+
p.clusters[clusterName] = cl
146+
p.cancelFns[clusterName] = cancel
147+
p.lock.Unlock()
136148

137-
k.log.Info("Added new cluster", "cluster", clusterName)
149+
p.log.Info("Added new cluster", "cluster", clusterName)
138150

139151
// engage manager
140152
if mgr != nil {
141153
if err := mgr.Engage(clusterCtx, clusterName, cl); err != nil {
142154
log.Error(err, "failed to engage manager")
143-
k.lock.Lock()
144-
delete(k.clusters, clusterName)
145-
delete(k.cancelFns, clusterName)
146-
k.lock.Unlock()
155+
p.lock.Lock()
156+
delete(p.clusters, clusterName)
157+
delete(p.cancelFns, clusterName)
158+
p.lock.Unlock()
147159
return false, nil
148160
}
149161
}
150162
}
151163

152164
// remove old clusters
153165
kindNames := sets.New(list...)
154-
k.lock.Lock()
155-
clusterNames := make([]string, 0, len(k.clusters))
156-
for name := range k.clusters {
166+
p.lock.Lock()
167+
clusterNames := make([]string, 0, len(p.clusters))
168+
for name := range p.clusters {
157169
clusterNames = append(clusterNames, name)
158170
}
159-
k.lock.Unlock()
171+
p.lock.Unlock()
160172
for _, name := range clusterNames {
161173
if !kindNames.Has(name) {
162174
// stop and forget
163-
k.lock.Lock()
164-
k.cancelFns[name]()
165-
delete(k.clusters, name)
166-
delete(k.cancelFns, name)
167-
k.lock.Unlock()
175+
p.lock.Lock()
176+
p.cancelFns[name]()
177+
delete(p.clusters, name)
178+
delete(p.cancelFns, name)
179+
p.lock.Unlock()
168180

169-
k.log.Info("Cluster removed", "cluster", name)
181+
p.log.Info("Cluster removed", "cluster", name)
170182
}
171183
}
172184

173185
return false, nil
174186
})
175187
}
188+
189+
// IndexField indexes a field on all clusters, existing and future.
190+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
191+
p.lock.Lock()
192+
defer p.lock.Unlock()
193+
194+
// save for future clusters.
195+
p.indexers = append(p.indexers, index{
196+
object: obj,
197+
field: field,
198+
extractValue: extractValue,
199+
})
200+
201+
// apply to existing clusters.
202+
for name, cl := range p.clusters {
203+
if err := cl.GetCache().IndexField(ctx, obj, field, extractValue); err != nil {
204+
return fmt.Errorf("failed to index field %q on cluster %q: %w", field, name, err)
205+
}
206+
}
207+
208+
return nil
209+
}

providers/namespace/provider.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@ import (
2222
"sync"
2323

2424
"github.com/go-logr/logr"
25-
"github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster"
2625

27-
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
2826
corev1 "k8s.io/api/core/v1"
2927
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3028
toolscache "k8s.io/client-go/tools/cache"
29+
30+
"sigs.k8s.io/controller-runtime/pkg/client"
3131
"sigs.k8s.io/controller-runtime/pkg/cluster"
3232
"sigs.k8s.io/controller-runtime/pkg/log"
33+
34+
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
35+
"github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster"
3336
)
3437

3538
var _ multicluster.Provider = &Provider{}
@@ -125,11 +128,16 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
125128
}
126129

127130
// Get returns a cluster by name.
128-
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
131+
func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster, error) {
129132
p.lock.RLock()
130133
defer p.lock.RUnlock()
131134
if cl, ok := p.clusters[clusterName]; ok {
132135
return cl, nil
133136
}
134137
return nil, fmt.Errorf("cluster %s not found", clusterName)
135138
}
139+
140+
// IndexField indexes a field on all clusters.
141+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
142+
return p.cluster.GetFieldIndexer().IndexField(ctx, obj, field, extractValue)
143+
}

0 commit comments

Comments
 (0)