Skip to content

Commit 07216dd

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related #607
1 parent e6c3d13 commit 07216dd

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

pkg/manager/internal.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const (
5353
defaultRenewDeadline = 10 * time.Second
5454
defaultRetryPeriod = 2 * time.Second
5555
defaultGracefulShutdownPeriod = 30 * time.Second
56+
defaultHookPeriod = 15 * time.Second
5657

5758
defaultReadinessEndpoint = "/readyz"
5859
defaultLivenessEndpoint = "/healthz"
@@ -161,6 +162,13 @@ type controllerManager struct {
161162
// internalProceduresStop channel is used internally to the manager when coordinating
162163
// the proper shutdown of servers. This channel is also used for dependency injection.
163164
internalProceduresStop chan struct{}
165+
166+
// prestartHooks are functions that are run immediately before calling the Start functions
167+
// of the leader election runnables.
168+
prestartHooks []Runnable
169+
170+
// hookTimeout is the duration given to each hook to return successfully.
171+
hookTimeout time.Duration
164172
}
165173

166174
type hasCache interface {
@@ -235,6 +243,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
235243
return cm.cluster.GetHTTPClient()
236244
}
237245

246+
// Hook allows you to add hooks.
247+
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
248+
cm.Lock()
249+
defer cm.Unlock()
250+
251+
if cm.started {
252+
return fmt.Errorf("unable to add new hook because the manager has already been started")
253+
}
254+
255+
switch hook {
256+
case HookPrestartType:
257+
cm.prestartHooks = append(cm.prestartHooks, runnable)
258+
}
259+
260+
return nil
261+
}
262+
238263
func (cm *controllerManager) GetConfig() *rest.Config {
239264
return cm.cluster.GetConfig()
240265
}
@@ -615,6 +640,27 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector,
615640
}
616641

617642
func (cm *controllerManager) startLeaderElectionRunnables() error {
643+
cm.logger.Info("Running prestart hooks")
644+
for _, hook := range cm.prestartHooks {
645+
var ctx context.Context
646+
var cancel context.CancelFunc
647+
648+
if cm.hookTimeout < 0 {
649+
ctx, cancel = context.WithCancel(cm.internalCtx)
650+
} else {
651+
ctx, cancel = context.WithTimeout(cm.internalCtx, cm.hookTimeout)
652+
}
653+
654+
if err := hook.Start(ctx); err != nil {
655+
cancel()
656+
return err
657+
}
658+
cancel()
659+
}
660+
661+
// All the prestart hooks have ben run, clear the slice to free the underlying resources.
662+
cm.prestartHooks = nil
663+
618664
return cm.runnables.LeaderElection.Start(cm.internalCtx)
619665
}
620666

pkg/manager/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ type Manager interface {
7979
// AddReadyzCheck allows you to add Readyz checker
8080
AddReadyzCheck(name string, check healthz.Checker) error
8181

82+
// Hook allows to add Runnables as hooks to modify the behavior.
83+
Hook(hook HookType, runnable Runnable) error
84+
8285
// Start starts all registered Controllers and blocks until the context is cancelled.
8386
// Returns an error if there is an error starting any controller.
8487
//
@@ -269,6 +272,10 @@ type Options struct {
269272
// +optional
270273
Controller config.Controller
271274

275+
// HookTimeout is the duration given to each hook to return successfully.
276+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
277+
HookTimeout *time.Duration
278+
272279
// makeBroadcaster allows deferring the creation of the broadcaster to
273280
// avoid leaking goroutines if we never call Start on this manager. It also
274281
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -283,6 +290,15 @@ type Options struct {
283290
newPprofListener func(addr string) (net.Listener, error)
284291
}
285292

293+
// HookType defines hooks for use with AddHook.
294+
type HookType int
295+
296+
const (
297+
// HookPrestartType defines a hook that is run after leader election and immediately before
298+
// calling Start on the runnables that needed leader election.
299+
HookPrestartType HookType = iota
300+
)
301+
286302
// BaseContextFunc is a function used to provide a base Context to Runnables
287303
// managed by a Manager.
288304
type BaseContextFunc func() context.Context
@@ -438,6 +454,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
438454
livenessEndpointName: options.LivenessEndpointName,
439455
pprofListener: pprofListener,
440456
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
457+
hookTimeout: *options.HookTimeout,
441458
internalProceduresStop: make(chan struct{}),
442459
leaderElectionStopped: make(chan struct{}),
443460
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -539,6 +556,11 @@ func setOptionsDefaults(options Options) Options {
539556
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
540557
}
541558

559+
if options.HookTimeout == nil {
560+
hookTimeout := defaultHookPeriod
561+
options.HookTimeout = &hookTimeout
562+
}
563+
542564
if options.Logger.GetSink() == nil {
543565
options.Logger = log.Log
544566
}

pkg/manager/manager_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,121 @@ var _ = Describe("manger.Manager", func() {
11971197
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
11981198
})
11991199

1200+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1201+
m, err := New(cfg, options)
1202+
Expect(err).NotTo(HaveOccurred())
1203+
for _, cb := range callbacks {
1204+
cb(m)
1205+
}
1206+
1207+
runnableRan := make(chan struct{})
1208+
1209+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1210+
close(runnableRan)
1211+
return nil
1212+
}))).ToNot(HaveOccurred())
1213+
1214+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1215+
Expect(m.Elected()).ShouldNot(BeClosed())
1216+
Consistently(runnableRan).ShouldNot(BeClosed())
1217+
return nil
1218+
}))).ToNot(HaveOccurred())
1219+
1220+
ctx, cancel := context.WithCancel(context.Background())
1221+
defer cancel()
1222+
go func() {
1223+
defer GinkgoRecover()
1224+
Expect(m.Elected()).ShouldNot(BeClosed())
1225+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1226+
}()
1227+
1228+
<-m.Elected()
1229+
})
1230+
1231+
It("should run prestart hooks with timeout", func() {
1232+
m, err := New(cfg, options)
1233+
Expect(err).NotTo(HaveOccurred())
1234+
for _, cb := range callbacks {
1235+
cb(m)
1236+
}
1237+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1238+
1239+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1240+
select {
1241+
case <-ctx.Done():
1242+
return ctx.Err()
1243+
case <-time.After(1 * time.Second):
1244+
return errors.New("prestart hook timeout exceeded expected")
1245+
}
1246+
}))).ToNot(HaveOccurred())
1247+
1248+
ctx, cancel := context.WithCancel(context.Background())
1249+
defer cancel()
1250+
1251+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1252+
})
1253+
1254+
It("should run prestart hooks without timeout", func() {
1255+
m, err := New(cfg, options)
1256+
Expect(err).NotTo(HaveOccurred())
1257+
for _, cb := range callbacks {
1258+
cb(m)
1259+
}
1260+
m.(*controllerManager).hookTimeout = -1 * time.Second
1261+
1262+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1263+
fmt.Println("runnable returning")
1264+
return nil
1265+
}))).ToNot(HaveOccurred())
1266+
1267+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1268+
select {
1269+
case <-ctx.Done():
1270+
return ctx.Err()
1271+
case <-time.After(1 * time.Second):
1272+
fmt.Println("prestart hook returning")
1273+
return nil
1274+
}
1275+
}))).ToNot(HaveOccurred())
1276+
1277+
ctx, cancel := context.WithCancel(context.Background())
1278+
defer cancel()
1279+
1280+
go func() {
1281+
defer GinkgoRecover()
1282+
Expect(m.Elected()).ShouldNot(BeClosed())
1283+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1284+
}()
1285+
1286+
<-m.Elected()
1287+
})
1288+
1289+
It("should not run leader election runnables if prestart hooks fail", func() {
1290+
m, err := New(cfg, options)
1291+
Expect(err).NotTo(HaveOccurred())
1292+
for _, cb := range callbacks {
1293+
cb(m)
1294+
}
1295+
1296+
runnableRan := make(chan struct{})
1297+
1298+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1299+
close(runnableRan)
1300+
return nil
1301+
}))).ToNot(HaveOccurred())
1302+
1303+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1304+
Expect(m.Elected()).ShouldNot(BeClosed())
1305+
Consistently(runnableRan).ShouldNot(BeClosed())
1306+
return errors.New("prestart hook failed")
1307+
}))).ToNot(HaveOccurred())
1308+
1309+
ctx, cancel := context.WithCancel(context.Background())
1310+
defer cancel()
1311+
1312+
Expect(m.Elected()).ShouldNot(BeClosed())
1313+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1314+
})
12001315
}
12011316

12021317
Context("with defaults", func() {

0 commit comments

Comments
 (0)