Skip to content

Commit dea6dee

Browse files
authored
Merge pull request #1307 from alvaroaleman/cluster
✨ Move cluster-specifics from Manager into new pkg/cluster
2 parents 8fde15b + 066bfea commit dea6dee

File tree

9 files changed

+887
-200
lines changed

9 files changed

+887
-200
lines changed

pkg/cluster/client_builder.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"k8s.io/client-go/rest"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/cache"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
)
25+
26+
// ClientBuilder builder is the interface for the client builder.
27+
type ClientBuilder interface {
28+
// WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache
29+
// for this client. This function can be called multiple times, it should append to an internal slice.
30+
WithUncached(objs ...client.Object) ClientBuilder
31+
32+
// Build returns a new client.
33+
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
34+
}
35+
36+
// NewClientBuilder returns a builder to build new clients to be passed when creating a Manager.
37+
func NewClientBuilder() ClientBuilder {
38+
return &newClientBuilder{}
39+
}
40+
41+
type newClientBuilder struct {
42+
uncached []client.Object
43+
}
44+
45+
func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
46+
n.uncached = append(n.uncached, objs...)
47+
return n
48+
}
49+
50+
func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
51+
// Create the Client for Write operations.
52+
c, err := client.New(config, options)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
return client.NewDelegatingClient(client.NewDelegatingClientInput{
58+
CacheReader: cache,
59+
Client: c,
60+
UncachedObjects: n.uncached,
61+
})
62+
}

pkg/cluster/cluster.go

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"context"
21+
"errors"
22+
"time"
23+
24+
"github.com/go-logr/logr"
25+
"k8s.io/apimachinery/pkg/api/meta"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/client-go/kubernetes/scheme"
28+
"k8s.io/client-go/rest"
29+
"k8s.io/client-go/tools/record"
30+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
31+
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
32+
33+
"sigs.k8s.io/controller-runtime/pkg/cache"
34+
"sigs.k8s.io/controller-runtime/pkg/client"
35+
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
36+
)
37+
38+
// Cluster provides various methods to interact with a cluster.
39+
type Cluster interface {
40+
// SetFields will set any dependencies on an object for which the object has implemented the inject
41+
// interface - e.g. inject.Client.
42+
SetFields(interface{}) error
43+
44+
// GetConfig returns an initialized Config
45+
GetConfig() *rest.Config
46+
47+
// GetScheme returns an initialized Scheme
48+
GetScheme() *runtime.Scheme
49+
50+
// GetClient returns a client configured with the Config. This client may
51+
// not be a fully "direct" client -- it may read from a cache, for
52+
// instance. See Options.NewClient for more information on how the default
53+
// implementation works.
54+
GetClient() client.Client
55+
56+
// GetFieldIndexer returns a client.FieldIndexer configured with the client
57+
GetFieldIndexer() client.FieldIndexer
58+
59+
// GetCache returns a cache.Cache
60+
GetCache() cache.Cache
61+
62+
// GetEventRecorderFor returns a new EventRecorder for the provided name
63+
GetEventRecorderFor(name string) record.EventRecorder
64+
65+
// GetRESTMapper returns a RESTMapper
66+
GetRESTMapper() meta.RESTMapper
67+
68+
// GetAPIReader returns a reader that will be configured to use the API server.
69+
// This should be used sparingly and only when the client does not fit your
70+
// use case.
71+
GetAPIReader() client.Reader
72+
73+
// Start starts the cluster
74+
Start(ctx context.Context) error
75+
}
76+
77+
// Options are the possible options that can be configured for a Cluster.
78+
type Options struct {
79+
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
80+
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
81+
// idea to pass your own scheme in. See the documentation in pkg/scheme for more information.
82+
Scheme *runtime.Scheme
83+
84+
// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
85+
MapperProvider func(c *rest.Config) (meta.RESTMapper, error)
86+
87+
// Logger is the logger that should be used by this Cluster.
88+
// If none is set, it defaults to log.Log global logger.
89+
Logger logr.Logger
90+
91+
// SyncPeriod determines the minimum frequency at which watched resources are
92+
// reconciled. A lower period will correct entropy more quickly, but reduce
93+
// responsiveness to change if there are many watched resources. Change this
94+
// value only if you know what you are doing. Defaults to 10 hours if unset.
95+
// there will a 10 percent jitter between the SyncPeriod of all controllers
96+
// so that all controllers will not send list requests simultaneously.
97+
SyncPeriod *time.Duration
98+
99+
// Namespace if specified restricts the manager's cache to watch objects in
100+
// the desired namespace Defaults to all namespaces
101+
//
102+
// Note: If a namespace is specified, controllers can still Watch for a
103+
// cluster-scoped resource (e.g Node). For namespaced resources the cache
104+
// will only hold objects from the desired namespace.
105+
Namespace string
106+
107+
// NewCache is the function that will create the cache to be used
108+
// by the manager. If not set this will use the default new cache function.
109+
NewCache cache.NewCacheFunc
110+
111+
// ClientBuilder is the builder that creates the client to be used by the manager.
112+
// If not set this will create the default DelegatingClient that will
113+
// use the cache for reads and the client for writes.
114+
ClientBuilder ClientBuilder
115+
116+
// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
117+
// for the given objects.
118+
ClientDisableCacheFor []client.Object
119+
120+
// DryRunClient specifies whether the client should be configured to enforce
121+
// dryRun mode.
122+
DryRunClient bool
123+
124+
// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
125+
// Use this to customize the event correlator and spam filter
126+
//
127+
// Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers
128+
// is shorter than the lifetime of your process.
129+
EventBroadcaster record.EventBroadcaster
130+
131+
// makeBroadcaster allows deferring the creation of the broadcaster to
132+
// avoid leaking goroutines if we never call Start on this manager. It also
133+
// returns whether or not this is a "owned" broadcaster, and as such should be
134+
// stopped with the manager.
135+
makeBroadcaster intrec.EventBroadcasterProducer
136+
137+
// Dependency injection for testing
138+
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
139+
}
140+
141+
// Option can be used to manipulate Options
142+
type Option func(*Options)
143+
144+
// New constructs a brand new cluster
145+
func New(config *rest.Config, opts ...Option) (Cluster, error) {
146+
if config == nil {
147+
return nil, errors.New("must specify Config")
148+
}
149+
150+
options := Options{}
151+
for _, opt := range opts {
152+
opt(&options)
153+
}
154+
options = setOptionsDefaults(options)
155+
156+
// Create the mapper provider
157+
mapper, err := options.MapperProvider(config)
158+
if err != nil {
159+
options.Logger.Error(err, "Failed to get API Group-Resources")
160+
return nil, err
161+
}
162+
163+
// Create the cache for the cached read client and registering informers
164+
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
170+
171+
apiReader, err := client.New(config, clientOptions)
172+
if err != nil {
173+
return nil, err
174+
}
175+
176+
writeObj, err := options.ClientBuilder.
177+
WithUncached(options.ClientDisableCacheFor...).
178+
Build(cache, config, clientOptions)
179+
if err != nil {
180+
return nil, err
181+
}
182+
183+
if options.DryRunClient {
184+
writeObj = client.NewDryRunClient(writeObj)
185+
}
186+
187+
// Create the recorder provider to inject event recorders for the components.
188+
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
189+
// to the particular controller that it's being injected into, rather than a generic one like is here.
190+
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
191+
if err != nil {
192+
return nil, err
193+
}
194+
195+
return &cluster{
196+
config: config,
197+
scheme: options.Scheme,
198+
cache: cache,
199+
fieldIndexes: cache,
200+
client: writeObj,
201+
apiReader: apiReader,
202+
recorderProvider: recorderProvider,
203+
mapper: mapper,
204+
logger: options.Logger,
205+
}, nil
206+
}
207+
208+
// setOptionsDefaults set default values for Options fields
209+
func setOptionsDefaults(options Options) Options {
210+
// Use the Kubernetes client-go scheme if none is specified
211+
if options.Scheme == nil {
212+
options.Scheme = scheme.Scheme
213+
}
214+
215+
if options.MapperProvider == nil {
216+
options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
217+
return apiutil.NewDynamicRESTMapper(c)
218+
}
219+
}
220+
221+
// Allow the client builder to be mocked
222+
if options.ClientBuilder == nil {
223+
options.ClientBuilder = NewClientBuilder()
224+
}
225+
226+
// Allow newCache to be mocked
227+
if options.NewCache == nil {
228+
options.NewCache = cache.New
229+
}
230+
231+
// Allow newRecorderProvider to be mocked
232+
if options.newRecorderProvider == nil {
233+
options.newRecorderProvider = intrec.NewProvider
234+
}
235+
236+
// This is duplicated with pkg/manager, we need it here to provide
237+
// the user with an EventBroadcaster and there for the Leader election
238+
if options.EventBroadcaster == nil {
239+
// defer initialization to avoid leaking by default
240+
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
241+
return record.NewBroadcaster(), true
242+
}
243+
} else {
244+
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
245+
return options.EventBroadcaster, false
246+
}
247+
}
248+
249+
if options.Logger == nil {
250+
options.Logger = logf.RuntimeLog.WithName("cluster")
251+
}
252+
253+
return options
254+
}

pkg/cluster/cluster_suite_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"net/http"
21+
"testing"
22+
23+
. "github.com/onsi/ginkgo"
24+
. "github.com/onsi/gomega"
25+
"k8s.io/client-go/kubernetes"
26+
"k8s.io/client-go/rest"
27+
"sigs.k8s.io/controller-runtime/pkg/envtest"
28+
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
29+
logf "sigs.k8s.io/controller-runtime/pkg/log"
30+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
31+
)
32+
33+
func TestSource(t *testing.T) {
34+
RegisterFailHandler(Fail)
35+
suiteName := "Cluster Suite"
36+
RunSpecsWithDefaultAndCustomReporters(t, suiteName, []Reporter{printer.NewlineReporter{}, printer.NewProwReporter(suiteName)})
37+
}
38+
39+
var testenv *envtest.Environment
40+
var cfg *rest.Config
41+
var clientset *kubernetes.Clientset
42+
43+
// clientTransport is used to force-close keep-alives in tests that check for leaks
44+
var clientTransport *http.Transport
45+
46+
var _ = BeforeSuite(func(done Done) {
47+
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
48+
49+
testenv = &envtest.Environment{}
50+
51+
var err error
52+
cfg, err = testenv.Start()
53+
Expect(err).NotTo(HaveOccurred())
54+
55+
clientTransport = &http.Transport{}
56+
cfg.Transport = clientTransport
57+
58+
clientset, err = kubernetes.NewForConfig(cfg)
59+
Expect(err).NotTo(HaveOccurred())
60+
61+
close(done)
62+
}, 60)
63+
64+
var _ = AfterSuite(func() {
65+
Expect(testenv.Stop()).To(Succeed())
66+
})

0 commit comments

Comments
 (0)