Skip to content

FFM-12087 New configuration options + close related bug fixes #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e5f8300
FFM-12087 Add no limit retry option to interceptor and use with Event…
erdirowlands Oct 3, 2024
4828530
FFM-12087 Tidyup
erdirowlands Oct 3, 2024
b2b7fac
FFM-12087 Add clarity to log
erdirowlands Oct 3, 2024
3fa9230
FFM-12087 Add maxRequestConfig and format interceptor
erdirowlands Oct 3, 2024
acab98d
FFM-12087 Add maxRequestConfig to config example
erdirowlands Oct 3, 2024
b484584
FFM-12087 Modify tests to account for new stream retry behaviour
erdirowlands Oct 3, 2024
99abea5
FFM-12087 Start tracking isShuttingdown to avoid retries continuing
erdirowlands Oct 3, 2024
f8aec1b
FFM-12087 Don't use config where it is not needed
erdirowlands Oct 3, 2024
dea1245
FFM-12087 Start implementing flush on close
erdirowlands Oct 3, 2024
fe5d2a4
FFM-12087 Add shutting down check to interceptors
erdirowlands Oct 3, 2024
5583cfa
FFM-12087 Don't poll when SDK is closed after stream is disconnected
erdirowlands Oct 3, 2024
e40b865
FFM-12087 Remove shutting down check from interceptor
erdirowlands Oct 3, 2024
4dc239c
FFM-12087 Apply timeout to metrics flush request - CONFIG option
erdirowlands Oct 3, 2024
ac23bb2
FFM-12087 Tidy up flush
erdirowlands Oct 3, 2024
f897905
FFM-12087 Tests for flush
erdirowlands Oct 3, 2024
71bee73
FFM-12087 Bump version & Tidy up
erdirowlands Oct 3, 2024
878f12e
FFM-12087 Add flush for local connector
erdirowlands Oct 4, 2024
ffb0065
FFM-12087 Move option to harness config
erdirowlands Oct 4, 2024
c40f096
FFM-12087 Update config example
erdirowlands Oct 4, 2024
f7640f2
FFM-12087 Defensive check on analytics enabled for flush + tidy up
erdirowlands Oct 4, 2024
eac0984
FFM-12087 Remove flush from LocalConnector as not supported there
erdirowlands Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
<dependency>
<groupId>io.harness</groupId>
<artifactId>ff-java-server-sdk</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>
</dependency>
```

#### Gradle

```
implementation 'io.harness:ff-java-server-sdk:1.7.0'
implementation 'io.harness:ff-java-server-sdk:1.8.0'
```

### Code Sample
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static void main(String... args)
HarnessConfig.builder()
.configUrl("http://localhost:3000/api/1.0")
.eventUrl("http://localhost:3000/api/1.0")
.maxRequestRetry(20)
.flushAnalyticsOnClose(true)
.flushAnalyticsOnCloseTimeout(30000)
.build());
client = new CfClient(hc);
client.waitForInitialization();
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dependencyResolutionManagement {
versionCatalogs {
libs {
// main sdk version
version('sdk', '1.7.0');
version('sdk', '1.8.0');

// sdk deps
version('okhttp3', '4.12.0')
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/harness/cf/client/api/BaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@Data
public class BaseConfig {
public static final int MIN_FREQUENCY = 60;
public static final long DEFAULT_REQUEST_RETRIES = 10;

@Builder.Default private final boolean streamEnabled = true;
@Builder.Default private final int pollIntervalInSeconds = 60;
Expand Down Expand Up @@ -53,4 +54,33 @@ public int getFrequency() {
@Builder.Default private final Cache cache = new CaffeineCache(10000);

private final Storage store;

/**
* Defines the maximum number of retry attempts for certain types of requests:
* authentication, polling, metrics, and reacting to stream events. If a request fails,
* the SDK will retry up to this number of times before giving up.
* <p>
* - Authentication: Used for retrying authentication requests when the server is unreachable.
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
* - Metrics: Applies to analytics requests for sending metrics data to the server.
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
* where the SDK needs to fetch updated flag or group data.
* <p>
* <p>
* The default value is {@code 10}.
* <p>
* <b>Note:</b> This setting does not apply to streaming requests (either the initial connection or
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
* (infinite retries).
* <p>
* Example usage:
* <pre>
* {@code
* BaseConfig config = BaseConfig.builder()
* .maxRequestRetry(20)
* .build();
* }
* </pre>
*/
@Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES;
}
15 changes: 12 additions & 3 deletions src/main/java/io/harness/cf/client/api/InnerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
log.info("Starting SDK client with configuration: {}", this.options);
this.connector = connector;
this.connector.setOnUnauthorized(this::onUnauthorized);

// initialization
repository =
new StorageRepository(
Expand All @@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this);
pollProcessor =
new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this);
metricsProcessor = new MetricsProcessor(this.connector, this.options, this);
metricsProcessor =
new MetricsProcessor(
this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose());
updateProcessor = new UpdateProcessor(this.connector, this.repository, this);

// start with authentication
Expand Down Expand Up @@ -228,7 +229,9 @@ public void onDisconnected(String reason) {
closing,
options.getPollIntervalInSeconds());
log.debug("SSE disconnect detected - asking poller to refresh flags");
pollProcessor.retrieveAll();
if (!closing) {
pollProcessor.retrieveAll();
}
}
}

Expand Down Expand Up @@ -388,6 +391,12 @@ public void processEvaluation(
public void close() {
log.info("Closing the client");
closing = true;

// Mark the connector as shutting down to stop request retries from taking place. The
// connections will eventually
// be evicted when the connector is closed, but this ensures that if metrics are flushed when
// closed then it won't attempt to retry if the first request fails.
connector.setIsShuttingDown();
off();
authService.close();
repository.close();
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/io/harness/cf/client/api/MetricsProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.harness.cf.client.common.Utils.shutdownExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;

import io.harness.cf.Version;
import io.harness.cf.client.common.SdkCodes;
import io.harness.cf.client.common.StringUtils;
import io.harness.cf.client.connector.Connector;
Expand All @@ -15,10 +16,7 @@
import io.harness.cf.model.TargetData;
import io.harness.cf.model.Variation;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -113,13 +111,19 @@ public boolean containsKey(K key) {
private final LongAdder metricsSent = new LongAdder();
private final int maxFreqMapSize;

private final boolean shouldFlushMetricsOnClose;

public MetricsProcessor(
@NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) {
@NonNull Connector connector,
@NonNull BaseConfig config,
@NonNull MetricsCallback callback,
boolean shouldFlushMetricsOnClose) {
this.connector = connector;
this.config = config;
this.frequencyMap = new FrequencyMap<>();
this.targetsSeen = ConcurrentHashMap.newKeySet();
this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN);
this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose;
callback.onMetricsReady();
}

Expand Down Expand Up @@ -218,7 +222,7 @@ protected Metrics prepareSummaryMetricsBody(Map<MetricEvent, Long> data, Set<Tar
new KeyValue(TARGET_ATTRIBUTE, summary.getTargetIdentifier()),
new KeyValue(SDK_TYPE, SERVER),
new KeyValue(SDK_LANGUAGE, "java"),
new KeyValue(SDK_VERSION, io.harness.cf.Version.VERSION)));
new KeyValue(SDK_VERSION, Version.VERSION)));
if (metrics.getMetricsData() != null) {
metrics.getMetricsData().add(metricsData);
}
Expand Down Expand Up @@ -305,6 +309,10 @@ public void start() {
}

public void stop() {
if (shouldFlushMetricsOnClose && config.isAnalyticsEnabled()) {
flushQueue();
}

log.debug("Stopping MetricsProcessor");
if (scheduler.isShutdown()) {
return;
Expand All @@ -324,7 +332,13 @@ public void close() {
shutdownExecutorService(
scheduler,
SdkCodes::infoMetricsThreadExited,
errMsg -> log.warn("failed to stop metrics scheduler: {}", errMsg));
errMsg -> {
if (shouldFlushMetricsOnClose) {
log.warn("Waited for flush to finish {}", errMsg);
} else {
log.warn("Failed to stop metrics scheduler: {}", errMsg);
}
});

log.debug("Closing MetricsProcessor");
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/harness/cf/client/connector/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ public interface Connector {
Service stream(Updater updater) throws ConnectorException;

void close();

boolean getShouldFlushAnalyticsOnClose();

void setIsShuttingDown();
}
12 changes: 9 additions & 3 deletions src/main/java/io/harness/cf/client/connector/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.*;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -37,6 +38,7 @@ public class EventSource implements Callback, AutoCloseable, Service {
private final Map<String, String> headers;
private final long sseReadTimeoutMins;
private final List<X509Certificate> trustedCAs;
private final AtomicBoolean isShuttingDown;

static {
LogUtil.setSystemProps();
Expand All @@ -48,7 +50,7 @@ public EventSource(
@NonNull Updater updater,
long sseReadTimeoutMins)
throws ConnectorException {
this(url, headers, updater, sseReadTimeoutMins, 2_000, null);
this(url, headers, updater, sseReadTimeoutMins, 2_000, null, new AtomicBoolean(false));
}

EventSource(
Expand All @@ -57,14 +59,16 @@ public EventSource(
@NonNull Updater updater,
long sseReadTimeoutMins,
int retryBackoffDelay,
List<X509Certificate> trustedCAs) {
List<X509Certificate> trustedCAs,
AtomicBoolean isShuttingDown) {
this.url = url;
this.headers = headers;
this.updater = updater;
this.sseReadTimeoutMins = sseReadTimeoutMins;
this.retryBackoffDelay = retryBackoffDelay;
this.trustedCAs = trustedCAs;
this.loggingInterceptor = new HttpLoggingInterceptor();
this.isShuttingDown = isShuttingDown;
}

protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certificate> trustedCAs)
Expand All @@ -83,7 +87,8 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certif
httpClientBuilder.interceptors().remove(loggingInterceptor);
}

httpClientBuilder.addInterceptor(new NewRetryInterceptor(retryBackoffDelay));
httpClientBuilder.addInterceptor(
new NewRetryInterceptor(retryBackoffDelay, true, isShuttingDown));
return httpClientBuilder.build();
}

Expand Down Expand Up @@ -149,6 +154,7 @@ public void stop() {
public void close() {
stop();
if (this.streamClient != null) {
this.streamClient.dispatcher().executorService().shutdown();
this.streamClient.connectionPool().evictAll();
}
log.debug("EventSource closed");
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/io/harness/cf/client/connector/HarnessConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,73 @@ public class HarnessConfig {
* should include intermediate CAs too to allow the HTTP client to build a full trust chain.
*/
@Builder.Default List<X509Certificate> tlsTrustedCAs = null;

/**
* Defines the maximum number of retry attempts for certain types of requests:
* authentication, polling, metrics, and reacting to stream events. If a request fails,
* the SDK will retry up to this number of times before giving up.
* <p>
* - Authentication: Used for retrying authentication requests when the server is unreachable.
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
* - Metrics: Applies to analytics requests for sending metrics data to the server.
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
* where the SDK needs to fetch updated flag or group data.
* <p>
* Note: This setting does not apply to streaming requests (either the initial connection or
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
* (infinite retries).
*/
@Builder.Default private long maxRequestRetry = 10;

/**
* Indicates whether to flush analytics data when the SDK is closed.
* <p>
* When set to {@code true}, any remaining analytics data (such as metrics)
* will be sent to the server before the SDK is fully closed. If {@code false},
* the data will not be flushed, and any unsent analytics data may be lost.
* <p>
* The default value is {@code false}.
* <p>
* <b>Note:</b> The flush will attempt to send the data in a single request.
* Any failures during this process will not be retried, and the analytics data
* may be lost.
*
* <p>Example usage:
* <pre>
* {@code
* HarnessConfig harnessConfig = HarnessConfig.builder()
* .flushAnalyticsOnClose(true)
* .build();
* }
* </pre>
*/
@Builder.Default private final boolean flushAnalyticsOnClose = false;

/**
* The timeout for flushing analytics on SDK close.
* <p>
* This option sets the maximum duration, in milliseconds, the SDK will wait for the
* analytics data to be flushed after the SDK has been closed. If the flush process takes longer
* than this timeout, the request will be canceled, and any remaining data will
* not be sent. This ensures that the SDK does not hang indefinitely during shutdown.
* <p>
* The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK
* <p>
* <b>Note:</b> This timeout only applies to the flush process that happens when
* {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other
* requests made by the SDK during normal operation.
*
* <p>Example usage:
* <pre>
* {@code
*
* HarnessConfig harnessConfig = HarnessConfig.builder()
* .flushAnalyticsOnClose(true)
* // Timeout the analytics flush request in 3000ms (3 seconds)
* .flushAnalyticsOnCloseTimeout(3000).build();
* .build();
* }
* </pre>
*/
@Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000;
}
Loading
Loading