Skip to content

Commit ab15cbf

Browse files
author
Shawn Hurley
authored
pkg/ansible/proxy: adding ability for dependent watches to be recovered (#1039)
**Description of the change:** This will allow ansible operator to re-watch dependent resources on an operator pod restart or on the loss of leader status. **Motivation for the change:** Operator pods are expected to be restarted or to lose leader status
1 parent 3a35ff6 commit ab15cbf

File tree

2 files changed

+170
-86
lines changed

2 files changed

+170
-86
lines changed

pkg/ansible/proxy/proxy.go

Lines changed: 116 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type marshaler interface {
5555
// resource exists in our cache. If it does then there is no need to bombard
5656
// the APIserver with our request and we should write the response from the
5757
// proxy.
58-
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
58+
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}, cMap *controllermap.ControllerMap, injectOwnerRef bool) http.Handler {
5959
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
6060
switch req.Method {
6161
case http.MethodGet:
@@ -138,17 +138,39 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
138138
}
139139
m = &un
140140
} else {
141-
un := unstructured.Unstructured{}
141+
un := &unstructured.Unstructured{}
142142
un.SetGroupVersionKind(k)
143143
obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name}
144-
err = informerCache.Get(context.Background(), obj, &un)
144+
err = informerCache.Get(context.Background(), obj, un)
145145
if err != nil {
146146
// break here in case resource doesn't exist in cache but exists on APIserver
147147
// This is very unlikely but provides user with expected 404
148148
log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj))
149149
break
150150
}
151-
m = &un
151+
m = un
152+
// Once we get the resource, we are going to attempt to recover the dependent watches here,
153+
// This will happen in the background, and log errors.
154+
if injectOwnerRef {
155+
go func() {
156+
ownerRef, err := getRequestOwnerRef(req)
157+
if err != nil {
158+
log.Error(err, "Could not get ownerRef from proxy")
159+
return
160+
}
161+
162+
for _, oRef := range un.GetOwnerReferences() {
163+
if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind {
164+
err := addWatchToController(ownerRef, cMap, un, restMapper)
165+
if err != nil {
166+
log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
167+
return
168+
}
169+
}
170+
}
171+
}()
172+
}
173+
152174
}
153175

154176
i := bytes.Buffer{}
@@ -184,7 +206,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
184206
// InjectOwnerReferenceHandler will handle proxied requests and inject the
185207
// owner reference found in the authorization header. The Authorization is
186208
// then deleted so that the proxy can re-set with the correct authorization.
187-
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper) http.Handler {
209+
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
188210
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
189211
switch req.Method {
190212
case http.MethodPost:
@@ -203,29 +225,9 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
203225
break
204226
}
205227
log.Info("Injecting owner reference")
206-
207-
user, _, ok := req.BasicAuth()
208-
if !ok {
209-
log.Error(errors.New("basic auth header not found"), "")
210-
w.Header().Set("WWW-Authenticate", "Basic realm=\"Operator Proxy\"")
211-
http.Error(w, "", http.StatusUnauthorized)
212-
return
213-
}
214-
authString, err := base64.StdEncoding.DecodeString(user)
228+
owner, err := getRequestOwnerRef(req)
215229
if err != nil {
216-
m := "Could not base64 decode username"
217-
log.Error(err, m)
218-
http.Error(w, m, http.StatusBadRequest)
219-
return
220-
}
221-
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
222-
// as a subset along with the Namespace of the owner. Please see the
223-
// kubeconfig.NamespacedOwnerReference type for more information. The
224-
// namespace is required when creating the reconcile requests.
225-
owner := kubeconfig.NamespacedOwnerReference{}
226-
json.Unmarshal(authString, &owner)
227-
if err := json.Unmarshal(authString, &owner); err != nil {
228-
m := "Could not unmarshal auth string"
230+
m := "Could not get owner reference"
229231
log.Error(err, m)
230232
http.Error(w, m, http.StatusInternalServerError)
231233
return
@@ -257,35 +259,23 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
257259
log.V(1).Info("Serialized body", "Body", string(newBody))
258260
req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody))
259261
req.ContentLength = int64(len(newBody))
260-
dataMapping, err := restMapper.RESTMapping(data.GroupVersionKind().GroupKind(), data.GroupVersionKind().Version)
261-
if err != nil {
262-
m := fmt.Sprintf("Could not get rest mapping for: %v", data.GroupVersionKind())
263-
log.Error(err, m)
264-
http.Error(w, m, http.StatusInternalServerError)
265-
return
266-
}
267-
// We need to determine whether or not the owner is a cluster-scoped
268-
// resource because enqueue based on an owner reference does not work if
269-
// a namespaced resource owns a cluster-scoped resource
270-
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
271-
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
272-
if err != nil {
273-
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
274-
log.Error(err, m)
275-
http.Error(w, m, http.StatusInternalServerError)
276-
return
277-
}
278262

279-
dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
280-
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
281-
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
282263
// add watch for resource
283-
err = addWatchToController(owner, cMap, data, useOwnerReference)
284-
if err != nil {
285-
m := "could not add watch to controller"
286-
log.Error(err, m)
287-
http.Error(w, m, http.StatusInternalServerError)
288-
return
264+
// check if resource doesn't exist in watched namespaces
265+
// if watchedNamespaces[""] exists then we are watching all namespaces
266+
// and want to continue
267+
// This is making sure we are not attempting to watch a resource outside of the
268+
// namespaces that the cache can watch.
269+
_, allNsPresent := watchedNamespaces[metav1.NamespaceAll]
270+
_, reqNsPresent := watchedNamespaces[r.Namespace]
271+
if allNsPresent || reqNsPresent {
272+
err = addWatchToController(owner, cMap, data, restMapper)
273+
if err != nil {
274+
m := "could not add watch to controller"
275+
log.Error(err, m)
276+
http.Error(w, m, http.StatusInternalServerError)
277+
return
278+
}
289279
}
290280
}
291281
// Removing the authorization so that the proxy can set the correct authorization.
@@ -294,6 +284,7 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
294284
})
295285
}
296286

287+
// RequestLogHandler - log the requests that come through the proxy.
297288
func RequestLogHandler(h http.Handler) http.Handler {
298289
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
299290
// read body
@@ -379,13 +370,13 @@ func Run(done chan error, o Options) error {
379370
}
380371

381372
if !o.NoOwnerInjection {
382-
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper)
373+
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper, watchedNamespaceMap)
383374
}
384375
if o.LogRequests {
385376
server.Handler = RequestLogHandler(server.Handler)
386377
}
387378
if !o.DisableCache {
388-
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap)
379+
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap, o.ControllerMap, !o.NoOwnerInjection)
389380
}
390381

391382
l, err := server.Listen(o.Address, o.Port)
@@ -399,57 +390,71 @@ func Run(done chan error, o Options) error {
399390
return nil
400391
}
401392

402-
func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, useOwnerReference bool) error {
403-
gv, err := schema.ParseGroupVersion(owner.APIVersion)
393+
func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper) error {
394+
dataMapping, err := restMapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version)
395+
if err != nil {
396+
m := fmt.Sprintf("Could not get rest mapping for: %v", resource.GroupVersionKind())
397+
log.Error(err, m)
398+
return err
399+
400+
}
401+
// We need to determine whether or not the owner is a cluster-scoped
402+
// resource because enqueue based on an owner reference does not work if
403+
// a namespaced resource owns a cluster-scoped resource
404+
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
404405
if err != nil {
406+
m := fmt.Sprintf("could not get group version for: %v", owner)
407+
log.Error(err, m)
405408
return err
406409
}
407-
gvk := schema.GroupVersionKind{
408-
Group: gv.Group,
409-
Version: gv.Version,
410-
Kind: owner.Kind,
410+
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
411+
if err != nil {
412+
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
413+
log.Error(err, m)
414+
return err
411415
}
412-
contents, ok := cMap.Get(gvk)
416+
417+
dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
418+
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
419+
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
420+
contents, ok := cMap.Get(ownerMapping.GroupVersionKind)
413421
if !ok {
414422
return errors.New("failed to find controller in map")
415423
}
416424
wMap := contents.WatchMap
417425
uMap := contents.UIDMap
418-
// Store UID
419-
uMap.Store(owner.UID, types.NamespacedName{
420-
Name: owner.Name,
421-
Namespace: owner.Namespace,
422-
})
423426
u := &unstructured.Unstructured{}
424-
u.SetGroupVersionKind(gvk)
427+
u.SetGroupVersionKind(ownerMapping.GroupVersionKind)
425428
// Add a watch to controller
426429
if contents.WatchDependentResources {
427-
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources
428-
if useOwnerReference && !contents.WatchClusterScopedResources {
429-
_, exists := wMap.Get(resource.GroupVersionKind())
430-
// If already watching resource no need to add a new watch
431-
if exists {
432-
return nil
433-
}
430+
// Store UID
431+
uMap.Store(owner.UID, types.NamespacedName{
432+
Name: owner.Name,
433+
Namespace: owner.Namespace,
434+
})
435+
_, exists := wMap.Get(resource.GroupVersionKind())
436+
// If already watching resource no need to add a new watch
437+
if exists {
438+
return nil
439+
}
440+
// Store watch in map
441+
wMap.Store(resource.GroupVersionKind())
442+
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources and we have to
443+
if useOwnerReference {
434444
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
435445
// Store watch in map
436-
wMap.Store(resource.GroupVersionKind())
437446
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
438-
} else if contents.WatchClusterScopedResources {
439-
// Use Map func since EnqueuRequestForOwner won't work
440-
// Check if resource is already watched
441-
_, exists := wMap.Get(resource.GroupVersionKind())
442-
if exists {
443-
return nil
447+
if err != nil {
448+
return err
444449
}
450+
} else if contents.WatchClusterScopedResources {
445451
log.Info("Watching child resource which can be cluster-scoped", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
446-
// Store watch in map
447-
wMap.Store(resource.GroupVersionKind())
448452
// Add watch
449453
err = contents.Controller.Watch(
450454
&source.Kind{Type: resource},
455+
// Use Map func since EnqueueRequestForOwner won't work
451456
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
452-
log.V(2).Info("Creating reconcile request from object", "gvk", gvk, "name", a.Meta.GetName())
457+
log.V(2).Info("Creating reconcile request from object", "gvk", ownerMapping.GroupVersionKind, "name", a.Meta.GetName())
453458
ownRefs := a.Meta.GetOwnerReferences()
454459
for _, ref := range ownRefs {
455460
nn, exists := uMap.Get(ref.UID)
@@ -470,3 +475,28 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
470475
}
471476
return nil
472477
}
478+
479+
func getRequestOwnerRef(req *http.Request) (kubeconfig.NamespacedOwnerReference, error) {
480+
owner := kubeconfig.NamespacedOwnerReference{}
481+
user, _, ok := req.BasicAuth()
482+
if !ok {
483+
return owner, errors.New("basic auth header not found")
484+
}
485+
authString, err := base64.StdEncoding.DecodeString(user)
486+
if err != nil {
487+
m := "Could not base64 decode username"
488+
log.Error(err, m)
489+
return owner, err
490+
}
491+
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
492+
// as a subset along with the Namespace of the owner. Please see the
493+
// kubeconfig.NamespacedOwnerReference type for more information. The
494+
// namespace is required when creating the reconcile requests.
495+
json.Unmarshal(authString, &owner)
496+
if err := json.Unmarshal(authString, &owner); err != nil {
497+
m := "Could not unmarshal auth string"
498+
log.Error(err, m)
499+
return owner, err
500+
}
501+
return owner, err
502+
}

test/ansible-memcached/asserts.yml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,60 @@
6161
assert:
6262
that: lookup('k8s', kind='Service', api_version='v1', namespace=namespace, resource_name='test-service')
6363

64+
- when: molecule_yml.scenario.name == "test-local"
65+
block:
66+
- name: Restart the operator by killing the pod
67+
k8s:
68+
state: absent
69+
definition:
70+
api_version: v1
71+
kind: Pod
72+
metadata:
73+
namespace: '{{ namespace }}'
74+
name: '{{ pod.metadata.name }}'
75+
vars:
76+
pod: '{{ q("k8s", api_version="v1", kind="Pod", namespace=namespace, label_selector="name=memcached-operator").0 }}'
77+
78+
- name: Wait 2 minutes for operator deployment
79+
debug:
80+
var: deploy
81+
until: deploy and deploy.status and deploy.status.replicas == deploy.status.get("availableReplicas", 0)
82+
retries: 12
83+
delay: 10
84+
vars:
85+
deploy: '{{ lookup("k8s",
86+
kind="Deployment",
87+
api_version="apps/v1",
88+
namespace=namespace,
89+
resource_name="memcached-operator"
90+
)}}'
91+
92+
- name: Wait for reconciliation to have a chance at finishing
93+
pause:
94+
seconds: 15
95+
96+
- name: Delete the service that is created.
97+
k8s:
98+
kind: Service
99+
api_version: v1
100+
namespace: '{{ namespace }}'
101+
name: test-service
102+
state: absent
103+
104+
- name: Verify that test-service was re-created
105+
debug:
106+
var: service
107+
until: service
108+
retries: 12
109+
delay: 10
110+
vars:
111+
service: '{{ lookup("k8s",
112+
kind="Service",
113+
api_version="v1",
114+
namespace=namespace,
115+
resource_name="test-service",
116+
)}}'
117+
64118
- name: Delete the custom resource
65119
k8s:
66120
state: absent

0 commit comments

Comments
 (0)