From e5f83003513ea2e9fa0ff7fcff8a820e1d0bacfc Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 13:14:17 +0100 Subject: [PATCH 01/21] FFM-12087 Add no limit retry option to interceptor and use with Eventsource (streaming) code --- .../cf/client/connector/EventSource.java | 2 +- .../client/connector/NewRetryInterceptor.java | 48 ++++++++++++------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java index 2520fdc9..ad02aedc 100644 --- a/src/main/java/io/harness/cf/client/connector/EventSource.java +++ b/src/main/java/io/harness/cf/client/connector/EventSource.java @@ -83,7 +83,7 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List= maxTryCount; - log.warn( - "Request attempt {} to {} was not successful, [{}]{}", - tryCount, - chain.request().url(), - msg, - limitReached - ? ", retry limited reached" - : String.format( - Locale.getDefault(), - ", retrying in %dms (retry-after hdr: %b)", - backOffDelayMs, - retryAfterHeaderValue > 0)); + limitReached = !retryForever && tryCount >= maxTryCount; + // Conditional log message based on retryForever + if (retryForever) { + log.warn( + "Request to {} was not successful, [{}]{}", + chain.request().url(), + msg, + ", continuing retries indefinitely" + ); + } else { + log.warn( + "Request attempt {} to {} was not successful, [{}]{}", + tryCount, + chain.request().url(), + msg, + limitReached + ? ", retry limit reached" + : String.format(Locale.getDefault(), ", retrying in %dms (retry-after hdr: %b)", backOffDelayMs, retryAfterHeaderValue > 0) + ); + } if (!limitReached) { sleep(backOffDelayMs); } } tryCount++; - } while (!successful && !limitReached); + } while (!successful && (retryForever || tryCount <= maxTryCount)); return response; } From 4828530d34adc4cbd56f0f1c8b4e5e21f00d6105 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 13:22:00 +0100 Subject: [PATCH 02/21] FFM-12087 Tidyup --- .../client/connector/NewRetryInterceptor.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index cd60bef2..ba43a414 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -19,12 +19,13 @@ public class NewRetryInterceptor implements Interceptor { private static final SimpleDateFormat imfDateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); private final long retryBackoffDelay; - private final long maxTryCount; private final boolean retryForever; + // Default to 10 retries if not specified + private long maxTryCount = 10; + public NewRetryInterceptor(long retryBackoffDelay) { this.retryBackoffDelay = retryBackoffDelay; - this.maxTryCount = 5; this.retryForever = false; } @@ -83,7 +84,8 @@ public Response intercept(@NotNull Chain chain) throws IOException { log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue); backOffDelayMs = retryAfterHeaderValue * 1000L; } else { - // Else fallback to a randomized exponential backoff with a max delay of 1 minute (60,000 ms) + // Else fallback to a randomized exponential backoff with a max delay of 1 minute (60,000 + // ms) backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L); } @@ -91,21 +93,20 @@ public Response intercept(@NotNull Chain chain) throws IOException { // Conditional log message based on retryForever if (retryForever) { log.warn( - "Request to {} was not successful, [{}]{}", - chain.request().url(), - msg, - ", continuing retries indefinitely" - ); + "Request to {} was not successful, [{}]{}", chain.request().url(), msg, ", retrying"); } else { log.warn( - "Request attempt {} to {} was not successful, [{}]{}", - tryCount, - chain.request().url(), - msg, - limitReached - ? ", retry limit reached" - : String.format(Locale.getDefault(), ", retrying in %dms (retry-after hdr: %b)", backOffDelayMs, retryAfterHeaderValue > 0) - ); + "Request attempt {} to {} was not successful, [{}]{}", + tryCount, + chain.request().url(), + msg, + limitReached + ? ", retry limit reached" + : String.format( + Locale.getDefault(), + ", retrying in %dms (retry-after hdr: %b)", + backOffDelayMs, + retryAfterHeaderValue > 0)); } if (!limitReached) { From b2b7fac41299a2909e4d723f998a60e508c11413 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 13:31:45 +0100 Subject: [PATCH 03/21] FFM-12087 Add clarity to log --- .../client/connector/NewRetryInterceptor.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index ba43a414..f68d0283 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -46,7 +46,7 @@ public NewRetryInterceptor(long retryBackoffDelay, boolean retryForever) { public Response intercept(@NotNull Chain chain) throws IOException { int tryCount = 1; boolean successful; - boolean limitReached = false; + boolean limitReached; Response response = null; String msg = ""; do { @@ -89,25 +89,21 @@ public Response intercept(@NotNull Chain chain) throws IOException { backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L); } + String retryLimitDisplay = retryForever ? "∞" : String.valueOf(maxTryCount); limitReached = !retryForever && tryCount >= maxTryCount; - // Conditional log message based on retryForever - if (retryForever) { - log.warn( - "Request to {} was not successful, [{}]{}", chain.request().url(), msg, ", retrying"); - } else { - log.warn( - "Request attempt {} to {} was not successful, [{}]{}", - tryCount, - chain.request().url(), - msg, - limitReached - ? ", retry limit reached" - : String.format( - Locale.getDefault(), - ", retrying in %dms (retry-after hdr: %b)", - backOffDelayMs, - retryAfterHeaderValue > 0)); - } + log.warn( + "Request attempt {} of {} to {} was not successful, [{}]{}", + tryCount, + retryLimitDisplay, + chain.request().url(), + msg, + limitReached + ? ", retry limit reached" + : String.format( + Locale.getDefault(), + ", retrying in %dms (retry-after hdr: %b)", + backOffDelayMs, + retryAfterHeaderValue > 0)); if (!limitReached) { sleep(backOffDelayMs); From 3fa9230e76dccdd82d939fb578d95ba20168b752 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 13:47:50 +0100 Subject: [PATCH 04/21] FFM-12087 Add maxRequestConfig and format interceptor FFM-12087 Add maxRequestConfig FFM-12087 Add maxRequestConfig FFM-12087 Add maxRequestConfig --- .../io/harness/cf/client/api/BaseConfig.java | 18 ++++++++++++++++++ .../io/harness/cf/client/api/InnerClient.java | 1 + .../cf/client/connector/HarnessConfig.java | 17 +++++++++++++++++ .../cf/client/connector/HarnessConnector.java | 5 +++-- .../client/connector/NewRetryInterceptor.java | 8 ++++++-- 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/BaseConfig.java b/src/main/java/io/harness/cf/client/api/BaseConfig.java index 7986a6ef..74fdf66e 100644 --- a/src/main/java/io/harness/cf/client/api/BaseConfig.java +++ b/src/main/java/io/harness/cf/client/api/BaseConfig.java @@ -14,6 +14,7 @@ @Data public class BaseConfig { public static final int MIN_FREQUENCY = 60; + public static final long DEFAULT_REQUEST_RETRIES = 60; @Builder.Default private final boolean streamEnabled = true; @Builder.Default private final int pollIntervalInSeconds = 60; @@ -53,4 +54,21 @@ public int getFrequency() { @Builder.Default private final Cache cache = new CaffeineCache(10000); private final Storage store; + + /** + * + * Defines the maximum number of retry attempts for certain types of requests: + * + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * + * the SDK will retry up to this number of times before giving up. + * + *

+ * + * - Authentication: Used for retrying authentication requests when the server is unreachable. + * + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * + * where the SDK needs to fetch updated flag or group data. + * + *

+ * + * Note: This setting does not apply to streaming requests (either the initial connection or + * + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * + * (infinite retries). + * + */ + @Builder.Default long maxRequestRetry = DEFAULT_REQUEST_RETRIES; } diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index bec1ed88..84a4ac3a 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -63,6 +63,7 @@ public InnerClient(@NonNull final String sdkKey, @NonNull final Config options) .connectionTimeout(options.getConnectionTimeout()) .readTimeout(options.readTimeout) .writeTimeout(options.getWriteTimeout()) + .maxRequestRetry(options.getMaxRequestRetry()) .build(); HarnessConnector harnessConnector = new HarnessConnector(sdkKey, config); setUp(harnessConnector, options); diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index 013918ec..3d0609ed 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -35,4 +35,21 @@ public class HarnessConfig { * should include intermediate CAs too to allow the HTTP client to build a full trust chain. */ @Builder.Default List tlsTrustedCAs = null; + + /** + * Defines the maximum number of retry attempts for certain types of requests: + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * the SDK will retry up to this number of times before giving up. + *

+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *

+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + */ + @Builder.Default private long maxRequestRetry = 10; } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 2486b837..67a305fb 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -89,7 +89,7 @@ ApiClient makeApiClient(int retryBackOfDelay) { .getHttpClient() .newBuilder() .addInterceptor(this::reauthInterceptor) - .addInterceptor(new NewRetryInterceptor(3, retryBackOfDelay)) + .addInterceptor(new NewRetryInterceptor(options.getMaxRequestRetry(), retryBackOfDelay)) .build()); return apiClient; @@ -127,7 +127,8 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { .getHttpClient() .newBuilder() .addInterceptor(this::metricsInterceptor) - .addInterceptor(new NewRetryInterceptor(3, retryBackoffDelay)) + .addInterceptor( + new NewRetryInterceptor(options.getMaxRequestRetry(), retryBackoffDelay)) .build()); return apiClient; diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index f68d0283..2b8e8a14 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -1,5 +1,7 @@ package io.harness.cf.client.connector; +import static io.harness.cf.client.api.BaseConfig.DEFAULT_REQUEST_RETRIES; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -21,8 +23,8 @@ public class NewRetryInterceptor implements Interceptor { private final long retryBackoffDelay; private final boolean retryForever; - // Default to 10 retries if not specified - private long maxTryCount = 10; + // Use SDK default is not specified + private long maxTryCount = DEFAULT_REQUEST_RETRIES; public NewRetryInterceptor(long retryBackoffDelay) { this.retryBackoffDelay = retryBackoffDelay; @@ -76,9 +78,11 @@ public Response intercept(@NotNull Chain chain) throws IOException { return response; } } + if (!successful) { int retryAfterHeaderValue = getRetryAfterHeaderInSeconds(response); long backOffDelayMs; + if (retryAfterHeaderValue > 0) { // Use Retry-After header if detected first log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue); From acab98dc85c39365b8afe90417cbf80128cbea01 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 14:41:17 +0100 Subject: [PATCH 05/21] FFM-12087 Add maxRequestConfig to config example --- examples/src/main/java/io/harness/ff/examples/ConfigExample.java | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java index 9c4d7778..b9af4ad0 100644 --- a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java +++ b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java @@ -34,6 +34,7 @@ public static void main(String... args) HarnessConfig.builder() .configUrl("http://localhost:3000/api/1.0") .eventUrl("http://localhost:3000/api/1.0") + .maxRequestRetry(20) .build()); client = new CfClient(hc); client.waitForInitialization(); From b4845848a6bf7606daa52472a950180f5f696e0e Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 15:20:37 +0100 Subject: [PATCH 06/21] FFM-12087 Modify tests to account for new stream retry behaviour --- .../cf/client/connector/EventSourceTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java index 9cf33132..d84b9eea 100644 --- a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java +++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java @@ -27,8 +27,11 @@ static class StreamDispatcher extends Dispatcher { protected MockResponse makeStreamResponse() { int reqNo = request.getAndIncrement(); - if (reqNo <= 3) { - // Force a disconnect on the first few attempts + + if (reqNo <= 12) { + // Force a disconnect after the default SDK request retry limit of 10, which does not apply + // to stream requests which have + // no limit on retryable errors out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); } else { @@ -61,7 +64,10 @@ protected MockResponse makeStreamResponse() { int reqNo = request.getAndIncrement(); // Force a disconnect on all requests out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); - return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); + // Set a 400 response so that the stream does not retry. This is because since 1.8.0 the + // stream + // retries forever on retryable errors. + return new MockResponse().setResponseCode(400).setBody("{\"status\":\"failed\"}"); } } @@ -107,10 +113,11 @@ void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail() null)) { eventSource.start(); - TimeUnit.SECONDS.sleep(15); + TimeUnit.SECONDS.sleep(3); } - // for this test, connection to the /stream endpoint will never succeed. + // for this test, connection to the /stream endpoint will never because of an un-retryable + // error. // we expect the disconnect handler to be called, connect handler should not be called assertEquals(0, updater.getConnectCount().get()); From 99abea518299785295ea9b74fded07b86ec083bd Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 15:57:00 +0100 Subject: [PATCH 07/21] FFM-12087 Start tracking isShuttingdown to avoid retries continuing --- .../cf/client/connector/EventSource.java | 11 +++++++--- .../cf/client/connector/HarnessConnector.java | 13 ++++++++--- .../client/connector/NewRetryInterceptor.java | 22 +++++++++++++++---- .../cf/client/connector/EventSourceTest.java | 7 ++++-- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java index ad02aedc..2edb10e5 100644 --- a/src/main/java/io/harness/cf/client/connector/EventSource.java +++ b/src/main/java/io/harness/cf/client/connector/EventSource.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.*; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -37,6 +38,7 @@ public class EventSource implements Callback, AutoCloseable, Service { private final Map headers; private final long sseReadTimeoutMins; private final List trustedCAs; + private final AtomicBoolean isShuttingDown; static { LogUtil.setSystemProps(); @@ -48,7 +50,7 @@ public EventSource( @NonNull Updater updater, long sseReadTimeoutMins) throws ConnectorException { - this(url, headers, updater, sseReadTimeoutMins, 2_000, null); + this(url, headers, updater, sseReadTimeoutMins, 2_000, null, new AtomicBoolean(false)); } EventSource( @@ -57,7 +59,8 @@ public EventSource( @NonNull Updater updater, long sseReadTimeoutMins, int retryBackoffDelay, - List trustedCAs) { + List trustedCAs, + AtomicBoolean isShuttingDown) { this.url = url; this.headers = headers; this.updater = updater; @@ -65,6 +68,7 @@ public EventSource( this.retryBackoffDelay = retryBackoffDelay; this.trustedCAs = trustedCAs; this.loggingInterceptor = new HttpLoggingInterceptor(); + this.isShuttingDown = isShuttingDown; } protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List trustedCAs) @@ -83,7 +87,8 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List Date: Thu, 3 Oct 2024 16:21:12 +0100 Subject: [PATCH 08/21] FFM-12087 Don't use config where it is not needed --- .../io/harness/cf/client/api/BaseConfig.java | 68 ++++++++++++++----- .../io/harness/cf/client/api/InnerClient.java | 1 - .../cf/client/connector/HarnessConfig.java | 24 +++++++ 3 files changed, 76 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/BaseConfig.java b/src/main/java/io/harness/cf/client/api/BaseConfig.java index 74fdf66e..be5b8efc 100644 --- a/src/main/java/io/harness/cf/client/api/BaseConfig.java +++ b/src/main/java/io/harness/cf/client/api/BaseConfig.java @@ -14,7 +14,7 @@ @Data public class BaseConfig { public static final int MIN_FREQUENCY = 60; - public static final long DEFAULT_REQUEST_RETRIES = 60; + public static final long DEFAULT_REQUEST_RETRIES = 10; @Builder.Default private final boolean streamEnabled = true; @Builder.Default private final int pollIntervalInSeconds = 60; @@ -56,19 +56,55 @@ public int getFrequency() { private final Storage store; /** - * + * Defines the maximum number of retry attempts for certain types of requests: - * + * authentication, polling, metrics, and reacting to stream events. If a request fails, - * + * the SDK will retry up to this number of times before giving up. - * + *

- * + * - Authentication: Used for retrying authentication requests when the server is unreachable. - * + * - Polling: Applies to requests that fetch feature flags and target groups periodically. - * + * - Metrics: Applies to analytics requests for sending metrics data to the server. - * + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, - * + * where the SDK needs to fetch updated flag or group data. - * + *

- * + * Note: This setting does not apply to streaming requests (either the initial connection or - * + * reconnecting after a disconnection). Streaming requests will always retry indefinitely - * + * (infinite retries). - * + */ - @Builder.Default long maxRequestRetry = DEFAULT_REQUEST_RETRIES; + * Defines the maximum number of retry attempts for certain types of requests: + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * the SDK will retry up to this number of times before giving up. + *

+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *

+ *

+ * The default value is {@code 10}. + *

+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + *

+ * Example usage: + *

+   * {@code
+   * BaseConfig config = BaseConfig.builder()
+   *     .maxRequestRetry(20)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES; + + /** + * Indicates whether to flush analytics data when the SDK is closed. + *

+ * When set to {@code true}, any remaining analytics data (such as metrics) + * will be sent to the server before the SDK is fully closed. If {@code false}, + * the data will not be flushed, and any unsent analytics data may be lost. + *

+ * The default value is {@code false}. + *

+ * Note: The flush will attempt to send the data in a single request. + * Any failures during this process will not be retried, and the analytics data + * may be lost. + * + *

Example usage: + *

+   * {@code
+   * BaseConfig config = BaseConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final boolean flushAnalyticsOnClose = false; } diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index 84a4ac3a..bec1ed88 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -63,7 +63,6 @@ public InnerClient(@NonNull final String sdkKey, @NonNull final Config options) .connectionTimeout(options.getConnectionTimeout()) .readTimeout(options.readTimeout) .writeTimeout(options.getWriteTimeout()) - .maxRequestRetry(options.getMaxRequestRetry()) .build(); HarnessConnector harnessConnector = new HarnessConnector(sdkKey, config); setUp(harnessConnector, options); diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index 3d0609ed..fc8d0e20 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -52,4 +52,28 @@ public class HarnessConfig { * (infinite retries). */ @Builder.Default private long maxRequestRetry = 10; + + /** + * Indicates whether to flush analytics data when the SDK is closed. + *

+ * When set to {@code true}, any remaining analytics data (such as metrics) + * will be sent to the server before the SDK is fully closed. If {@code false}, + * the data will not be flushed, and any unsent analytics data may be lost. + *

+ * The default value is {@code false}. + *

+ * Note: The flush will attempt to send the data in a single request. + * Any failures during this process will not be retried, and the analytics data + * may be lost. + * + *

Example usage: + *

+   * {@code
+   * BaseConfig config = BaseConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private boolean flushAnalyticsOnClose = false; } From dea1245862b6fa67cc356dfdd1f7f05ac4bf88ff Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 16:51:10 +0100 Subject: [PATCH 09/21] FFM-12087 Start implementing flush on close --- .../io/harness/cf/client/api/InnerClient.java | 6 +++ .../cf/client/api/MetricsProcessor.java | 37 ++++++++++++++++--- .../cf/client/connector/Connector.java | 2 + .../cf/client/connector/HarnessConnector.java | 4 ++ .../cf/client/connector/LocalConnector.java | 3 ++ 5 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index bec1ed88..ba39709a 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -387,6 +387,12 @@ public void processEvaluation( public void close() { log.info("Closing the client"); + // Mark the connector as shutting down to stop request retries from taking place. The + // connections will eventually + // be evicted when the connector is closed, but this ensures that if metrics are flushed when + // closed then it + // won't attempt to retry if the first request fails. + connector.setIsShuttingDown(); closing = true; off(); authService.close(); diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index e1e27590..475de172 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -4,6 +4,7 @@ import static io.harness.cf.client.common.Utils.shutdownExecutorService; import static java.util.concurrent.TimeUnit.SECONDS; +import io.harness.cf.Version; import io.harness.cf.client.common.SdkCodes; import io.harness.cf.client.common.StringUtils; import io.harness.cf.client.connector.Connector; @@ -15,10 +16,7 @@ import io.harness.cf.model.TargetData; import io.harness.cf.model.Variation; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.LongAdder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -218,7 +216,7 @@ protected Metrics prepareSummaryMetricsBody(Map data, Set future = null; + try { + future = flushQueue(); + + // Wait for the task to complete or timeout + future.get(timeoutInSeconds, SECONDS); + log.debug("Metrics successfully flushed within the timeout of {} seconds", timeoutInSeconds); + + } catch (TimeoutException e) { + log.debug( + "Metrics flush did not complete within the timeout of {} seconds", timeoutInSeconds); + // Forcefully cancel the flush if it times out + future.cancel(true); + + } catch (InterruptedException | ExecutionException e) { + log.error("Error occurred during metrics flush", e); + Thread.currentThread().interrupt(); + } + } + /* package private */ - synchronized void flushQueue() { + synchronized ScheduledFuture flushQueue() { scheduler.schedule(this::runOneIteration, 0, SECONDS); + return null; } long getMetricsSent() { diff --git a/src/main/java/io/harness/cf/client/connector/Connector.java b/src/main/java/io/harness/cf/client/connector/Connector.java index 67444f6f..85c79301 100644 --- a/src/main/java/io/harness/cf/client/connector/Connector.java +++ b/src/main/java/io/harness/cf/client/connector/Connector.java @@ -29,4 +29,6 @@ public interface Connector { Service stream(Updater updater) throws ConnectorException; void close(); + + void setIsShuttingDown(); } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 44dc005d..404bcfc4 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -445,6 +445,10 @@ private void setupTls(ApiClient apiClient) { } } + public void setIsShuttingDown() { + this.isShuttingDown.set(true); + } + private static boolean isNullOrEmpty(String string) { return string == null || string.trim().isEmpty(); } diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java index 6dd0aa2d..87778759 100644 --- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java +++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java @@ -195,6 +195,9 @@ public void close() { log.debug("LocalConnector closed"); } + @Override + public void setIsShuttingDown() {} + private class FileWatcherService implements Service, AutoCloseable { private final FileWatcher flagWatcher; private final FileWatcher segmentWatcher; From fe5d2a4ed02ab58e7243e4120339758955079c65 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 17:20:49 +0100 Subject: [PATCH 10/21] FFM-12087 Add shutting down check to interceptors --- .../cf/client/api/MetricsProcessor.java | 3 +-- .../cf/client/connector/HarnessConfig.java | 24 ------------------- .../cf/client/connector/HarnessConnector.java | 7 ++++++ 3 files changed, 8 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index 475de172..1cfbb90a 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -360,8 +360,7 @@ public synchronized void flushWithTimeout(int timeoutInSeconds) { /* package private */ synchronized ScheduledFuture flushQueue() { - scheduler.schedule(this::runOneIteration, 0, SECONDS); - return null; + return scheduler.schedule(this::runOneIteration, 0, SECONDS); } long getMetricsSent() { diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index fc8d0e20..3d0609ed 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -52,28 +52,4 @@ public class HarnessConfig { * (infinite retries). */ @Builder.Default private long maxRequestRetry = 10; - - /** - * Indicates whether to flush analytics data when the SDK is closed. - *

- * When set to {@code true}, any remaining analytics data (such as metrics) - * will be sent to the server before the SDK is fully closed. If {@code false}, - * the data will not be flushed, and any unsent analytics data may be lost. - *

- * The default value is {@code false}. - *

- * Note: The flush will attempt to send the data in a single request. - * Any failures during this process will not be retried, and the analytics data - * may be lost. - * - *

Example usage: - *

-   * {@code
-   * BaseConfig config = BaseConfig.builder()
-   *     .flushAnalyticsOnClose(true)
-   *     .build();
-   * }
-   * 
- */ - @Builder.Default private boolean flushAnalyticsOnClose = false; } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 404bcfc4..34e9fe94 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -100,6 +100,10 @@ ApiClient makeApiClient(int retryBackOfDelay) { } private Response reauthInterceptor(Interceptor.Chain chain) throws IOException { + if (isShuttingDown.get()) { + return null; + } + final Request request = chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); log.debug("Checking for 403 in interceptor: requesting url {}", request.url().url()); @@ -140,6 +144,9 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { } private Response metricsInterceptor(Interceptor.Chain chain) throws IOException { + if (isShuttingDown.get()) { + return null; + } final Request request = chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); log.debug("metrics interceptor: requesting url {}", request.url().url()); From 5583cfab2fd7dd40dac90874498d108582953587 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 18:05:47 +0100 Subject: [PATCH 11/21] FFM-12087 Don't poll when SDK is closed after stream is disconnected --- src/main/java/io/harness/cf/client/api/InnerClient.java | 5 ++++- .../java/io/harness/cf/client/connector/EventSource.java | 1 + .../io/harness/cf/client/connector/HarnessConnector.java | 2 +- .../harness/cf/client/connector/NewRetryInterceptor.java | 8 ++++---- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index ba39709a..0d6c0d54 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -228,7 +228,9 @@ public void onDisconnected(String reason) { closing, options.getPollIntervalInSeconds()); log.debug("SSE disconnect detected - asking poller to refresh flags"); - pollProcessor.retrieveAll(); + if (!closing) { + pollProcessor.retrieveAll(); + } } } @@ -386,6 +388,7 @@ public void processEvaluation( } public void close() { + closing = true; log.info("Closing the client"); // Mark the connector as shutting down to stop request retries from taking place. The // connections will eventually diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java index 2edb10e5..4d985991 100644 --- a/src/main/java/io/harness/cf/client/connector/EventSource.java +++ b/src/main/java/io/harness/cf/client/connector/EventSource.java @@ -154,6 +154,7 @@ public void stop() { public void close() { stop(); if (this.streamClient != null) { + this.streamClient.dispatcher().executorService().shutdown(); this.streamClient.connectionPool().evictAll(); } log.debug("EventSource closed"); diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 34e9fe94..8529cbd9 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -134,7 +134,7 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { apiClient .getHttpClient() .newBuilder() - .addInterceptor(this::metricsInterceptor) + // .addInterceptor(this::metricsInterceptor) .addInterceptor( new NewRetryInterceptor( options.getMaxRequestRetry(), retryBackoffDelay, isShuttingDown)) diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index a6b91aa3..842d10d2 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -61,10 +61,10 @@ public Response intercept(@NotNull Chain chain) throws IOException { String msg = ""; do { - if (isShuttingDown.get()) { - log.debug("SDK is shutting down, aborting retry interceptor"); - break; - } + // if (isShuttingDown.get()) { + // log.debug("SDK is shutting down, aborting retry interceptor"); + // return response; + // } try { if (response != null) response.close(); From e40b865218bf23468215a25d78c2ec51d5e6b884 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 18:08:31 +0100 Subject: [PATCH 12/21] FFM-12087 Remove shutting down check from interceptor --- .../io/harness/cf/client/connector/HarnessConnector.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 8529cbd9..62679ab6 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -134,7 +134,7 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { apiClient .getHttpClient() .newBuilder() - // .addInterceptor(this::metricsInterceptor) + .addInterceptor(this::metricsInterceptor) .addInterceptor( new NewRetryInterceptor( options.getMaxRequestRetry(), retryBackoffDelay, isShuttingDown)) @@ -144,9 +144,6 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { } private Response metricsInterceptor(Interceptor.Chain chain) throws IOException { - if (isShuttingDown.get()) { - return null; - } final Request request = chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); log.debug("metrics interceptor: requesting url {}", request.url().url()); From 4dc239caee476e87f29e1d544f4173fc026e3c7a Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 18:37:18 +0100 Subject: [PATCH 13/21] FFM-12087 Apply timeout to metrics flush request - CONFIG option FFM-12087 Apply timeout to metrics flush request --- .../cf/client/api/MetricsProcessor.java | 6 ++-- .../cf/client/connector/HarnessConfig.java | 30 +++++++++++++++++++ .../cf/client/connector/HarnessConnector.java | 28 ++++++++++++++--- 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index 1cfbb90a..be087df2 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -304,7 +304,7 @@ public void start() { public void stop() { if (config.isFlushAnalyticsOnClose()) { - flushWithTimeout(10); + flushWithTimeout(config.getFlushAnalyticsOnCloseTimeout()); } log.debug("Stopping MetricsProcessor"); @@ -335,14 +335,14 @@ public boolean isRunning() { return runningTask != null && !runningTask.isCancelled(); } - public synchronized void flushWithTimeout(int timeoutInSeconds) { + public synchronized void flushWithTimeout(long timeoutInSeconds) { log.debug("Flushing metrics with timeout: {} seconds", timeoutInSeconds); ScheduledFuture future = null; try { future = flushQueue(); // Wait for the task to complete or timeout - future.get(timeoutInSeconds, SECONDS); + future.get(1, SECONDS); log.debug("Metrics successfully flushed within the timeout of {} seconds", timeoutInSeconds); } catch (TimeoutException e) { diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index 3d0609ed..2d5d029b 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -30,6 +30,36 @@ public class HarnessConfig { /** read timeout in minutes for SSE connections */ @Builder.Default long sseReadTimeout = 1; + /** + * The timeout for flushing analytics on SDK close. + *

+ * This option sets the maximum duration, in milliseconds, the SDK will wait for the + * analytics data to be flushed after the SDK has been closed. If the flush process takes longer + * than this timeout, the request will be canceled, and any remaining data will + * not be sent. This ensures that the SDK does not hang indefinitely during shutdown. + *

+ * The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK + *

+ * Note: This timeout only applies to the flush process that happens when + * {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other + * requests made by the SDK during normal operation. + * + *

Example usage: + *

+   * {@code
+   * // Timeout the analytics flush request in 3000ms (3 seconds)
+   * HarnessConfig harnessConfig =
+   *                 HarnessConfig.builder().flushAnalyticsOnCloseTimeout(3000).build();
+   *
+   * // flush analytics on close is enabled via BaseConfig
+   * BaseConfig config = BaseConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final long flushAnalyticsOnCloseTimeout = 30000; + /** * list of trusted CAs - for when the given config/event URLs are signed with a private CA. You * should include intermediate CAs too to allow the HTTP client to build a full trust chain. diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 62679ab6..f58d2ac8 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -15,6 +15,7 @@ import java.security.cert.X509Certificate; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.NonNull; import lombok.SneakyThrows; @@ -144,11 +145,30 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { } private Response metricsInterceptor(Interceptor.Chain chain) throws IOException { - final Request request = - chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); - log.debug("metrics interceptor: requesting url {}", request.url().url()); - return chain.proceed(request); + Request originalRequest = chain.request(); + + // If this is flush when the SDK has been closed, then apply a per request timeout instead + // of the okhttp client timeout + if (isShuttingDown.get()) { + log.debug("SDK is shutting down, applying custom call timeout for flush request"); + + Request shutdownRequest = + originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build(); + + // Apply custom timeouts (e.g., 5 seconds for each timeout type) + return chain + .withConnectTimeout(5, TimeUnit.SECONDS) // Custom connect timeout + .withReadTimeout(5, TimeUnit.SECONDS) // Custom read timeout + .withWriteTimeout(5, TimeUnit.SECONDS) // Custom write timeout + .proceed(shutdownRequest); + } else { + final Request request = + originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build(); + log.debug("metrics interceptor: requesting url {}", request.url().url()); + + return chain.proceed(request); + } } protected String getRequestID() { From ac23bb2b2634e6648b8a10d7173d5a4529594c0c Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 19:07:45 +0100 Subject: [PATCH 14/21] FFM-12087 Tidy up flush --- .../cf/client/api/MetricsProcessor.java | 36 ++++++------------- .../cf/client/connector/HarnessConfig.java | 2 +- .../cf/client/connector/HarnessConnector.java | 6 ++-- .../client/connector/NewRetryInterceptor.java | 10 ++++++ 4 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index be087df2..86fb0588 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -304,7 +304,7 @@ public void start() { public void stop() { if (config.isFlushAnalyticsOnClose()) { - flushWithTimeout(config.getFlushAnalyticsOnCloseTimeout()); + flushQueue(); } log.debug("Stopping MetricsProcessor"); @@ -326,7 +326,13 @@ public void close() { shutdownExecutorService( scheduler, SdkCodes::infoMetricsThreadExited, - errMsg -> log.warn("failed to stop metrics scheduler: {}", errMsg)); + errMsg -> { + if (config.isFlushAnalyticsOnClose()) { + log.warn("Waited for flush to finish {}", errMsg); + } else { + log.warn("Failed to stop metrics scheduler: {}", errMsg); + } + }); log.debug("Closing MetricsProcessor"); } @@ -335,32 +341,10 @@ public boolean isRunning() { return runningTask != null && !runningTask.isCancelled(); } - public synchronized void flushWithTimeout(long timeoutInSeconds) { - log.debug("Flushing metrics with timeout: {} seconds", timeoutInSeconds); - ScheduledFuture future = null; - try { - future = flushQueue(); - - // Wait for the task to complete or timeout - future.get(1, SECONDS); - log.debug("Metrics successfully flushed within the timeout of {} seconds", timeoutInSeconds); - - } catch (TimeoutException e) { - log.debug( - "Metrics flush did not complete within the timeout of {} seconds", timeoutInSeconds); - // Forcefully cancel the flush if it times out - future.cancel(true); - - } catch (InterruptedException | ExecutionException e) { - log.error("Error occurred during metrics flush", e); - Thread.currentThread().interrupt(); - } - } - /* package private */ - synchronized ScheduledFuture flushQueue() { - return scheduler.schedule(this::runOneIteration, 0, SECONDS); + synchronized void flushQueue() { + scheduler.schedule(this::runOneIteration, 0, SECONDS); } long getMetricsSent() { diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index 2d5d029b..4e217159 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -58,7 +58,7 @@ public class HarnessConfig { * } * */ - @Builder.Default private final long flushAnalyticsOnCloseTimeout = 30000; + @Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000; /** * list of trusted CAs - for when the given config/event URLs are signed with a private CA. You diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index f58d2ac8..e26664fb 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -158,9 +158,9 @@ private Response metricsInterceptor(Interceptor.Chain chain) throws IOException // Apply custom timeouts (e.g., 5 seconds for each timeout type) return chain - .withConnectTimeout(5, TimeUnit.SECONDS) // Custom connect timeout - .withReadTimeout(5, TimeUnit.SECONDS) // Custom read timeout - .withWriteTimeout(5, TimeUnit.SECONDS) // Custom write timeout + .withConnectTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) + .withReadTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) + .withWriteTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) .proceed(shutdownRequest); } else { final Request request = diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index 842d10d2..305d6ee5 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -109,6 +109,16 @@ public Response intercept(@NotNull Chain chain) throws IOException { String retryLimitDisplay = retryForever ? "∞" : String.valueOf(maxTryCount); limitReached = !retryForever && tryCount >= maxTryCount; + + if (isShuttingDown.get()) { + log.warn( + "Request attempt {} to {} was not successful, [{}], SDK is shutting down, no retries will be attempted", + tryCount, + chain.request().url(), + msg); + return response; // Exit without further retries + } + log.warn( "Request attempt {} of {} to {} was not successful, [{}]{}", tryCount, From f8979055a2fd29332e3ba769ff86ebcd3308a37d Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 19:30:39 +0100 Subject: [PATCH 15/21] FFM-12087 Tests for flush --- .../cf/client/api/MetricsProcessorTest.java | 53 +++++++++++++++++-- .../client/api/testutils/DummyConnector.java | 3 ++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java index 5e6b3efd..cd673083 100644 --- a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java +++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java @@ -1,8 +1,7 @@ package io.harness.cf.client.api; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import io.harness.cf.client.connector.Connector; import io.harness.cf.client.connector.ConnectorException; @@ -34,11 +33,59 @@ public void setup() { MockitoAnnotations.openMocks(this); metricsProcessor = Mockito.spy( - new MetricsProcessor(connector, BaseConfig.builder().bufferSize(10_001).build(), this)); + new MetricsProcessor( + connector, + BaseConfig.builder().bufferSize(10_001).flushAnalyticsOnClose(true).build(), + this)); metricsProcessor.reset(); } + @Test + public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, InterruptedException { + // Arrange + Metrics mockMetrics = mock(Metrics.class); + doNothing().when(connector).postMetrics(mockMetrics); + + // Act: Push some metrics data and call flush + Target target = Target.builder().identifier("target-1").build(); + Variation variation = Variation.builder().identifier("true").value("true").build(); + metricsProcessor.pushToQueue(target, "feature-1", variation); + + // Mimic shutdown behavior + metricsProcessor.close(); + + // Assert: Verify that postMetrics is called during shutdown + verify(connector, times(1)).postMetrics(any(Metrics.class)); + verifyNoMoreInteractions(connector); + } + + @Test + public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, InterruptedException { + // Arrange + metricsProcessor = + Mockito.spy( + new MetricsProcessor( + connector, + BaseConfig.builder().bufferSize(10_001).flushAnalyticsOnClose(false).build(), + this)); + + Metrics mockMetrics = mock(Metrics.class); + doNothing().when(connector).postMetrics(mockMetrics); + + // Act: Push some metrics data and call flush + Target target = Target.builder().identifier("target-1").build(); + Variation variation = Variation.builder().identifier("true").value("true").build(); + metricsProcessor.pushToQueue(target, "feature-1", variation); + + // Mimic shutdown behavior + metricsProcessor.close(); + + // Assert: Verify that postMetrics not called + verify(connector, times(0)).postMetrics(any(Metrics.class)); + metricsProcessor.reset(); + } + @Test public void testPushToQueue() throws InterruptedException { ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(BUFFER_SIZE); diff --git a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java index 69101a7e..82a87233 100644 --- a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java +++ b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java @@ -79,4 +79,7 @@ public int getTotalMetricEvaluations() { @Override public void close() {} + + @Override + public void setIsShuttingDown() {} } From 71bee7325d4622018d8494c9348fa249cc5b4ff6 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Thu, 3 Oct 2024 19:31:26 +0100 Subject: [PATCH 16/21] FFM-12087 Bump version & Tidy up FFM-12087 Bump version --- README.md | 4 ++-- settings.gradle | 2 +- src/main/java/io/harness/cf/client/api/InnerClient.java | 3 +-- .../harness/cf/client/connector/NewRetryInterceptor.java | 9 +-------- .../io/harness/cf/client/connector/EventSourceTest.java | 3 +-- 5 files changed, 6 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 4c6fcc51..8d094969 100644 --- a/README.md +++ b/README.md @@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file: io.harness ff-java-server-sdk - 1.7.0 + 1.8.0 ``` #### Gradle ``` -implementation 'io.harness:ff-java-server-sdk:1.7.0' +implementation 'io.harness:ff-java-server-sdk:1.8.0' ``` ### Code Sample diff --git a/settings.gradle b/settings.gradle index 80326964..04777dcf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,7 +4,7 @@ dependencyResolutionManagement { versionCatalogs { libs { // main sdk version - version('sdk', '1.7.0'); + version('sdk', '1.8.0'); // sdk deps version('okhttp3', '4.12.0') diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index 0d6c0d54..b7ec3a1a 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -393,8 +393,7 @@ public void close() { // Mark the connector as shutting down to stop request retries from taking place. The // connections will eventually // be evicted when the connector is closed, but this ensures that if metrics are flushed when - // closed then it - // won't attempt to retry if the first request fails. + // closed then it won't attempt to retry if the first request fails. connector.setIsShuttingDown(); closing = true; off(); diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index 305d6ee5..55d6725a 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -60,12 +60,6 @@ public Response intercept(@NotNull Chain chain) throws IOException { Response response = null; String msg = ""; do { - - // if (isShuttingDown.get()) { - // log.debug("SDK is shutting down, aborting retry interceptor"); - // return response; - // } - try { if (response != null) response.close(); @@ -102,8 +96,7 @@ public Response intercept(@NotNull Chain chain) throws IOException { log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue); backOffDelayMs = retryAfterHeaderValue * 1000L; } else { - // Else fallback to a randomized exponential backoff with a max delay of 1 minute (60,000 - // ms) + // Else fallback to a randomized exponential backoff with a max delay of 1 minute (60,000ms) backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L); } diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java index 5a5dea54..8d0e5bd1 100644 --- a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java +++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java @@ -120,8 +120,7 @@ void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail() } // for this test, connection to the /stream endpoint will never because of an un-retryable - // error. - // we expect the disconnect handler to be called, connect handler should not be called + // error. We expect the disconnect handler to be called, connect handler should not be called assertEquals(0, updater.getConnectCount().get()); assertEquals(0, updater.getFailureCount().get()); From 878f12e1bfed5fc9de03392fd46bf4e9834a139b Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 12:31:22 +0100 Subject: [PATCH 17/21] FFM-12087 Add flush for local connector --- .../java/io/harness/cf/client/connector/LocalConnector.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java index 87778759..fd75d480 100644 --- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java +++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java @@ -15,6 +15,7 @@ import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.NonNull; @@ -41,6 +42,7 @@ public class LocalConnector implements Connector, AutoCloseable { private static final String SEGMENTS = "segments"; private final String source; private final Gson gson = new Gson(); + private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); static { LogUtil.setSystemProps(); @@ -196,7 +198,9 @@ public void close() { } @Override - public void setIsShuttingDown() {} + public void setIsShuttingDown() { + isShuttingDown.set(true); + } private class FileWatcherService implements Service, AutoCloseable { private final FileWatcher flagWatcher; From ffb00652a1fce62d515762b1f984add5cc9d8b32 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 13:39:25 +0100 Subject: [PATCH 18/21] FFM-12087 Move option to harness config --- .../io/harness/cf/client/api/BaseConfig.java | 24 ------ .../io/harness/cf/client/api/InnerClient.java | 5 +- .../cf/client/api/MetricsProcessor.java | 12 ++- .../cf/client/connector/Connector.java | 2 + .../cf/client/connector/HarnessConfig.java | 78 ++++++++++++------- .../cf/client/connector/HarnessConnector.java | 6 ++ .../cf/client/connector/LocalConnector.java | 6 ++ .../client/connector/NewRetryInterceptor.java | 3 +- .../api/MetricsProcessorStressTest.java | 3 +- .../cf/client/api/MetricsProcessorTest.java | 22 +++--- .../client/api/testutils/DummyConnector.java | 5 ++ 11 files changed, 94 insertions(+), 72 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/BaseConfig.java b/src/main/java/io/harness/cf/client/api/BaseConfig.java index be5b8efc..d64f0c5b 100644 --- a/src/main/java/io/harness/cf/client/api/BaseConfig.java +++ b/src/main/java/io/harness/cf/client/api/BaseConfig.java @@ -83,28 +83,4 @@ public int getFrequency() { * */ @Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES; - - /** - * Indicates whether to flush analytics data when the SDK is closed. - *

- * When set to {@code true}, any remaining analytics data (such as metrics) - * will be sent to the server before the SDK is fully closed. If {@code false}, - * the data will not be flushed, and any unsent analytics data may be lost. - *

- * The default value is {@code false}. - *

- * Note: The flush will attempt to send the data in a single request. - * Any failures during this process will not be retried, and the analytics data - * may be lost. - * - *

Example usage: - *

-   * {@code
-   * BaseConfig config = BaseConfig.builder()
-   *     .flushAnalyticsOnClose(true)
-   *     .build();
-   * }
-   * 
- */ - @Builder.Default private final boolean flushAnalyticsOnClose = false; } diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index b7ec3a1a..c0ee5b03 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf log.info("Starting SDK client with configuration: {}", this.options); this.connector = connector; this.connector.setOnUnauthorized(this::onUnauthorized); - // initialization repository = new StorageRepository( @@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this); pollProcessor = new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this); - metricsProcessor = new MetricsProcessor(this.connector, this.options, this); + metricsProcessor = + new MetricsProcessor( + this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose()); updateProcessor = new UpdateProcessor(this.connector, this.repository, this); // start with authentication diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index 86fb0588..225adbaa 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -111,13 +111,19 @@ public boolean containsKey(K key) { private final LongAdder metricsSent = new LongAdder(); private final int maxFreqMapSize; + private final boolean shouldFlushMetricsOnClose; + public MetricsProcessor( - @NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) { + @NonNull Connector connector, + @NonNull BaseConfig config, + @NonNull MetricsCallback callback, + boolean shouldFlushMetricsOnClose) { this.connector = connector; this.config = config; this.frequencyMap = new FrequencyMap<>(); this.targetsSeen = ConcurrentHashMap.newKeySet(); this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN); + this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose; callback.onMetricsReady(); } @@ -303,7 +309,7 @@ public void start() { } public void stop() { - if (config.isFlushAnalyticsOnClose()) { + if (shouldFlushMetricsOnClose) { flushQueue(); } @@ -327,7 +333,7 @@ public void close() { scheduler, SdkCodes::infoMetricsThreadExited, errMsg -> { - if (config.isFlushAnalyticsOnClose()) { + if (shouldFlushMetricsOnClose) { log.warn("Waited for flush to finish {}", errMsg); } else { log.warn("Failed to stop metrics scheduler: {}", errMsg); diff --git a/src/main/java/io/harness/cf/client/connector/Connector.java b/src/main/java/io/harness/cf/client/connector/Connector.java index 85c79301..a2529aac 100644 --- a/src/main/java/io/harness/cf/client/connector/Connector.java +++ b/src/main/java/io/harness/cf/client/connector/Connector.java @@ -30,5 +30,7 @@ public interface Connector { void close(); + boolean getShouldFlushAnalyticsOnClose(); + void setIsShuttingDown(); } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java index 4e217159..930e0eb0 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConfig.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConfig.java @@ -30,6 +30,53 @@ public class HarnessConfig { /** read timeout in minutes for SSE connections */ @Builder.Default long sseReadTimeout = 1; + /** + * list of trusted CAs - for when the given config/event URLs are signed with a private CA. You + * should include intermediate CAs too to allow the HTTP client to build a full trust chain. + */ + @Builder.Default List tlsTrustedCAs = null; + + /** + * Defines the maximum number of retry attempts for certain types of requests: + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * the SDK will retry up to this number of times before giving up. + *

+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *

+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + */ + @Builder.Default private long maxRequestRetry = 10; + + /** + * Indicates whether to flush analytics data when the SDK is closed. + *

+ * When set to {@code true}, any remaining analytics data (such as metrics) + * will be sent to the server before the SDK is fully closed. If {@code false}, + * the data will not be flushed, and any unsent analytics data may be lost. + *

+ * The default value is {@code false}. + *

+ * Note: The flush will attempt to send the data in a single request. + * Any failures during this process will not be retried, and the analytics data + * may be lost. + * + *

Example usage: + *

+   * {@code
+   * HarnessConfig harnessConfig = HarnessConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final boolean flushAnalyticsOnClose = false; + /** * The timeout for flushing analytics on SDK close. *

@@ -47,39 +94,14 @@ public class HarnessConfig { *

Example usage: *

    * {@code
-   * // Timeout the analytics flush request in 3000ms (3 seconds)
-   * HarnessConfig harnessConfig =
-   *                 HarnessConfig.builder().flushAnalyticsOnCloseTimeout(3000).build();
    *
-   * // flush analytics on close is enabled via BaseConfig
-   * BaseConfig config = BaseConfig.builder()
+   * HarnessConfig harnessConfig = HarnessConfig.builder()
    *     .flushAnalyticsOnClose(true)
+   *      // Timeout the analytics flush request in 3000ms (3 seconds)
+   *     .flushAnalyticsOnCloseTimeout(3000).build();
    *     .build();
    * }
    * 
*/ @Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000; - - /** - * list of trusted CAs - for when the given config/event URLs are signed with a private CA. You - * should include intermediate CAs too to allow the HTTP client to build a full trust chain. - */ - @Builder.Default List tlsTrustedCAs = null; - - /** - * Defines the maximum number of retry attempts for certain types of requests: - * authentication, polling, metrics, and reacting to stream events. If a request fails, - * the SDK will retry up to this number of times before giving up. - *

- * - Authentication: Used for retrying authentication requests when the server is unreachable. - * - Polling: Applies to requests that fetch feature flags and target groups periodically. - * - Metrics: Applies to analytics requests for sending metrics data to the server. - * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, - * where the SDK needs to fetch updated flag or group data. - *

- * Note: This setting does not apply to streaming requests (either the initial connection or - * reconnecting after a disconnection). Streaming requests will always retry indefinitely - * (infinite retries). - */ - @Builder.Default private long maxRequestRetry = 10; } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index e26664fb..e869b66f 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -34,6 +34,7 @@ public class HarnessConnector implements Connector, AutoCloseable { private final MetricsApi metricsApi; private final String apiKey; private final HarnessConfig options; + private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); private String token; @@ -473,6 +474,11 @@ public void setIsShuttingDown() { this.isShuttingDown.set(true); } + @Override + public boolean getShouldFlushAnalyticsOnClose() { + return options.isFlushAnalyticsOnClose(); + } + private static boolean isNullOrEmpty(String string) { return string == null || string.trim().isEmpty(); } diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java index fd75d480..343a71c9 100644 --- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java +++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java @@ -197,6 +197,12 @@ public void close() { log.debug("LocalConnector closed"); } + @Override + public boolean getShouldFlushAnalyticsOnClose() { + // TODO - do we want to support flush for local connector? need to pass it in somehow + return false; + } + @Override public void setIsShuttingDown() { isShuttingDown.set(true); diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index 55d6725a..7df56229 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -96,7 +96,8 @@ public Response intercept(@NotNull Chain chain) throws IOException { log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue); backOffDelayMs = retryAfterHeaderValue * 1000L; } else { - // Else fallback to a randomized exponential backoff with a max delay of 1 minute (60,000ms) + // Else fallback to a randomized exponential backoff with a max delay of 1 minute + // (60,000ms) backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L); } diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java index abb1676d..e2b33e94 100644 --- a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java +++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java @@ -50,7 +50,8 @@ void testRegisterEvaluationContention() throws Exception { BaseConfig.builder() // .globalTargetEnabled(false) .build(), - new DummyMetricsCallback()); + new DummyMetricsCallback(), + false); metricsProcessor.start(); diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java index cd673083..cda8b26a 100644 --- a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java +++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java @@ -34,15 +34,13 @@ public void setup() { metricsProcessor = Mockito.spy( new MetricsProcessor( - connector, - BaseConfig.builder().bufferSize(10_001).flushAnalyticsOnClose(true).build(), - this)); + connector, BaseConfig.builder().bufferSize(10_001).build(), this, false)); metricsProcessor.reset(); } @Test - public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, InterruptedException { + public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, InterruptedException { // Arrange Metrics mockMetrics = mock(Metrics.class); doNothing().when(connector).postMetrics(mockMetrics); @@ -55,20 +53,18 @@ public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, Interr // Mimic shutdown behavior metricsProcessor.close(); - // Assert: Verify that postMetrics is called during shutdown - verify(connector, times(1)).postMetrics(any(Metrics.class)); + // Assert: Verify that postMetrics not called during shutdown + verify(connector, times(0)).postMetrics(any(Metrics.class)); verifyNoMoreInteractions(connector); } @Test - public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, InterruptedException { + public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, InterruptedException { // Arrange metricsProcessor = Mockito.spy( new MetricsProcessor( - connector, - BaseConfig.builder().bufferSize(10_001).flushAnalyticsOnClose(false).build(), - this)); + connector, BaseConfig.builder().bufferSize(10_001).build(), this, true)); Metrics mockMetrics = mock(Metrics.class); doNothing().when(connector).postMetrics(mockMetrics); @@ -81,8 +77,8 @@ public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, Inter // Mimic shutdown behavior metricsProcessor.close(); - // Assert: Verify that postMetrics not called - verify(connector, times(0)).postMetrics(any(Metrics.class)); + // Assert: Verify that postMetrics is called during shutdown + verify(connector, times(1)).postMetrics(any(Metrics.class)); metricsProcessor.reset(); } @@ -230,7 +226,7 @@ void shouldPostCorrectMetrics_WhenGlobalTargetEnabledOrDisabled(boolean globalTa doNothing().when(mockConnector).postMetrics(metricsArgumentCaptor.capture()); final MetricsProcessor processor = - new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class)); + new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class), false); final Target target = Target.builder().identifier("target123").build(); final Variation variation = Variation.builder().identifier("true").value("true").build(); diff --git a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java index 82a87233..df8086e7 100644 --- a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java +++ b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java @@ -80,6 +80,11 @@ public int getTotalMetricEvaluations() { @Override public void close() {} + @Override + public boolean getShouldFlushAnalyticsOnClose() { + return false; + } + @Override public void setIsShuttingDown() {} } From c40f0967eccad6a6f5efb8ea3be829a5a4005fca Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 13:56:10 +0100 Subject: [PATCH 19/21] FFM-12087 Update config example --- .../src/main/java/io/harness/ff/examples/ConfigExample.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java index b9af4ad0..f3632a8f 100644 --- a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java +++ b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java @@ -35,6 +35,8 @@ public static void main(String... args) .configUrl("http://localhost:3000/api/1.0") .eventUrl("http://localhost:3000/api/1.0") .maxRequestRetry(20) + .flushAnalyticsOnClose(true) + .flushAnalyticsOnCloseTimeout(30000) .build()); client = new CfClient(hc); client.waitForInitialization(); From f7640f2130f0dfe4dfe3c382a71f2b3dd9037e09 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 14:23:48 +0100 Subject: [PATCH 20/21] FFM-12087 Defensive check on analytics enabled for flush + tidy up --- src/main/java/io/harness/cf/client/api/InnerClient.java | 4 ++-- src/main/java/io/harness/cf/client/api/MetricsProcessor.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index c0ee5b03..7ef4090d 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -389,14 +389,14 @@ public void processEvaluation( } public void close() { - closing = true; log.info("Closing the client"); + closing = true; + // Mark the connector as shutting down to stop request retries from taking place. The // connections will eventually // be evicted when the connector is closed, but this ensures that if metrics are flushed when // closed then it won't attempt to retry if the first request fails. connector.setIsShuttingDown(); - closing = true; off(); authService.close(); repository.close(); diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index 225adbaa..2958dbb6 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -309,7 +309,7 @@ public void start() { } public void stop() { - if (shouldFlushMetricsOnClose) { + if (shouldFlushMetricsOnClose && config.isAnalyticsEnabled()) { flushQueue(); } From eac09844bfe1116b050661d1a5d66044dee78654 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 14:31:07 +0100 Subject: [PATCH 21/21] FFM-12087 Remove flush from LocalConnector as not supported there --- .../java/io/harness/cf/client/connector/LocalConnector.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java index 343a71c9..5ce77297 100644 --- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java +++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java @@ -15,7 +15,6 @@ import java.util.Date; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.NonNull; @@ -42,7 +41,6 @@ public class LocalConnector implements Connector, AutoCloseable { private static final String SEGMENTS = "segments"; private final String source; private final Gson gson = new Gson(); - private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); static { LogUtil.setSystemProps(); @@ -199,13 +197,12 @@ public void close() { @Override public boolean getShouldFlushAnalyticsOnClose() { - // TODO - do we want to support flush for local connector? need to pass it in somehow return false; } @Override public void setIsShuttingDown() { - isShuttingDown.set(true); + // No need for local connector as no retries used } private class FileWatcherService implements Service, AutoCloseable {