Skip to content

Commit 8c858b3

Browse files
estrozDirectXMan12
authored andcommitted
pkg/client/apiutil: add a dynamic RESTMapper will reload the delegated
meta.RESTMapper on a cache miss. API calls will return ErrRateLimited if a rate limit is hit. This RESTMapper can be configured to be lazy.
1 parent 4ba0a3b commit 8c858b3

File tree

3 files changed

+325
-3
lines changed

3 files changed

+325
-3
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ require (
1818
go.uber.org/atomic v1.3.2 // indirect
1919
go.uber.org/multierr v1.1.0 // indirect
2020
go.uber.org/zap v1.9.1
21-
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect
22-
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
21+
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
22+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
2323
gomodules.xyz/jsonpatch/v2 v2.0.1
2424
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
2525
gopkg.in/fsnotify.v1 v1.4.7

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
213213
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
214214
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
215215
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
216+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
217+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
216218
gomodules.xyz/jsonpatch/v2 v2.0.1 h1:xyiBuvkD2g5n7cYzx6u2sxQvsAy4QJsZFCzGVdzOXZ0=
217219
gomodules.xyz/jsonpatch/v2 v2.0.1/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
218220
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
@@ -259,7 +261,6 @@ k8s.io/klog v0.3.3 h1:niceAagH1tzskmaie/icWd7ci1wbG7Bf2c6YGcQv+3c=
259261
k8s.io/klog v0.3.3/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
260262
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
261263
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
262-
k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4=
263264
k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
264265
k8s.io/utils v0.0.0-20190506122338-8fab8cb257d5 h1:VBM/0P5TWxwk+Nw6Z+lAw3DKgO76g90ETOiA6rfLV1Y=
265266
k8s.io/utils v0.0.0-20190506122338-8fab8cb257d5/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apiutil
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"golang.org/x/time/rate"
24+
"golang.org/x/xerrors"
25+
"k8s.io/apimachinery/pkg/api/meta"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
27+
"k8s.io/client-go/discovery"
28+
"k8s.io/client-go/rest"
29+
"k8s.io/client-go/restmapper"
30+
)
31+
32+
// ErrRateLimited is returned by a RESTMapper method if the number of API
33+
// calls has exceeded a limit within a certain time period.
34+
type ErrRateLimited struct {
35+
// Duration to wait until the next API call can be made.
36+
Delay time.Duration
37+
}
38+
39+
func (e ErrRateLimited) Error() string {
40+
return "too many API calls to the RESTMapper within a timeframe"
41+
}
42+
43+
// DelayIfRateLimited returns the delay time until the next API call is
44+
// allowed and true if err is of type ErrRateLimited. The zero
45+
// time.Duration value and false are returned if err is not a ErrRateLimited.
46+
func DelayIfRateLimited(err error) (time.Duration, bool) {
47+
var rlerr ErrRateLimited
48+
if xerrors.As(err, &rlerr) {
49+
return rlerr.Delay, true
50+
}
51+
return 0, false
52+
}
53+
54+
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
55+
// types at runtime.
56+
type dynamicRESTMapper struct {
57+
mu sync.RWMutex // protects the following fields
58+
client discovery.DiscoveryInterface
59+
staticMapper meta.RESTMapper
60+
limiter *dynamicLimiter
61+
62+
lazy bool
63+
// Used for lazy init.
64+
initOnce sync.Once
65+
}
66+
67+
// WithLimiter sets the RESTMapper's underlying limiter to lim.
68+
func WithLimiter(lim *rate.Limiter) func(*dynamicRESTMapper) error {
69+
return func(drm *dynamicRESTMapper) error {
70+
drm.limiter = &dynamicLimiter{lim}
71+
return nil
72+
}
73+
}
74+
75+
// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings
76+
// until an API call is made.
77+
var WithLazyDiscovery = func(drm *dynamicRESTMapper) error {
78+
drm.lazy = true
79+
return nil
80+
}
81+
82+
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
83+
// RESTMapper dynamically discovers resource types at runtime. opts
84+
// configure the RESTMapper.
85+
func NewDynamicRESTMapper(cfg *rest.Config, opts ...func(*dynamicRESTMapper) error) (meta.RESTMapper, error) {
86+
client, err := discovery.NewDiscoveryClientForConfig(cfg)
87+
if err != nil {
88+
return nil, err
89+
}
90+
drm := &dynamicRESTMapper{
91+
client: client,
92+
limiter: &dynamicLimiter{
93+
rate.NewLimiter(rate.Limit(defaultLimitRate), defaultLimitSize),
94+
},
95+
}
96+
for _, opt := range opts {
97+
if err = opt(drm); err != nil {
98+
return nil, err
99+
}
100+
}
101+
if !drm.lazy {
102+
if err := drm.setStaticMapper(); err != nil {
103+
return nil, err
104+
}
105+
}
106+
return drm, nil
107+
}
108+
109+
var (
110+
// defaultLimitRate is the number of RESTMapper API calls allowed
111+
// per second assuming the rate of API calls <= defaultLimitRate.
112+
defaultLimitRate = 600
113+
// defaultLimitSize is the maximum number of simultaneous RESTMapper
114+
// API calls allowed.
115+
defaultLimitSize = 5
116+
)
117+
118+
// setStaticMapper sets drm's staticMapper by querying its client, regardless
119+
// of reload backoff.
120+
func (drm *dynamicRESTMapper) setStaticMapper() error {
121+
groupResources, err := restmapper.GetAPIGroupResources(drm.client)
122+
if err != nil {
123+
return err
124+
}
125+
drm.staticMapper = restmapper.NewDiscoveryRESTMapper(groupResources)
126+
return nil
127+
}
128+
129+
// init initializes drm only once if drm is lazy.
130+
func (drm *dynamicRESTMapper) init() (err error) {
131+
drm.initOnce.Do(func() {
132+
if drm.lazy {
133+
err = drm.setStaticMapper()
134+
}
135+
})
136+
return err
137+
}
138+
139+
// reload reloads the static RESTMapper, and will return an error only
140+
// if a rate limit has been hit. reload is thread-safe.
141+
func (drm *dynamicRESTMapper) reload() error {
142+
// limiter is thread-safe.
143+
if err := drm.limiter.checkRate(); err != nil {
144+
return err
145+
}
146+
// Lock here so callers can be rate-limited regardless of lock state.
147+
drm.mu.Lock()
148+
defer drm.mu.Unlock()
149+
return drm.setStaticMapper()
150+
}
151+
152+
// TODO: wrap reload errors on NoKindMatchError with go 1.13 errors.
153+
154+
func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
155+
if err := drm.init(); err != nil {
156+
return schema.GroupVersionKind{}, err
157+
}
158+
gvk, err := drm.kindFor(resource)
159+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
160+
if rerr := drm.reload(); rerr != nil {
161+
return schema.GroupVersionKind{}, rerr
162+
}
163+
gvk, err = drm.kindFor(resource)
164+
}
165+
return gvk, err
166+
}
167+
168+
// kindFor calls the underlying static RESTMapper's KindFor method in a
169+
// thread-safe manner.
170+
func (drm *dynamicRESTMapper) kindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
171+
drm.mu.RLock()
172+
defer drm.mu.RUnlock()
173+
return drm.staticMapper.KindFor(resource)
174+
}
175+
176+
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
177+
if err := drm.init(); err != nil {
178+
return nil, err
179+
}
180+
gvks, err := drm.kindsFor(resource)
181+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
182+
if rerr := drm.reload(); rerr != nil {
183+
return nil, rerr
184+
}
185+
gvks, err = drm.kindsFor(resource)
186+
}
187+
return gvks, err
188+
}
189+
190+
// kindsFor calls the underlying static RESTMapper's KindsFor method in a
191+
// thread-safe manner.
192+
func (drm *dynamicRESTMapper) kindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
193+
drm.mu.RLock()
194+
defer drm.mu.RUnlock()
195+
return drm.staticMapper.KindsFor(resource)
196+
}
197+
198+
func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
199+
if err := drm.init(); err != nil {
200+
return schema.GroupVersionResource{}, err
201+
}
202+
gvr, err := drm.resourceFor(input)
203+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
204+
if rerr := drm.reload(); rerr != nil {
205+
return schema.GroupVersionResource{}, rerr
206+
}
207+
gvr, err = drm.resourceFor(input)
208+
}
209+
return gvr, err
210+
}
211+
212+
// resourceFor calls the underlying static RESTMapper's ResourceFor method in a
213+
// thread-safe manner.
214+
func (drm *dynamicRESTMapper) resourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
215+
drm.mu.RLock()
216+
defer drm.mu.RUnlock()
217+
return drm.staticMapper.ResourceFor(input)
218+
}
219+
220+
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
221+
if err := drm.init(); err != nil {
222+
return nil, err
223+
}
224+
gvrs, err := drm.resourcesFor(input)
225+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
226+
if rerr := drm.reload(); rerr != nil {
227+
return nil, rerr
228+
}
229+
gvrs, err = drm.resourcesFor(input)
230+
}
231+
return gvrs, err
232+
}
233+
234+
// resourcesFor calls the underlying static RESTMapper's ResourcesFor method
235+
// in a thread-safe manner.
236+
func (drm *dynamicRESTMapper) resourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
237+
drm.mu.RLock()
238+
defer drm.mu.RUnlock()
239+
return drm.staticMapper.ResourcesFor(input)
240+
}
241+
242+
func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
243+
if err := drm.init(); err != nil {
244+
return nil, err
245+
}
246+
mapping, err := drm.restMapping(gk, versions...)
247+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
248+
if rerr := drm.reload(); rerr != nil {
249+
return nil, rerr
250+
}
251+
mapping, err = drm.restMapping(gk, versions...)
252+
}
253+
return mapping, err
254+
}
255+
256+
// restMapping calls the underlying static RESTMapper's RESTMapping method
257+
// in a thread-safe manner.
258+
func (drm *dynamicRESTMapper) restMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
259+
drm.mu.RLock()
260+
defer drm.mu.RUnlock()
261+
return drm.staticMapper.RESTMapping(gk, versions...)
262+
}
263+
264+
func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
265+
if err := drm.init(); err != nil {
266+
return nil, err
267+
}
268+
mappings, err := drm.restMappings(gk, versions...)
269+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
270+
if rerr := drm.reload(); rerr != nil {
271+
return nil, rerr
272+
}
273+
mappings, err = drm.restMappings(gk, versions...)
274+
}
275+
return mappings, err
276+
}
277+
278+
// restMappings calls the underlying static RESTMapper's RESTMappings method
279+
// in a thread-safe manner.
280+
func (drm *dynamicRESTMapper) restMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
281+
drm.mu.RLock()
282+
defer drm.mu.RUnlock()
283+
return drm.staticMapper.RESTMappings(gk, versions...)
284+
}
285+
286+
func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) {
287+
if err := drm.init(); err != nil {
288+
return "", err
289+
}
290+
singular, err := drm.resourceSingularizer(resource)
291+
if xerrors.Is(err, &meta.NoKindMatchError{}) {
292+
if rerr := drm.reload(); rerr != nil {
293+
return "", rerr
294+
}
295+
singular, err = drm.resourceSingularizer(resource)
296+
}
297+
return singular, err
298+
}
299+
300+
// resourceSingularizer calls the underlying static RESTMapper's
301+
// ResourceSingularizer method in a thread-safe manner.
302+
func (drm *dynamicRESTMapper) resourceSingularizer(resource string) (string, error) {
303+
drm.mu.RLock()
304+
defer drm.mu.RUnlock()
305+
return drm.staticMapper.ResourceSingularizer(resource)
306+
}
307+
308+
// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users.
309+
type dynamicLimiter struct {
310+
*rate.Limiter
311+
}
312+
313+
// checkRate returns an ErrRateLimited if too many API calls have been made
314+
// within the set limit.
315+
func (b *dynamicLimiter) checkRate() error {
316+
res := b.Reserve()
317+
if res.Delay() == 0 {
318+
return nil
319+
}
320+
return ErrRateLimited{res.Delay()}
321+
}

0 commit comments

Comments
 (0)