Skip to content

Commit fd86f9d

Browse files
dymurrayShawn Hurley
authored andcommitted
pkg/ansible; Add watching of dependent resources (#857)
Description of the change: Adding support for the Ansible Operator to keep track of resources that are created by adding them for the controller to watch if we have already created a controller object for the specific GVK. Motivation for the change: Performance Enhancement.
1 parent 5b5d5ac commit fd86f9d

File tree

8 files changed

+171
-47
lines changed

8 files changed

+171
-47
lines changed

commands/operator-sdk/cmd/up/local.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,21 +165,23 @@ func upLocalAnsible() {
165165
printVersion()
166166
log.Infof("watching namespace: %s", namespace)
167167
done := make(chan error)
168+
cMap := proxy.NewControllerMap()
168169

169170
// start the proxy
170171
err = proxy.Run(done, proxy.Options{
171-
Address: "localhost",
172-
Port: 8888,
173-
KubeConfig: mgr.GetConfig(),
174-
Cache: mgr.GetCache(),
175-
RESTMapper: mgr.GetRESTMapper(),
172+
Address: "localhost",
173+
Port: 8888,
174+
KubeConfig: mgr.GetConfig(),
175+
Cache: mgr.GetCache(),
176+
RESTMapper: mgr.GetRESTMapper(),
177+
ControllerMap: cMap,
176178
})
177179
if err != nil {
178180
log.Fatalf("error starting proxy: (%v)", err)
179181
}
180182

181183
// start the operator
182-
go ansibleOperator.Run(done, mgr, ansibleOperatorFlags)
184+
go ansibleOperator.Run(done, mgr, ansibleOperatorFlags, cMap)
183185

184186
// wait for either to finish
185187
err = <-done

pkg/ansible/controller/controller.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,17 @@ var log = logf.Log.WithName("ansible-controller")
3838

3939
// Options - options for your controller
4040
type Options struct {
41-
EventHandlers []events.EventHandler
42-
LoggingLevel events.LogLevel
43-
Runner runner.Runner
44-
GVK schema.GroupVersionKind
45-
ReconcilePeriod time.Duration
46-
ManageStatus bool
41+
EventHandlers []events.EventHandler
42+
LoggingLevel events.LogLevel
43+
Runner runner.Runner
44+
GVK schema.GroupVersionKind
45+
ReconcilePeriod time.Duration
46+
ManageStatus bool
47+
WatchDependentResources bool
4748
}
4849

4950
// Add - Creates a new ansible operator controller and adds it to the manager
50-
func Add(mgr manager.Manager, options Options) {
51+
func Add(mgr manager.Manager, options Options) *controller.Controller {
5152
log.Info("Watching resource", "Options.Group", options.GVK.Group, "Options.Version", options.GVK.Version, "Options.Kind", options.GVK.Kind)
5253
if options.EventHandlers == nil {
5354
options.EventHandlers = []events.EventHandler{}
@@ -63,6 +64,13 @@ func Add(mgr manager.Manager, options Options) {
6364
ManageStatus: options.ManageStatus,
6465
}
6566

67+
if mgr.GetScheme().IsVersionRegistered(schema.GroupVersion{
68+
Group: options.GVK.Group,
69+
Version: options.GVK.Version,
70+
}) {
71+
log.Info("Version already registered... skipping")
72+
return nil
73+
}
6674
// Register the GVK with the schema
6775
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
6876
metav1.AddToGroupVersion(mgr.GetScheme(), schema.GroupVersion{
@@ -84,4 +92,5 @@ func Add(mgr manager.Manager, options Options) {
8492
log.Error(err, "")
8593
os.Exit(1)
8694
}
95+
return &c
8796
}

pkg/ansible/operator/operator.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
package operator
1616

1717
import (
18+
"errors"
1819
"math/rand"
1920
"time"
2021

2122
"github.com/operator-framework/operator-sdk/pkg/ansible/controller"
2223
"github.com/operator-framework/operator-sdk/pkg/ansible/flags"
24+
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy"
2325
"github.com/operator-framework/operator-sdk/pkg/ansible/runner"
2426

2527
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -30,7 +32,7 @@ import (
3032
// Run - A blocking function which starts a controller-runtime manager
3133
// It starts an Operator by reading in the values in `./watches.yaml`, adds a controller
3234
// to the manager, and finally running the manager.
33-
func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags) {
35+
func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags, cMap *proxy.ControllerMap) {
3436
watches, err := runner.NewFromWatches(f.WatchesFile)
3537
if err != nil {
3638
logf.Log.WithName("manager").Error(err, "failed to get watches")
@@ -47,7 +49,12 @@ func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags) {
4749
ManageStatus: runner.GetManageStatus(),
4850
}
4951
applyFlagsToControllerOptions(f, &o)
50-
controller.Add(mgr, o)
52+
ctr := controller.Add(mgr, o)
53+
if ctr == nil {
54+
done <- errors.New("failed to add controller")
55+
return
56+
}
57+
cMap.Store(o.GVK, *ctr, runner.GetWatchDependentResources())
5158
}
5259
done <- mgr.Start(c)
5360
}

pkg/ansible/proxy/proxy.go

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"net/http"
2929
"net/http/httputil"
3030
"strings"
31+
"sync"
3132

3233
k8sRequest "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/requestfactory"
3334
"k8s.io/apimachinery/pkg/api/meta"
@@ -38,8 +39,18 @@ import (
3839
"k8s.io/client-go/rest"
3940
"sigs.k8s.io/controller-runtime/pkg/cache"
4041
"sigs.k8s.io/controller-runtime/pkg/client"
42+
"sigs.k8s.io/controller-runtime/pkg/controller"
43+
"sigs.k8s.io/controller-runtime/pkg/handler"
44+
"sigs.k8s.io/controller-runtime/pkg/source"
4145
)
4246

47+
// ControllerMap - map of GVK to controller
48+
type ControllerMap struct {
49+
sync.RWMutex
50+
internal map[schema.GroupVersionKind]controller.Controller
51+
watch map[schema.GroupVersionKind]bool
52+
}
53+
4354
// CacheResponseHandler will handle proxied requests and check if the requested
4455
// resource exists in our cache. If it does then there is no need to bombard
4556
// the APIserver with our request and we should write the response from the
@@ -126,7 +137,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
126137
// InjectOwnerReferenceHandler will handle proxied requests and inject the
127138
// owner refernece found in the authorization header. The Authorization is
128139
// then deleted so that the proxy can re-set with the correct authorization.
129-
func InjectOwnerReferenceHandler(h http.Handler) http.Handler {
140+
func InjectOwnerReferenceHandler(h http.Handler, cMap *ControllerMap) http.Handler {
130141
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
131142
if req.Method == http.MethodPost {
132143
log.Info("injecting owner reference")
@@ -178,6 +189,13 @@ func InjectOwnerReferenceHandler(h http.Handler) http.Handler {
178189
log.V(1).Info("serialized body", "Body", string(newBody))
179190
req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody))
180191
req.ContentLength = int64(len(newBody))
192+
// add watch for resource
193+
err = addWatchToController(owner, cMap, data)
194+
if err != nil {
195+
m := "could not add watch to controller"
196+
log.Error(err, m)
197+
http.Error(w, m, http.StatusInternalServerError)
198+
}
181199
}
182200
// Removing the authorization so that the proxy can set the correct authorization.
183201
req.Header.Del("Authorization")
@@ -200,6 +218,7 @@ type Options struct {
200218
KubeConfig *rest.Config
201219
Cache cache.Cache
202220
RESTMapper meta.RESTMapper
221+
ControllerMap *ControllerMap
203222
}
204223

205224
// Run will start a proxy server in a go routine that returns on the error
@@ -213,6 +232,9 @@ func Run(done chan error, o Options) error {
213232
if o.Handler != nil {
214233
server.Handler = o.Handler(server.Handler)
215234
}
235+
if o.ControllerMap == nil {
236+
return fmt.Errorf("failed to get controller map from options")
237+
}
216238

217239
if o.Cache == nil {
218240
// Need to initialize cache since we don't have one
@@ -236,7 +258,7 @@ func Run(done chan error, o Options) error {
236258
}
237259

238260
if !o.NoOwnerInjection {
239-
server.Handler = InjectOwnerReferenceHandler(server.Handler)
261+
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap)
240262
}
241263
// Always add cache handler
242264
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper)
@@ -251,3 +273,68 @@ func Run(done chan error, o Options) error {
251273
}()
252274
return nil
253275
}
276+
277+
func addWatchToController(owner metav1.OwnerReference, cMap *ControllerMap, resource *unstructured.Unstructured) error {
278+
gv, err := schema.ParseGroupVersion(owner.APIVersion)
279+
if err != nil {
280+
return err
281+
}
282+
gvk := schema.GroupVersionKind{
283+
Group: gv.Group,
284+
Version: gv.Version,
285+
Kind: owner.Kind,
286+
}
287+
c, watch, ok := cMap.Get(gvk)
288+
if !ok {
289+
return errors.New("failed to find controller in map")
290+
}
291+
// Add a watch to controller
292+
if watch {
293+
err = c.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: resource})
294+
if err != nil {
295+
return err
296+
}
297+
}
298+
return nil
299+
}
300+
301+
// NewControllerMap returns a new object that contains a mapping between GVK
302+
// and controller
303+
func NewControllerMap() *ControllerMap {
304+
return &ControllerMap{
305+
internal: make(map[schema.GroupVersionKind]controller.Controller),
306+
watch: make(map[schema.GroupVersionKind]bool),
307+
}
308+
}
309+
310+
// Get - Returns a controller given a GVK as the key. `watch` in the return
311+
// specifies whether or not the operator will watch dependent resources for
312+
// this controller. `ok` returns whether the query was successful. `controller`
313+
// is the associated controller-runtime controller object.
314+
func (cm *ControllerMap) Get(key schema.GroupVersionKind) (controller controller.Controller, watch, ok bool) {
315+
cm.RLock()
316+
defer cm.RUnlock()
317+
result, ok := cm.internal[key]
318+
if !ok {
319+
return result, false, ok
320+
}
321+
watch, ok = cm.watch[key]
322+
return result, watch, ok
323+
}
324+
325+
// Delete - Deletes associated GVK to controller mapping from the ControllerMap
326+
func (cm *ControllerMap) Delete(key schema.GroupVersionKind) {
327+
cm.Lock()
328+
defer cm.Unlock()
329+
delete(cm.internal, key)
330+
}
331+
332+
// Store - Adds a new GVK to controller mapping. Also creates a mapping between
333+
// GVK and a boolean `watch` that specifies whether this controller is watching
334+
// dependent resources.
335+
func (cm *ControllerMap) Store(key schema.GroupVersionKind, value controller.Controller, watch bool) {
336+
cm.Lock()
337+
defer cm.Unlock()
338+
cm.internal[key] = value
339+
cm.watch[key] = watch
340+
}

pkg/ansible/proxy/proxy_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ func TestHandler(t *testing.T) {
3636
t.Fatalf("failed to instantiate manager: %v", err)
3737
}
3838
done := make(chan error)
39+
cMap := NewControllerMap()
3940
err = Run(done, Options{
40-
Address: "localhost",
41-
Port: 8888,
42-
KubeConfig: mgr.GetConfig(),
43-
Cache: nil,
44-
RESTMapper: mgr.GetRESTMapper(),
41+
Address: "localhost",
42+
Port: 8888,
43+
KubeConfig: mgr.GetConfig(),
44+
Cache: nil,
45+
RESTMapper: mgr.GetRESTMapper(),
46+
ControllerMap: cMap,
4547
})
4648
if err != nil {
4749
t.Fatalf("error starting proxy: %v", err)

pkg/ansible/runner/fake/runner.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import (
2525

2626
// Runner - implements the Runner interface for a GVK that's being watched.
2727
type Runner struct {
28-
Finalizer string
29-
ReconcilePeriod time.Duration
30-
ManageStatus bool
28+
Finalizer string
29+
ReconcilePeriod time.Duration
30+
ManageStatus bool
31+
WatchDependentResources bool
3132
// Used to send error if Run should fail.
3233
Error error
3334
// Job Events that will be sent back from the runs channel
@@ -77,6 +78,11 @@ func (r *Runner) GetManageStatus() bool {
7778
return r.ManageStatus
7879
}
7980

81+
// GetWatchDependentResources - get watchDependentResources.
82+
func (r *Runner) GetWatchDependentResources() bool {
83+
return r.WatchDependentResources
84+
}
85+
8086
// GetFinalizer - gets the fake finalizer.
8187
func (r *Runner) GetFinalizer() (string, bool) {
8288
return r.Finalizer, r.Finalizer != ""

pkg/ansible/runner/runner.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,21 @@ type Runner interface {
4444
GetFinalizer() (string, bool)
4545
GetReconcilePeriod() (time.Duration, bool)
4646
GetManageStatus() bool
47+
GetWatchDependentResources() bool
4748
}
4849

4950
// watch holds data used to create a mapping of GVK to ansible playbook or role.
5051
// The mapping is used to compose an ansible operator.
5152
type watch struct {
52-
Version string `yaml:"version"`
53-
Group string `yaml:"group"`
54-
Kind string `yaml:"kind"`
55-
Playbook string `yaml:"playbook"`
56-
Role string `yaml:"role"`
57-
ReconcilePeriod string `yaml:"reconcilePeriod"`
58-
ManageStatus bool `yaml:"manageStatus"`
59-
Finalizer *Finalizer `yaml:"finalizer"`
53+
Version string `yaml:"version"`
54+
Group string `yaml:"group"`
55+
Kind string `yaml:"kind"`
56+
Playbook string `yaml:"playbook"`
57+
Role string `yaml:"role"`
58+
ReconcilePeriod string `yaml:"reconcilePeriod"`
59+
ManageStatus bool `yaml:"manageStatus"`
60+
WatchDependentResources bool `yaml:"watchDependentResources"`
61+
Finalizer *Finalizer `yaml:"finalizer"`
6062
}
6163

6264
// Finalizer - Expose finalizer to be used by a user.
@@ -69,8 +71,9 @@ type Finalizer struct {
6971

7072
// UnmarshalYaml - implements the yaml.Unmarshaler interface
7173
func (w *watch) UnmarshalYAML(unmarshal func(interface{}) error) error {
72-
// by default, the operator will manage status
74+
// by default, the operator will manage status and watch dependent resources
7375
w.ManageStatus = true
76+
w.WatchDependentResources = true
7477

7578
// hide watch data in plain struct to prevent unmarshal from calling
7679
// UnmarshalYAML again
@@ -185,13 +188,14 @@ func NewForRole(path string, gvk schema.GroupVersionKind, finalizer *Finalizer,
185188

186189
// runner - implements the Runner interface for a GVK that's being watched.
187190
type runner struct {
188-
Path string // path on disk to a playbook or role depending on what cmdFunc expects
189-
GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path
190-
Finalizer *Finalizer
191-
cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner
192-
finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd
193-
reconcilePeriod *time.Duration
194-
manageStatus bool
191+
Path string // path on disk to a playbook or role depending on what cmdFunc expects
192+
GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path
193+
Finalizer *Finalizer
194+
cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner
195+
finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd
196+
reconcilePeriod *time.Duration
197+
manageStatus bool
198+
watchDependentResources bool
195199
}
196200

197201
func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig string) (RunResult, error) {
@@ -281,6 +285,11 @@ func (r *runner) GetManageStatus() bool {
281285
return r.manageStatus
282286
}
283287

288+
// GetWatchDependentResources - get the watch dependent resources value
289+
func (r *runner) GetWatchDependentResources() bool {
290+
return r.watchDependentResources
291+
}
292+
284293
func (r *runner) GetFinalizer() (string, bool) {
285294
if r.Finalizer != nil {
286295
return r.Finalizer.Name, true

0 commit comments

Comments
 (0)