@@ -19,6 +19,7 @@ package watch
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ "sync"
22
23
"time"
23
24
24
25
"k8s.io/apimachinery/pkg/api/meta"
@@ -100,16 +101,25 @@ func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.List
100
101
return fmt .Errorf ("creating client for (%s): %v" , trigger .String (), err )
101
102
}
102
103
104
+ var watchStarted sync.WaitGroup
105
+
106
+ watchStarted .Add (1 )
107
+
103
108
go func () {
109
+ firstWatchStarted := & watchStarted
110
+
104
111
for {
105
112
ctx := context .TODO ()
106
113
107
- dkw .watchUntilClosed (ctx , target )
114
+ dkw .watchUntilClosed (ctx , target , firstWatchStarted )
115
+ firstWatchStarted = nil // only notify once
108
116
109
117
time .Sleep (WatchDelay )
110
118
}
111
119
}()
112
120
121
+ watchStarted .Wait ()
122
+
113
123
return nil
114
124
}
115
125
@@ -127,14 +137,17 @@ type clientObject struct {
127
137
// from this Watch but it will ensure we always Reconcile when needed`.
128
138
//
129
139
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
130
- func (w * dynamicKindWatch ) watchUntilClosed (ctx context.Context , eventTarget metav1.ObjectMeta ) {
140
+ func (w * dynamicKindWatch ) watchUntilClosed (ctx context.Context , eventTarget metav1.ObjectMeta , watchStarted * sync. WaitGroup ) {
131
141
log := log .FromContext (ctx )
132
142
133
143
options := w .FilterOptions
134
144
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
135
145
options .AllowWatchBookmarks = true
136
146
137
147
events , err := w .resource .Watch (context .TODO (), options )
148
+ if watchStarted != nil {
149
+ watchStarted .Done ()
150
+ }
138
151
if err != nil {
139
152
log .WithValues ("kind" , w .GVK .String ()).WithValues ("namespace" , w .FilterNamespace ).WithValues ("labels" , options .LabelSelector ).Error (err , "failed to add watch to dynamic client" )
140
153
return
0 commit comments