diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index 6d62ae1537..49dbb4dab1 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -48,8 +48,6 @@ public class ReflectorRunnable< private boolean isLastSyncResourceVersionUnavailable; - private Watchable watch; - private ListerWatcher listerWatcher; private DeltaFIFO store; @@ -87,30 +85,13 @@ public void run() { log.info("{}#Start listing and watching...", apiTypeClass); try { - ApiListType list = - listerWatcher.list( - new CallGeneratorParams(Boolean.FALSE, getRelistResourceVersion(), null)); - - V1ListMeta listMeta = list.getMetadata(); - String resourceVersion = listMeta.getResourceVersion(); - List items = list.getItems(); - - if (log.isDebugEnabled()) { - log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion); - } - this.syncWith(items, resourceVersion); - this.lastSyncResourceVersion = resourceVersion; + this.lastSyncResourceVersion = initialLoad(); this.isLastSyncResourceVersionUnavailable = false; - + Watchable watch = null; if (log.isDebugEnabled()) { log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion); } - while (true) { - if (!isActive.get()) { - closeWatch(); - return; - } - + while (isActive.get()) { try { if (log.isDebugEnabled()) { log.debug( @@ -120,21 +101,14 @@ public void run() { long jitteredWatchTimeoutSeconds = Double.valueOf(REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT.getSeconds() * (1 + Math.random())) .longValue(); - Watchable newWatch = + watch = listerWatcher.watch( new CallGeneratorParams( Boolean.TRUE, lastSyncResourceVersion, Long.valueOf(jitteredWatchTimeoutSeconds).intValue())); - synchronized (this) { - if (!isActive.get()) { - newWatch.close(); - continue; - } - watch = newWatch; - } - watchHandler(newWatch); + watchHandler(watch); } catch (WatchExpiredException e) { // Watch calls were failed due to expired resource-version. Returning // to unwind the list-watch loops so that we can respawn a new round @@ -165,7 +139,7 @@ public void run() { this.exceptionHandler.accept(apiTypeClass, t); return; } finally { - closeWatch(); + closeWatch(watch); } } } catch (ApiException e) { @@ -185,19 +159,38 @@ public void run() { public void stop() { try { isActive.set(false); - closeWatch(); } catch (Throwable t) { this.exceptionHandler.accept(apiTypeClass, t); } } - private synchronized void closeWatch() throws IOException { - if (watch != null) { - watch.close(); - watch = null; + private synchronized void closeWatch(Watchable watch) { + try { + if (watch != null) { + watch.close(); + watch = null; + } + } catch (IOException e) { + log.warn("{}#Error while closing watcher", this.apiTypeClass, e); } } + private String initialLoad() throws ApiException { + ApiListType list = + listerWatcher.list( + new CallGeneratorParams(Boolean.FALSE, getRelistResourceVersion(), null)); + + V1ListMeta listMeta = list.getMetadata(); + String resourceVersion = listMeta.getResourceVersion(); + List items = list.getItems(); + + if (log.isDebugEnabled()) { + log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion); + } + this.syncWith(items, resourceVersion); + return resourceVersion; + } + private void syncWith(List items, String resourceVersion) { this.store.replace( (List) items, resourceVersion); // down-casting is safe here @@ -278,6 +271,7 @@ private void watchHandler(Watchable watch) { log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion); } } + closeWatch(watch); } static void defaultWatchErrorHandler(