diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java index 247ffb7ae8..517df6cde8 100644 --- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java +++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java @@ -11,7 +11,7 @@ import redis.clients.jedis.annots.Experimental; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; - +import redis.clients.jedis.mcf.ConnectionFailoverException; /** * @author Allen Terleto (aterleto) @@ -43,7 +43,7 @@ public final class MultiClusterClientConfig { private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage private static final List CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); - private static final List> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class); + private static final List> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class, ConnectionFailoverException.class); private final ClusterConfig[] clusterConfigs; diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index e3960862fa..21a126eaad 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -30,6 +30,7 @@ import redis.clients.jedis.json.Path2; import redis.clients.jedis.json.JsonObjectMapper; import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor; +import redis.clients.jedis.mcf.FailoverOptions; import redis.clients.jedis.mcf.MultiClusterPipeline; import redis.clients.jedis.mcf.MultiClusterTransaction; import redis.clients.jedis.params.*; @@ -237,7 +238,19 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo */ @Experimental public 
UnifiedJedis(MultiClusterPooledConnectionProvider provider) { - this(new CircuitBreakerCommandExecutor(provider), provider); + this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider); + } + + /** + * Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool. + *

+ * With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s) + * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs + *

+ */ + @Experimental + public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) { + this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider); } /** diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 56be63d93d..519ea6372e 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -24,8 +24,12 @@ public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor { - public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) { + private final FailoverOptions options; + + public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider, + FailoverOptions options) { super(provider); + this.options = options != null ? options : FailoverOptions.builder().build(); } @Override @@ -49,9 +53,31 @@ public T executeCommand(CommandObject commandObject) { private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) { try (Connection connection = cluster.getConnection()) { return connection.executeCommand(commandObject); + } catch (Exception e) { + + if (retryOnFailover() && !isActiveCluster(cluster) + && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) { + throw new ConnectionFailoverException( + "Command failed during failover: " + cluster.getCircuitBreaker().getName(), e); + } + + throw e; } } + private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb) { + return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e); + } + + private boolean retryOnFailover() { + return options.isRetryOnFailover(); + } + + private boolean isActiveCluster(Cluster cluster) { + Cluster activeCluster = provider.getCluster(); + return activeCluster != null && 
activeCluster.equals(cluster); + } + /** * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker * failure scenarios diff --git a/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java b/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java new file mode 100644 index 0000000000..986572cfae --- /dev/null +++ b/src/main/java/redis/clients/jedis/mcf/ConnectionFailoverException.java @@ -0,0 +1,9 @@ +package redis.clients.jedis.mcf; + +import redis.clients.jedis.exceptions.JedisException; + +public class ConnectionFailoverException extends JedisException { + public ConnectionFailoverException(String s, Exception e) { + super(s, e); + } +} diff --git a/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java new file mode 100644 index 0000000000..1d6394bb37 --- /dev/null +++ b/src/main/java/redis/clients/jedis/mcf/FailoverOptions.java @@ -0,0 +1,59 @@ +package redis.clients.jedis.mcf; + +import redis.clients.jedis.annots.Experimental; + +/** + * Configuration options for CircuitBreakerCommandExecutor + */ +@Experimental +public class FailoverOptions { + private final boolean retryOnFailover; + + private FailoverOptions(Builder builder) { + this.retryOnFailover = builder.retryOnFailover; + } + + /** + * Gets whether to retry failed commands during failover + * @return true if retry is enabled, false otherwise + */ + public boolean isRetryOnFailover() { + return retryOnFailover; + } + + /** + * Creates a new builder with default options + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for FailoverOptions + */ + public static class Builder { + private boolean retryOnFailover = false; + + private Builder() { + } + + /** + * Sets whether to retry failed commands during failover + * @param retry true to retry, false otherwise + * @return this builder for method chaining + */ + 
public Builder retryOnFailover(boolean retry) { + this.retryOnFailover = retry; + return this; + } + + /** + * Builds a new FailoverOptions instance with the configured options + * @return a new FailoverOptions instance + */ + public FailoverOptions build() { + return new FailoverOptions(this); + } + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index 9ac4ac0907..442348c3ee 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -3,6 +3,7 @@ import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import eu.rekawek.toxiproxy.model.Toxic; +import eu.rekawek.toxiproxy.model.ToxicDirection; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -18,6 +19,7 @@ import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.UnifiedJedis; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.mcf.FailoverOptions; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.scenario.RecommendedSettings; @@ -28,6 +30,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -54,7 +58,7 @@ public class FailoverIntegrationTest { private static UnifiedJedis jedis2; private static String JEDIS1_ID = ""; private static String JEDIS2_ID = ""; - MultiClusterPooledConnectionProvider provider; + private MultiClusterPooledConnectionProvider provider; private UnifiedJedis failoverClient; 
@BeforeAll @@ -81,40 +85,12 @@ public static void cleanupAdminClients() throws IOException { executor.shutdown(); } - private static String getNodeId(UnifiedJedis client) { - - return getNodeId(client.info("server")); - } - - private static String getNodeId(String info) { - - Matcher m = pattern.matcher(info); - if (m.find()) { - return m.group(1); - } - return null; - } - - /** - * Generates a string of a specific byte size. - * @param byteSize The desired size in bytes - * @return A string of the specified byte size - */ - private static String generateTestValue(int byteSize) { - StringBuilder value = new StringBuilder(byteSize); - for (int i = 0; i < byteSize; i++) { - value.append('x'); - } - return value.toString(); - } - @BeforeEach public void setup() throws IOException { tp.getProxies().forEach(proxy -> { try { proxy.enable(); for (Toxic toxic : proxy.toxics().getAll()) { - toxic.remove(); } } catch (IOException e) { @@ -133,18 +109,9 @@ public void setup() throws IOException { JEDIS1_ID = getNodeId(jedis1); JEDIS2_ID = getNodeId(jedis2); - JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() - .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) - .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); - - MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( - getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) - .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) - .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) - .circuitBreakerSlidingWindowMinCalls(1).build(); - - provider = new MultiClusterPooledConnectionProvider(failoverConfig); - failoverClient = new UnifiedJedis(provider); + // Create default provider and client for most tests + provider = createProvider(); + failoverClient = createClient(provider, null); } @AfterEach @@ -156,12 +123,16 @@ public void cleanup() throws IOException { /** * Tests the automatic failover 
behavior when a Redis server becomes unavailable. This test - * verifies: 1. Initial connection to the first Redis server works correctly 2. When the first - * server is disabled, the first command throws a JedisConnectionException 3. The circuit breaker - * for the first endpoint transitions to OPEN state 4. Subsequent commands are automatically - * routed to the second available endpoint 5. When the second server is also disabled, all - * commands fail with JedisConnectionException 6. The circuit breaker for the second endpoint also - * transitions to OPEN state + * verifies: + *

<ol>
+ * <li>Initial connection to the first Redis server works correctly</li>
+ * <li>Disable access, the first command throws</li>
+ * <li>Command failure is propagated to the caller</li>
+ * <li>CB transitions to OPEN, failover is initiated and following commands are sent to the next + * endpoint</li>
+ * <li>Second server is also disabled, all commands fail with JedisConnectionException and error + * is propagated to the caller</li>
+ * </ol>
*/ @Test public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception { @@ -171,12 +142,11 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception redisProxy1.disable(); // Endpoint 1 not available - // 1. First call should should throw JedisConnectionException and trigger failover - // 1.1. Endpoint1 CB transitions to open - // 2. Subsequent calls should be routed to Endpoint2 + // 1. First call should throw JedisConnectionException and trigger failover + // 2. Endpoint 1 CB transitions to OPEN + // 3. Subsequent calls should be routed to Endpoint 2 assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); - // Check that the circuit breaker for Endpoint 1 is open assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); @@ -186,12 +156,13 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception // Disable also second proxy redisProxy2.disable(); - // Endpoint1 and Endpoint2 are not available, + // Endpoint1 and Endpoint2 are NOT available, assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); assertThat(provider.getCluster(2).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); - // and since no other nodes are available, it should throw an exception for subsequent calls + // and since no other nodes are available, it should propagate the errors to the caller + // subsequent calls assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); } @@ -218,22 +189,17 @@ public void testManualFailoverInflightCommandsCompleteGracefully() throws ExecutionException, InterruptedException { assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS1_ID)); - Future> blpop = executor.submit(() -> { - try { - // This command will block until a value is pushed to the list or timeout occurs - // We will trigger failover while this command is blocking - return 
failoverClient.blpop(1000, "test-list"); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + + // We will trigger failover while this command is in-flight + Future> blpop = executor.submit(() -> failoverClient.blpop(1000, "test-list")); provider.setActiveMultiClusterIndex(2); - // new command should be executed against the new endpoint + // After the manual failover, commands should be executed against Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); - // Since failover was manually triggered and there were no errors - // previous endpoint CB should be still in CLOSED + + // Failover was manually triggered, and there were no errors + // previous endpoint CB should still be in CLOSED state assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.CLOSED)); @@ -243,8 +209,8 @@ public void testManualFailoverInflightCommandsCompleteGracefully() } /** - * Verify that in-flight commands during manual failover fail gracefully with an error will - * propagate the error to the caller and will toggle CB to OPEN state. + * Verify that in-flight commands that complete with error during manual failover will propagate + * the error to the caller and toggle CB to OPEN state. 
*/ @Test public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws Exception { @@ -255,9 +221,11 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws // trigger failover manually provider.setActiveMultiClusterIndex(2); Future infoCmd = executor.submit(() -> failoverClient.info("server")); - // new command should be executed against the new endpoint + + // After the manual failover, commands should be executed against Endpoint 2 assertThat(getNodeId(infoCmd.get()), equalTo(JEDIS2_ID)); - // Disable redisProxy1 to simulate an error + + // Disable redisProxy1 to drop active connections and trigger an error redisProxy1.disable(); // previously submitted command should fail with JedisConnectionException @@ -268,7 +236,7 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws assertThat(provider.getCluster(1).getCircuitBreaker().getState(), equalTo(CircuitBreaker.State.OPEN)); - // Ensure that active cluster is still Endpoint 2 + // Ensure that the active cluster is still Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); } @@ -328,6 +296,142 @@ public void testCircuitBreakerCountsEachConnectionErrorSeparately() throws IOExc assertThat(getNodeId(client.info("server")), equalTo(JEDIS2_ID)); } + + } + + /** + * Tests that in-flight commands are retried after automatic failover when retry is enabled. 
+ */ + @Test + public void testInflightCommandsAreRetriedAfterFailover() throws Exception { + + MultiClusterPooledConnectionProvider customProvider = createProvider(); + + // Create a custom client with retryOnFailover enabled for this specific test + try (UnifiedJedis customClient = createClient(customProvider, + builder -> builder.retryOnFailover(true))) { + + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); + + // We will trigger failover while this command is in-flight + Future> blpop = executor.submit(() -> customClient.blpop(10000, "test-list-1")); + + // Simulate error by sending more than 100 bytes. This causes the connection close, and + // CB -> OPEN, failover will be actually triggered by the next command + redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); + assertThrows(JedisConnectionException.class, + () -> customClient.set("test-key", generateTestValue(150))); + + // Actual failover is performed on first command received after CB is OPEN + // TODO : Remove second command. Once we Refactor existing code to perform actual failover + // immediately when CB state change to OPEN/FORCED_OPENs + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS2_ID)); + // Check that the circuit breaker for Endpoint 1 is open + assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), + equalTo(CircuitBreaker.State.FORCED_OPEN)); + + // Disable redisProxy1 to enforce connection drop for the in-flight (blpop) command + redisProxy1.disable(); + + // The in-flight command should be retried and succeed after failover + customClient.rpush("test-list-1", "somevalue"); + assertThat(blpop.get(), equalTo(Arrays.asList("test-list-1", "somevalue"))); + } + } + + /** + * Tests that in-flight commands are not retried after automatic failover when retry is disabled. 
+ */ + @Test + public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception { + // Create a custom provider and client with retry disabled for this specific test + MultiClusterPooledConnectionProvider customProvider = createProvider(); + + try (UnifiedJedis customClient = createClient(customProvider, + builder -> builder.retryOnFailover(false))) { + + assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID)); + Future> blpop = executor.submit(() -> customClient.blpop(500, "test-list-2")); + + // Simulate error by sending more than 100 bytes. This causes connection close, and triggers + // failover + redisProxy1.toxics().limitData("simulate-socket-failure", ToxicDirection.UPSTREAM, 100); + assertThrows(JedisConnectionException.class, + () -> customClient.set("test-key", generateTestValue(150))); + + // Check that the circuit breaker for Endpoint 1 is open + assertThat(customProvider.getCluster(1).getCircuitBreaker().getState(), + equalTo(CircuitBreaker.State.OPEN)); + + // Disable redisProxy1 to enforce the current blpop command failure + redisProxy1.disable(); + + // The in-flight command should fail since the retry is disabled + ExecutionException exception = assertThrows(ExecutionException.class, + () -> blpop.get(1, TimeUnit.SECONDS)); + assertThat(exception.getCause(), instanceOf(JedisConnectionException.class)); + } } + private static String getNodeId(UnifiedJedis client) { + + return getNodeId(client.info("server")); + } + + private static String getNodeId(String info) { + + Matcher m = pattern.matcher(info); + if (m.find()) { + return m.group(1); + } + return null; + } + + /** + * Generates a string of a specific byte size. 
+ * @param byteSize The desired size in bytes + * @return A string of the specified byte size + */ + private static String generateTestValue(int byteSize) { + StringBuilder value = new StringBuilder(byteSize); + for (int i = 0; i < byteSize; i++) { + value.append('x'); + } + return value.toString(); + } + + /** + * Creates a MultiClusterPooledConnectionProvider with standard configuration + * @return A configured provider + */ + private MultiClusterPooledConnectionProvider createProvider() { + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) + .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) + .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) + .circuitBreakerSlidingWindowMinCalls(1).build(); + + return new MultiClusterPooledConnectionProvider(failoverConfig); + } + + /** + * Creates a UnifiedJedis client with customizable failover options + * @param provider The connection provider to use + * @param optionsCustomizer A function that customizes the failover options (can be null for + * defaults) + * @return A configured failover client + */ + private UnifiedJedis createClient(MultiClusterPooledConnectionProvider provider, + Function optionsCustomizer) { + FailoverOptions.Builder builder = FailoverOptions.builder(); + if (optionsCustomizer != null) { + builder = optionsCustomizer.apply(builder); + } + + return new UnifiedJedis(provider, builder.build()); + } }