Skip to content

Commit 4a54ca1

Browse files
committed
Add the ability to set watch timeouts.
Allow setting shorter watch timeout for the firt time alone. What this PR does / why we need it: We have seen in some evnironments (where requests are tunnelled) setting up the (first) watch is blocking if there are no resources available to watch. This causes the apply loop to be blocked when setting up watches before apply.
1 parent b758057 commit 4a54ca1

File tree

1 file changed

+49
-3
lines changed

1 file changed

+49
-3
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"sync/atomic"
2324
"time"
2425

2526
"k8s.io/apimachinery/pkg/api/meta"
@@ -35,8 +36,19 @@ import (
3536
"sigs.k8s.io/controller-runtime/pkg/log"
3637
)
3738

38-
// WatchDelay is the time between a Watch being dropped and attempting to resume it
39-
const WatchDelay = 30 * time.Second
39+
var (
40+
// WatchActivityTimeout sets a timeout for a Watch activity under normal operation
41+
WatchActivityTimeout = 300 * time.Second
42+
// WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path
43+
// We expect the author to set this to a lower value in environments where it makes sense.
44+
// func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... }
45+
WatchActivityFirstTimeout = 300 * time.Second
46+
)
47+
48+
const (
49+
// WatchDelay is the time between a Watch being dropped and attempting to resume it
50+
WatchDelay = 30 * time.Second
51+
)
4052

4153
// NewDynamicWatch constructs a watcher for unstructured objects.
4254
// Deprecated: avoid using directly; will move to internal in future.
@@ -138,13 +150,46 @@ type clientObject struct {
138150
//
139151
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
140152
func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) {
153+
var sawActivity atomic.Bool
154+
141155
log := log.FromContext(ctx)
142156

143157
options := w.FilterOptions
144158
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
145159
options.AllowWatchBookmarks = true
146160

147-
events, err := w.resource.Watch(context.TODO(), options)
161+
activityTimeout := WatchActivityTimeout
162+
if watchStarted != nil {
163+
activityTimeout = WatchActivityFirstTimeout
164+
}
165+
ctx, cancel := context.WithCancel(ctx)
166+
defer cancel()
167+
// Check for events periodically
168+
ticker := time.NewTicker(activityTimeout)
169+
defer ticker.Stop()
170+
sawActivity.Store(false)
171+
172+
go func() {
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return
177+
case <-ticker.C:
178+
if !sawActivity.Load() {
179+
log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch")
180+
cancel()
181+
return
182+
}
183+
sawActivity.Store(false)
184+
}
185+
}
186+
}()
187+
188+
events, err := w.resource.Watch(ctx, options)
189+
// If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context
190+
// We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return
191+
sawActivity.Store(true)
192+
148193
if watchStarted != nil {
149194
watchStarted.Done()
150195
}
@@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met
159204
defer events.Stop()
160205

161206
for clientEvent := range events.ResultChan() {
207+
sawActivity.Store(true)
162208
switch clientEvent.Type {
163209
case watch.Bookmark:
164210
// not an object change, we ignore it

0 commit comments

Comments
 (0)