From b6151849f8f4a532ecfbdb93082cdc388f82f2ab Mon Sep 17 00:00:00 2001 From: ggivo Date: Tue, 10 Jun 2025 16:26:32 +0300 Subject: [PATCH 1/6] Implement failover retry for in-flight commands This change adds support for retrying in-flight commands when a Redis connection fails and the client automatically fails over to another cluster. The feature is configurable through the FailoverOptions builder. Key changes: - Added retryFailedInflightCommands option to FailoverOptions - Implemented retry logic in CircuitBreakerCommandExecutor - Added integration tests to verify both retry and no-retry behavior - Created utility methods for test setup and configuration This enhancement improves resilience for long-running commands like blocking operations, allowing them to transparently continue on the failover cluster without client-side errors. --- .../jedis/MultiClusterClientConfig.java | 4 +- .../redis/clients/jedis/UnifiedJedis.java | 15 +- .../mcf/CircuitBreakerCommandExecutor.java | 32 ++++- .../mcf/ConnectionFailoverException.java | 9 ++ .../clients/jedis/mcf/FailoverOptions.java | 59 ++++++++ .../failover/FailoverIntegrationTest.java | 130 ++++++++++++++++-- 6 files changed, 231 insertions(+), 18 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java create mode 100644 src/main/java/redis/clients/jedis/mcf/FailoverOptions.java diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java index 247ffb7ae8..517df6cde8 100644 --- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java +++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java @@ -11,7 +11,7 @@ import redis.clients.jedis.annots.Experimental; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; - +import redis.clients.jedis.mcf.ConnectionFailoverException; /** * @author Allen Terleto 
(aterleto) @@ -43,7 +43,7 @@ public final class MultiClusterClientConfig { private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage private static final List CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); - private static final List> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class); + private static final List> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class, ConnectionFailoverException.class); private final ClusterConfig[] clusterConfigs; diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index e3960862fa..21a126eaad 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -30,6 +30,7 @@ import redis.clients.jedis.json.Path2; import redis.clients.jedis.json.JsonObjectMapper; import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor; +import redis.clients.jedis.mcf.FailoverOptions; import redis.clients.jedis.mcf.MultiClusterPipeline; import redis.clients.jedis.mcf.MultiClusterTransaction; import redis.clients.jedis.params.*; @@ -237,7 +238,19 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo */ @Experimental public UnifiedJedis(MultiClusterPooledConnectionProvider provider) { - this(new CircuitBreakerCommandExecutor(provider), provider); + this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider); + } + + /** + * Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool. + *

+ * With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s) + * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs + *

+ */ + @Experimental + public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) { + this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider); } /** diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 56be63d93d..6db05ed73b 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -24,8 +24,12 @@ public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor { - public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) { + private final FailoverOptions options; + + public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider, + FailoverOptions options) { super(provider); + this.options = options != null ? options : FailoverOptions.builder().build(); } @Override @@ -49,9 +53,35 @@ public T executeCommand(CommandObject commandObject) { private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) { try (Connection connection = cluster.getConnection()) { return connection.executeCommand(commandObject); + } catch (Exception e) { + + if (shouldRetryFailedInflightCommands() && !isActiveCluster(cluster) + && isCircuitBreakerOpen(cluster.getCircuitBreaker()) + && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) { + throw new ConnectionFailoverException( + "Command failed during failover: " + cluster.getCircuitBreaker().getName(), e); + } + + throw e; } } + private boolean isCircuitBreakerOpen(CircuitBreaker circuitBreaker) { + return circuitBreaker.getState() == CircuitBreaker.State.OPEN; + } + + private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb) { + return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e); + } + + private 
boolean shouldRetryFailedInflightCommands() { + return options.isRetryFailedInflightCommands(); + } + + private boolean isActiveCluster(Cluster cluster) { + return provider.getCluster() != cluster; + } + /** * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker * failure scenarios diff --git a/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java b/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java new file mode 100644 index 0000000000..986572cfae --- /dev/null +++ b/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java @@ -0,0 +1,9 @@ +package redis.clients.jedis.mcf; + +import redis.clients.jedis.exceptions.JedisException; + +public class ConnectionFailoverException extends JedisException { + public ConnectionFailoverException(String s, Exception e) { + super(s, e); + } +} diff --git a/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java new file mode 100644 index 0000000000..d232f64bd1 --- /dev/null +++ b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java @@ -0,0 +1,59 @@ +package redis.clients.jedis.mcf; + +import redis.clients.jedis.annots.Experimental; + +/** + * Configuration options for CircuitBreakerCommandExecutor + */ +@Experimental +public class FailoverOptions { + private final boolean retryFailedInflightCommands; + + private FailoverOptions(Builder builder) { + this.retryFailedInflightCommands = builder.retryFailedInflightCommands; + } + + /** + * Gets whether to retry failed in-flight commands during failover + * @return true if retry is enabled, false otherwise + */ + public boolean isRetryFailedInflightCommands() { + return retryFailedInflightCommands; + } + + /** + * Creates a new builder with default options + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for FailoverOptions + */ + public static class Builder { 
+ private boolean retryFailedInflightCommands = false; + + private Builder() { + } + + /** + * Sets whether to retry failed in-flight commands during failover + * @param retry true to retry, false otherwise + * @return this builder for method chaining + */ + public Builder retryFailedInflightCommands(boolean retry) { + this.retryFailedInflightCommands = retry; + return this; + } + + /** + * Builds a new FailoverOptions instance with the configured options + * @return a new FailoverOptions instance + */ + public FailoverOptions build() { + return new FailoverOptions(this); + } + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index 9ac4ac0907..a71d3239df 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -3,6 +3,7 @@ import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import eu.rekawek.toxiproxy.model.Toxic; +import eu.rekawek.toxiproxy.model.ToxicDirection; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -18,6 +19,7 @@ import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.UnifiedJedis; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.mcf.FailoverOptions; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.scenario.RecommendedSettings; @@ -28,6 +30,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -54,9 +58,44 @@ public class 
FailoverIntegrationTest { private static UnifiedJedis jedis2; private static String JEDIS1_ID = ""; private static String JEDIS2_ID = ""; - MultiClusterPooledConnectionProvider provider; + private MultiClusterPooledConnectionProvider provider; private UnifiedJedis failoverClient; + /** + * Creates a MultiClusterPooledConnectionProvider with standard configuration + * @return A configured provider + */ + private MultiClusterPooledConnectionProvider createProvider() { + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) + .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) + .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) + .circuitBreakerSlidingWindowMinCalls(1).build(); + + return new MultiClusterPooledConnectionProvider(failoverConfig); + } + + /** + * Creates a UnifiedJedis client with customizable failover options + * @param provider The connection provider to use + * @param optionsCustomizer A function that customizes the failover options (can be null for + * defaults) + * @return A configured failover client + */ + private UnifiedJedis createClient(MultiClusterPooledConnectionProvider provider, + Function optionsCustomizer) { + FailoverOptions.Builder builder = FailoverOptions.builder(); + if (optionsCustomizer != null) { + builder = optionsCustomizer.apply(builder); + } + + return new UnifiedJedis(provider, builder.build()); + } + @BeforeAll public static void setupAdminClients() throws IOException { if (tp.getProxyOrNull("redis-1") != null) { @@ -114,7 +153,6 @@ public void setup() throws IOException { try { proxy.enable(); for (Toxic toxic : proxy.toxics().getAll()) { - toxic.remove(); } } catch 
(IOException e) { @@ -133,18 +171,9 @@ public void setup() throws IOException { JEDIS1_ID = getNodeId(jedis1); JEDIS2_ID = getNodeId(jedis2); - JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() - .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) - .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); - - MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( - getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) - .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) - .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) - .circuitBreakerSlidingWindowMinCalls(1).build(); - - provider = new MultiClusterPooledConnectionProvider(failoverConfig); - failoverClient = new UnifiedJedis(provider); + // Create default provider and client for most tests + provider = createProvider(); + failoverClient = createClient(provider, null); } @AfterEach @@ -328,6 +357,79 @@ public void testCircuitBreakerCountsEachConnectionErrorSeparately() throws IOExc assertThat(getNodeId(client.info("server")), equalTo(JEDIS2_ID)); } + } + /** + * Tests that in-flight commands are retried after automatic failover when retry is enabled. 
+ */ + @Test + public void testInflightCommandsAreRetriedAfterFailover() throws Exception { + // Create a custom provider and client with retry enabled for this specific test + MultiClusterPooledConnectionProvider customProvider = createProvider(); + + try (UnifiedJedis customClient = createClient(customProvider, + builder -> builder.retryFailedInflightCommands(true))) { + + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); + Future> blpop = executor.submit(() -> { + try { + // This command will block until a value is pushed to the list or timeout occurs + // We will trigger failover while this command is blocking + return customClient.blpop(10000, "test-list-1"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // info command will return more than 100 bytes so we simulate connection dropped + redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); + assertThrows(JedisConnectionException.class, + () -> customClient.set("test-key", generateTestValue(150))); + + // Disable redisProxy1 to enforce current blpop command failure + redisProxy1.disable(); + + // Check that the circuit breaker for Endpoint 1 is open + assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), + equalTo(CircuitBreaker.State.OPEN)); + + customClient.rpush("test-list-1", "somevalue"); + assertThat(blpop.get(), equalTo(Arrays.asList("test-list-1", "somevalue"))); + } + } + + /** + * Tests that in-flight commands are not retried after automatic failover when retry is disabled. 
+ */ + @Test + public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception { + // Create a custom provider and client with retry disabled for this specific test + MultiClusterPooledConnectionProvider customProvider = createProvider(); + + try (UnifiedJedis customClient = createClient(customProvider, + builder -> builder.retryFailedInflightCommands(false))) { + + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); + Future> blpop = executor.submit(() -> customClient.blpop(10000, "test-list-2")); + + // info command will return more than 100 bytes so we simulate connection dropped + redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); + assertThrows(JedisConnectionException.class, + () -> customClient.set("test-key", generateTestValue(150))); + + // Disable redisProxy1 to enforce current blpop command failure + redisProxy1.disable(); + + // Check that the circuit breaker for Endpoint 1 is open + assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), + equalTo(CircuitBreaker.State.OPEN)); + + // The blpop command should fail since retry is disabled + customClient.rpush("test-list-2", "somevalue"); + ExecutionException exception = assertThrows(ExecutionException.class, + () -> blpop.get(1, TimeUnit.SECONDS)); + assertThat(exception.getCause(), instanceOf(JedisConnectionException.class)); + } + } } From f3ab1fae7ba81da3ffeb26643d0ca721a9b0ee69 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 11 Jun 2025 09:15:21 +0300 Subject: [PATCH 2/6] Update src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> # Conflicts: # src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java --- .../redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java 
b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 6db05ed73b..c4948b81b8 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -79,7 +79,8 @@ private boolean shouldRetryFailedInflightCommands() { } private boolean isActiveCluster(Cluster cluster) { - return provider.getCluster() != cluster; + Cluster activeCluster = provider.getCluster(); + return activeCluster != null && !activeCluster.equals(cluster); } /** From adf7f575aa9f13dd95caab87d4b2f5baebb17e66 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 11 Jun 2025 09:46:31 +0300 Subject: [PATCH 3/6] format format & clean-up --- .../mcf/CircuitBreakerCommandExecutor.java | 4 +- .../failover/FailoverIntegrationTest.java | 222 +++++++++--------- 2 files changed, 112 insertions(+), 114 deletions(-) diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index c4948b81b8..e7caa93220 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -79,8 +79,8 @@ private boolean shouldRetryFailedInflightCommands() { } private boolean isActiveCluster(Cluster cluster) { - Cluster activeCluster = provider.getCluster(); - return activeCluster != null && !activeCluster.equals(cluster); + Cluster activeCluster = provider.getCluster(); + return activeCluster != null && !activeCluster.equals(cluster); } /** diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index a71d3239df..2e12508435 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -61,41 +61,6 @@ public class 
FailoverIntegrationTest { private MultiClusterPooledConnectionProvider provider; private UnifiedJedis failoverClient; - /** - * Creates a MultiClusterPooledConnectionProvider with standard configuration - * @return A configured provider - */ - private MultiClusterPooledConnectionProvider createProvider() { - JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() - .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) - .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); - - MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( - getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) - .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) - .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) - .circuitBreakerSlidingWindowMinCalls(1).build(); - - return new MultiClusterPooledConnectionProvider(failoverConfig); - } - - /** - * Creates a UnifiedJedis client with customizable failover options - * @param provider The connection provider to use - * @param optionsCustomizer A function that customizes the failover options (can be null for - * defaults) - * @return A configured failover client - */ - private UnifiedJedis createClient(MultiClusterPooledConnectionProvider provider, - Function optionsCustomizer) { - FailoverOptions.Builder builder = FailoverOptions.builder(); - if (optionsCustomizer != null) { - builder = optionsCustomizer.apply(builder); - } - - return new UnifiedJedis(provider, builder.build()); - } - @BeforeAll public static void setupAdminClients() throws IOException { if (tp.getProxyOrNull("redis-1") != null) { @@ -120,33 +85,6 @@ public static void cleanupAdminClients() throws IOException { executor.shutdown(); } - private static String getNodeId(UnifiedJedis client) { - - return getNodeId(client.info("server")); - } - - private static String getNodeId(String info) { - - Matcher m = pattern.matcher(info); - if (m.find()) { - return 
m.group(1); - } - return null; - } - - /** - * Generates a string of a specific byte size. - * @param byteSize The desired size in bytes - * @return A string of the specified byte size - */ - private static String generateTestValue(int byteSize) { - StringBuilder value = new StringBuilder(byteSize); - for (int i = 0; i < byteSize; i++) { - value.append('x'); - } - return value.toString(); - } - @BeforeEach public void setup() throws IOException { tp.getProxies().forEach(proxy -> { @@ -185,12 +123,16 @@ public void cleanup() throws IOException { /** * Tests the automatic failover behavior when a Redis server becomes unavailable. This test - * verifies: 1. Initial connection to the first Redis server works correctly 2. When the first - * server is disabled, the first command throws a JedisConnectionException 3. The circuit breaker - * for the first endpoint transitions to OPEN state 4. Subsequent commands are automatically - * routed to the second available endpoint 5. When the second server is also disabled, all - * commands fail with JedisConnectionException 6. The circuit breaker for the second endpoint also - * transitions to OPEN state + * verifies: + *

    + *
  1. Initial connection to the first Redis server works correctly
  2. + *
  3. Disable access, the first command throws
  4. + *
  5. Command failure is propagated to the caller
  6. + *
  7. CB transitions to OPEN, failover is initiated and following commands are sent to the next + * endpoint
  8. + *
  9. Second server is also disabled, all commands fail with JedisConnectionException and error + * is propagated to the caller
  10. + *
*/ @Test public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception { @@ -200,12 +142,11 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception redisProxy1.disable(); // Endpoint 1 not available - // 1. First call should should throw JedisConnectionException and trigger failover - // 1.1. Endpoint1 CB transitions to open - // 2. Subsequent calls should be routed to Endpoint2 + // 1. First call should throw JedisConnectionException and trigger failover + // 2. Endpoint 1 CB transitions to OPEN + // 3. Subsequent calls should be routed to Endpoint 2 assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); - // Check that the circuit breaker for Endpoint 1 is open assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); @@ -215,12 +156,13 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception // Disable also second proxy redisProxy2.disable(); - // Endpoint1 and Endpoint2 are not available, + // Endpoint1 and Endpoint2 are NOT available, assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); assertThat(provider.getCluster(2).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); - // and since no other nodes are available, it should throw an exception for subsequent calls + // and since no other nodes are available, it should propagate the errors to the caller + // subsequent calls assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); } @@ -247,22 +189,17 @@ public void testManualFailoverInflightCommandsCompleteGracefully() throws ExecutionException, InterruptedException { assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS1_ID)); - Future> blpop = executor.submit(() -> { - try { - // This command will block until a value is pushed to the list or timeout occurs - // We will trigger failover while this command is blocking - return 
failoverClient.blpop(1000, "test-list"); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + + // We will trigger failover while this command is in-flight + Future> blpop = executor.submit(() -> failoverClient.blpop(1000, "test-list")); provider.setActiveMultiClusterIndex(2); - // new command should be executed against the new endpoint + // After the manual failover, commands should be executed against Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); - // Since failover was manually triggered and there were no errors - // previous endpoint CB should be still in CLOSED + + // Failover was manually triggered, and there were no errors + // previous endpoint CB should still be in CLOSED state assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.CLOSED)); @@ -272,8 +209,8 @@ public void testManualFailoverInflightCommandsCompleteGracefully() } /** - * Verify that in-flight commands during manual failover fail gracefully with an error will - * propagate the error to the caller and will toggle CB to OPEN state. + * Verify that in-flight commands that complete with error during manual failover will propagate + * the error to the caller and toggle CB to OPEN state. 
*/ @Test public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws Exception { @@ -284,9 +221,11 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws // trigger failover manually provider.setActiveMultiClusterIndex(2); Future infoCmd = executor.submit(() -> failoverClient.info("server")); - // new command should be executed against the new endpoint + + // After the manual failover, commands should be executed against Endpoint 2 assertThat(getNodeId(infoCmd.get()), equalTo(JEDIS2_ID)); - // Disable redisProxy1 to simulate an error + + // Disable redisProxy1 to drop active connections and trigger an error redisProxy1.disable(); // previously submitted command should fail with JedisConnectionException @@ -297,7 +236,7 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); - // Ensure that active cluster is still Endpoint 2 + // Ensure that the active cluster is still Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); } @@ -365,35 +304,32 @@ public void testCircuitBreakerCountsEachConnectionErrorSeparately() throws IOExc */ @Test public void testInflightCommandsAreRetriedAfterFailover() throws Exception { - // Create a custom provider and client with retry enabled for this specific test + MultiClusterPooledConnectionProvider customProvider = createProvider(); + // Create a custom client with retryFailedInflightCommands enabled for this specific test try (UnifiedJedis customClient = createClient(customProvider, builder -> builder.retryFailedInflightCommands(true))) { assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); - Future> blpop = executor.submit(() -> { - try { - // This command will block until a value is pushed to the list or timeout occurs - // We will trigger failover while this command is blocking - return 
customClient.blpop(10000, "test-list-1"); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - // info command will return more than 100 bytes so we simulate connection dropped + // We will trigger failover while this command is in-flight + Future> blpop = executor.submit(() -> customClient.blpop(10000, "test-list-1")); + + // Simulate error by sending more than 100 bytes. This causes the connection close, and + // triggers + // failover redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); assertThrows(JedisConnectionException.class, () -> customClient.set("test-key", generateTestValue(150))); - // Disable redisProxy1 to enforce current blpop command failure - redisProxy1.disable(); - // Check that the circuit breaker for Endpoint 1 is open assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); + // Disable redisProxy1 to enforce the current blpop command failure + redisProxy1.disable(); + customClient.rpush("test-list-1", "somevalue"); assertThat(blpop.get(), equalTo(Arrays.asList("test-list-1", "somevalue"))); } @@ -411,25 +347,87 @@ public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception { builder -> builder.retryFailedInflightCommands(false))) { assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); - Future> blpop = executor.submit(() -> customClient.blpop(10000, "test-list-2")); + Future> blpop = executor.submit(() -> customClient.blpop(1000, "test-list-2")); - // info command will return more than 100 bytes so we simulate connection dropped + // Simulate error by sending more than 100 bytes. 
This causes connection close, and triggers + // failover redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); assertThrows(JedisConnectionException.class, () -> customClient.set("test-key", generateTestValue(150))); - // Disable redisProxy1 to enforce current blpop command failure - redisProxy1.disable(); - // Check that the circuit breaker for Endpoint 1 is open assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); - // The blpop command should fail since retry is disabled - customClient.rpush("test-list-2", "somevalue"); + // Disable redisProxy1 to enforce the current blpop command failure + redisProxy1.disable(); + + // The in-flight command should fail since the retry is disabled ExecutionException exception = assertThrows(ExecutionException.class, () -> blpop.get(1, TimeUnit.SECONDS)); assertThat(exception.getCause(), instanceOf(JedisConnectionException.class)); } } + + private static String getNodeId(UnifiedJedis client) { + + return getNodeId(client.info("server")); + } + + private static String getNodeId(String info) { + + Matcher m = pattern.matcher(info); + if (m.find()) { + return m.group(1); + } + return null; + } + + /** + * Generates a string of a specific byte size. 
+ * @param byteSize The desired size in bytes + * @return A string of the specified byte size + */ + private static String generateTestValue(int byteSize) { + StringBuilder value = new StringBuilder(byteSize); + for (int i = 0; i < byteSize; i++) { + value.append('x'); + } + return value.toString(); + } + + /** + * Creates a MultiClusterPooledConnectionProvider with standard configuration + * @return A configured provider + */ + private MultiClusterPooledConnectionProvider createProvider() { + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) + .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) + .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) + .circuitBreakerSlidingWindowMinCalls(1).build(); + + return new MultiClusterPooledConnectionProvider(failoverConfig); + } + + /** + * Creates a UnifiedJedis client with customizable failover options + * @param provider The connection provider to use + * @param optionsCustomizer A function that customizes the failover options (can be null for + * defaults) + * @return A configured failover client + */ + private UnifiedJedis createClient(MultiClusterPooledConnectionProvider provider, + Function optionsCustomizer) { + FailoverOptions.Builder builder = FailoverOptions.builder(); + if (optionsCustomizer != null) { + builder = optionsCustomizer.apply(builder); + } + + return new UnifiedJedis(provider, builder.build()); + } } From 7ecbe037756d1622e844d3d33b12d5eeae9ea599 Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 13 Jun 2025 12:45:20 +0300 Subject: [PATCH 4/6] fix FailoverIntegrationTest.testInflightCommandsAreNotRetriedAfterFailover blpop timeout should be less than 
async command timeout to prevent completing with java.util.concurrent.TimeoutException instead of actual command failure --- .../redis/clients/jedis/failover/FailoverIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index 2e12508435..b99a699169 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -347,7 +347,7 @@ public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception { builder -> builder.retryFailedInflightCommands(false))) { assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); - Future> blpop = executor.submit(() -> customClient.blpop(1000, "test-list-2")); + Future> blpop = executor.submit(() -> customClient.blpop(500, "test-list-2")); // Simulate error by sending more than 100 bytes. 
This causes connection close, and triggers // failover From 1e25a092e641849e9296c5a2a8660fbf7262acce Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 13 Jun 2025 22:26:46 +0300 Subject: [PATCH 5/6] Address comments from review - rename retryFailedInflightCommands->retryOnFailover - remove check for CB in OPEN state --- .../mcf/CircuitBreakerCommandExecutor.java | 9 ++++----- .../clients/jedis/mcf/FailoverOptions.java | 18 +++++++++--------- .../failover/FailoverIntegrationTest.java | 18 +++++++++++------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index e7caa93220..a43e9d4cca 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -55,8 +55,7 @@ private T handleExecuteCommand(CommandObject commandObject, Cluster clust return connection.executeCommand(commandObject); } catch (Exception e) { - if (shouldRetryFailedInflightCommands() && !isActiveCluster(cluster) - && isCircuitBreakerOpen(cluster.getCircuitBreaker()) + if (retryOnFailover() && !isActiveCluster(cluster) && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) { throw new ConnectionFailoverException( "Command failed during failover: " + cluster.getCircuitBreaker().getName(), e); @@ -74,13 +73,13 @@ private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb) return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e); } - private boolean shouldRetryFailedInflightCommands() { - return options.isRetryFailedInflightCommands(); + private boolean retryOnFailover() { + return options.isRetryOnFailover(); } private boolean isActiveCluster(Cluster cluster) { Cluster activeCluster = provider.getCluster(); - return activeCluster != null && !activeCluster.equals(cluster); + return activeCluster 
!= null && activeCluster.equals(cluster); } /** diff --git a/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java index d232f64bd1..1d6394bb37 100644 --- a/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java +++ b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java @@ -7,18 +7,18 @@ */ @Experimental public class FailoverOptions { - private final boolean retryFailedInflightCommands; + private final boolean retryOnFailover; private FailoverOptions(Builder builder) { - this.retryFailedInflightCommands = builder.retryFailedInflightCommands; + this.retryOnFailover = builder.retryOnFailover; } /** - * Gets whether to retry failed in-flight commands during failover + * Gets whether to retry failed commands during failover * @return true if retry is enabled, false otherwise */ - public boolean isRetryFailedInflightCommands() { - return retryFailedInflightCommands; + public boolean isRetryOnFailover() { + return retryOnFailover; } /** @@ -33,18 +33,18 @@ public static Builder builder() { * Builder for FailoverOptions */ public static class Builder { - private boolean retryFailedInflightCommands = false; + private boolean retryOnFailover = false; private Builder() { } /** - * Sets whether to retry failed in-flight commands during failover + * Sets whether to retry failed commands during failover * @param retry true to retry, false otherwise * @return this builder for method chaining */ - public Builder retryFailedInflightCommands(boolean retry) { - this.retryFailedInflightCommands = retry; + public Builder retryOnFailover(boolean retry) { + this.retryOnFailover = retry; return this; } diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index b99a699169..442348c3ee 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ 
b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -307,9 +307,9 @@ public void testInflightCommandsAreRetriedAfterFailover() throws Exception { MultiClusterPooledConnectionProvider customProvider = createProvider(); - // Create a custom client with retryFailedInflightCommands enabled for this specific test + // Create a custom client with retryOnFailover enabled for this specific test try (UnifiedJedis customClient = createClient(customProvider, - builder -> builder.retryFailedInflightCommands(true))) { + builder -> builder.retryOnFailover(true))) { assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); @@ -317,19 +317,23 @@ public void testInflightCommandsAreRetriedAfterFailover() throws Exception { Future> blpop = executor.submit(() -> customClient.blpop(10000, "test-list-1")); // Simulate error by sending more than 100 bytes. This causes the connection close, and - // triggers - // failover + // CB -> OPEN, failover will be actually triggered by the next command redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); assertThrows(JedisConnectionException.class, () -> customClient.set("test-key", generateTestValue(150))); + // Actual failover is performed on first command received after CB is OPEN + // TODO : Remove second command. 
Once we refactor existing code to perform actual failover + // immediately when CB state changes to OPEN/FORCED_OPEN + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS2_ID)); // Check that the circuit breaker for Endpoint 1 is open assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), - equalTo(CircuitBreaker.State.OPEN)); + equalTo(CircuitBreaker.State.FORCED_OPEN)); - // Disable redisProxy1 to enforce the current blpop command failure + // Disable redisProxy1 to enforce connection drop for the in-flight (blpop) command redisProxy1.disable(); + // The in-flight command should be retried and succeed after failover customClient.rpush("test-list-1", "somevalue"); assertThat(blpop.get(), equalTo(Arrays.asList("test-list-1", "somevalue"))); } @@ -344,7 +348,7 @@ public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception { MultiClusterPooledConnectionProvider customProvider = createProvider(); try (UnifiedJedis customClient = createClient(customProvider, - builder -> builder.retryFailedInflightCommands(false))) { + builder -> builder.retryOnFailover(false))) { assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); Future> blpop = executor.submit(() -> customClient.blpop(500, "test-list-2")); From 2ae7186d2e17788327cab0f0d32caf28c6880a64 Mon Sep 17 00:00:00 2001 From: ggivo Date: Mon, 16 Jun 2025 07:10:53 +0300 Subject: [PATCH 6/6] remove unused method --- .../clients/jedis/mcf/CircuitBreakerCommandExecutor.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index a43e9d4cca..519ea6372e 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -65,10 +65,6 @@ && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) { } } - private 
boolean isCircuitBreakerOpen(CircuitBreaker circuitBreaker) { - return circuitBreaker.getState() == CircuitBreaker.State.OPEN; - } - private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb) { return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e); }