From a996bcce2c3500d6301c03478d2ea4d446ef7d7a Mon Sep 17 00:00:00 2001 From: Rafael Ribeiro Date: Tue, 24 Jun 2025 23:00:09 +0100 Subject: [PATCH 1/4] separate initial load to avoid retaining list items in memory --- .../informer/cache/ReflectorRunnable.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index 6d62ae1537..dd5e82fe58 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -12,6 +12,18 @@ */ package io.kubernetes.client.informer.cache; +import java.io.IOException; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.kubernetes.client.common.KubernetesListObject; import io.kubernetes.client.common.KubernetesObject; import io.kubernetes.client.informer.EventType; @@ -23,16 +35,6 @@ import io.kubernetes.client.util.CallGeneratorParams; import io.kubernetes.client.util.Strings; import io.kubernetes.client.util.Watchable; -import java.io.IOException; -import java.net.ConnectException; -import java.net.HttpURLConnection; -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ReflectorRunnable< ApiType extends KubernetesObject, ApiListType extends KubernetesListObject> @@ -87,19 +89,7 @@ public void run() { log.info("{}#Start listing and watching...", apiTypeClass); try { - ApiListType list = - listerWatcher.list( - new CallGeneratorParams(Boolean.FALSE, getRelistResourceVersion(), null)); - - V1ListMeta listMeta = list.getMetadata(); - String resourceVersion = listMeta.getResourceVersion(); - List items = list.getItems(); - - if (log.isDebugEnabled()) { - log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion); - } - this.syncWith(items, resourceVersion); - this.lastSyncResourceVersion = resourceVersion; + this.lastSyncResourceVersion = initialLoad(); this.isLastSyncResourceVersionUnavailable = false; if (log.isDebugEnabled()) { @@ -198,6 +188,22 @@ private synchronized void closeWatch() throws IOException { } } + private String initialLoad() throws ApiException { + ApiListType list = + listerWatcher.list( + new CallGeneratorParams(Boolean.FALSE, getRelistResourceVersion(), null)); + + V1ListMeta listMeta = list.getMetadata(); + String resourceVersion = listMeta.getResourceVersion(); + List items = list.getItems(); + + if (log.isDebugEnabled()) { + log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion); + } + this.syncWith(items, resourceVersion); + return resourceVersion; + } + private void syncWith(List items, String resourceVersion) { this.store.replace( (List) items, resourceVersion); // down-casting is safe here From 3a18c511617034a096d8b23f4bc70fd0b8bff121 Mon Sep 17 00:00:00 2001 From: Rafael Ribeiro Date: Wed, 25 Jun 2025 10:35:24 +0100 Subject: [PATCH 2/4] improve watch resource closing logic --- .../informer/cache/ReflectorRunnable.java | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index dd5e82fe58..9af6339ffb 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -50,8 +50,6 @@ public class ReflectorRunnable< private boolean isLastSyncResourceVersionUnavailable; - private Watchable watch; - private ListerWatcher listerWatcher; private DeltaFIFO store; @@ -91,16 +89,11 @@ public void run() { try { this.lastSyncResourceVersion = initialLoad(); this.isLastSyncResourceVersionUnavailable = false; - + Watchable watch = null; if (log.isDebugEnabled()) { log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion); } - while (true) { - if (!isActive.get()) { - closeWatch(); - return; - } - + while (isActive.get()) { try { if (log.isDebugEnabled()) { log.debug( @@ -110,21 +103,14 @@ public void run() { long jitteredWatchTimeoutSeconds = Double.valueOf(REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT.getSeconds() * (1 + Math.random())) .longValue(); - Watchable newWatch = + watch = listerWatcher.watch( new CallGeneratorParams( Boolean.TRUE, lastSyncResourceVersion, Long.valueOf(jitteredWatchTimeoutSeconds).intValue())); - synchronized (this) { - if (!isActive.get()) { - newWatch.close(); - continue; - } - watch = newWatch; - } - watchHandler(newWatch); + watchHandler(watch); } catch (WatchExpiredException e) { // Watch calls were failed due to expired resource-version. Returning // to unwind the list-watch loops so that we can respawn a new round @@ -155,7 +141,13 @@ public void run() { this.exceptionHandler.accept(apiTypeClass, t); return; } finally { - closeWatch(); + if (watch != null) { + try { + watch.close(); + } catch (IOException e) { + log.warn("{}#Error while closing watcher", this.apiTypeClass, e); + } + } } } } catch (ApiException e) { @@ -181,13 +173,6 @@ public void stop() { } } - private synchronized void closeWatch() throws IOException { - if (watch != null) { - watch.close(); - watch = null; - } - } - private String initialLoad() throws ApiException { ApiListType list = listerWatcher.list( @@ -284,6 +269,11 @@ private void watchHandler(Watchable watch) { log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion); } } + try { + watch.close(); + } catch (IOException e) { + log.warn("{}#Error while closing watcher", this.apiTypeClass, e); + } } static void defaultWatchErrorHandler( From 44645e2e4892f4f71fa0f01392d748a178b7f064 Mon Sep 17 00:00:00 2001 From: Rafael Ribeiro Date: Wed, 25 Jun 2025 14:51:07 +0100 Subject: [PATCH 3/4] restore import order --- .../informer/cache/ReflectorRunnable.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index 9af6339ffb..c5164772ac 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -12,18 +12,6 @@ */ package io.kubernetes.client.informer.cache; -import java.io.IOException; -import java.net.ConnectException; -import java.net.HttpURLConnection; -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.kubernetes.client.common.KubernetesListObject; import io.kubernetes.client.common.KubernetesObject; import io.kubernetes.client.informer.EventType; @@ -35,6 +23,16 @@ import io.kubernetes.client.util.CallGeneratorParams; import io.kubernetes.client.util.Strings; import io.kubernetes.client.util.Watchable; +import java.io.IOException; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ReflectorRunnable< ApiType extends KubernetesObject, ApiListType extends KubernetesListObject> @@ -167,7 +165,6 @@ public void run() { public void stop() { try { isActive.set(false); - closeWatch(); } catch (Throwable t) { this.exceptionHandler.accept(apiTypeClass, t); } From 0aa5a286bfb724ab2432f2af0012595de93431b0 Mon Sep 17 00:00:00 2001 From: Rafael Ribeiro Date: Wed, 25 Jun 2025 15:10:42 +0100 Subject: [PATCH 4/4] restore closeWatch --- .../informer/cache/ReflectorRunnable.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index c5164772ac..49dbb4dab1 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -139,13 +139,7 @@ public void run() { this.exceptionHandler.accept(apiTypeClass, t); return; } finally { - if (watch != null) { - try { - watch.close(); - } catch (IOException e) { - log.warn("{}#Error while closing watcher", this.apiTypeClass, e); - } - } + closeWatch(watch); } } } catch (ApiException e) { @@ -170,6 +164,17 @@ public void stop() { } } + private synchronized void closeWatch(Watchable watch) { + try { + if (watch != null) { + watch.close(); + watch = null; + } + } catch (IOException e) { + log.warn("{}#Error while closing watcher", this.apiTypeClass, e); + } + } + private String initialLoad() throws ApiException { ApiListType list = listerWatcher.list( @@ -266,11 +271,7 @@ private void watchHandler(Watchable watch) { log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion); } } - try { - watch.close(); - } catch (IOException e) { - log.warn("{}#Error while closing watcher", this.apiTypeClass, e); - } + closeWatch(watch); } static void defaultWatchErrorHandler(