diff --git a/.github/workflows/test-on-docker.yml b/.github/workflows/test-on-docker.yml index de4dc71d06..da45bd0504 100644 --- a/.github/workflows/test-on-docker.yml +++ b/.github/workflows/test-on-docker.yml @@ -1,6 +1,6 @@ --- -name: Build and Test using containerized environment +name: Build and Test using a containerized environment on: push: @@ -11,10 +11,12 @@ on: branches: - master - '[0-9].*' + - 'feature/**' pull_request: branches: - master - '[0-9].*' + - 'feature/**' schedule: - cron: '0 1 * * *' # nightly build workflow_dispatch: diff --git a/pom.xml b/pom.xml index b5f4083e82..5dc119ef0d 100644 --- a/pom.xml +++ b/pom.xml @@ -334,6 +334,8 @@ ${project.basedir}/hbase-formatter.xml ${project.basedir}/src/main/java/redis/clients/jedis/annots + ${project.basedir}/src/main/java/redis/clients/jedis/mcf + ${project.basedir}/src/test/java/redis/clients/jedis/failover diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 5d54b67d17..16b8fd7efb 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -13,51 +13,57 @@ /** * @author Allen Terleto (aterleto) - *

- * CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database endpoint. - * With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s) - * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs - *

+ *

+ * CommandExecutor with built-in retry, circuit-breaker, and failover to another + * cluster/database endpoint. With this executor users can seamlessly failover to Disaster + * Recovery (DR), Backup, and Active-Active cluster(s) by using simple configuration which + * is passed through from Resilience4j - https://resilience4j.readme.io/docs + *

*/ @Experimental -public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor { +public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase + implements CommandExecutor { - public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) { - super(provider); - } + public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) { + super(provider); + } - @Override - public T executeCommand(CommandObject commandObject) { - Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety + @Override + public T executeCommand(CommandObject commandObject) { + Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety - DecorateSupplier supplier = Decorators.ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster)); + DecorateSupplier supplier = Decorators + .ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster)); - supplier.withRetry(cluster.getRetry()); - supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(provider.getFallbackExceptionList(), - e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); + supplier.withRetry(cluster.getRetry()); + supplier.withCircuitBreaker(cluster.getCircuitBreaker()); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); - return supplier.decorate().get(); - } + return supplier.decorate().get(); + } - /** - * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios - */ - private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) { - try (Connection connection = cluster.getConnection()) { - return connection.executeCommand(commandObject); - } + /** + * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios + */ + private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) { + try (Connection connection = cluster.getConnection()) { + return connection.executeCommand(commandObject); } + } - /** - * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios - */ - private T handleClusterFailover(CommandObject commandObject, CircuitBreaker circuitBreaker) { + /** + * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker + * failure scenarios + */ + private T handleClusterFailover(CommandObject commandObject, + CircuitBreaker circuitBreaker) { - clusterFailover(circuitBreaker); + clusterFailover(circuitBreaker); - // Recursive call to the initiating method so the operation can be retried on the next cluster connection - return executeCommand(commandObject); - } + // Recursive call to the initiating method so the operation can be retried on the next cluster + // connection + return executeCommand(commandObject); + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index 91ad29d9a0..50e8e827be 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -10,59 +10,68 @@ /** * @author Allen Terleto (aterleto) - *

- * Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database - * endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active - * cluster(s) by using simple configuration which is passed through from - * Resilience4j - https://resilience4j.readme.io/docs - *

+ *

+ * Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to + * another cluster/database endpoint. With this executor users can seamlessly failover to + * Disaster Recovery (DR), Backup, and Active-Active cluster(s) by using simple + * configuration which is passed through from Resilience4j - + * https://resilience4j.readme.io/docs + *

*/ @Experimental public class CircuitBreakerFailoverBase implements AutoCloseable { - private final Lock lock = new ReentrantLock(true); + private final Lock lock = new ReentrantLock(true); - protected final MultiClusterPooledConnectionProvider provider; + protected final MultiClusterPooledConnectionProvider provider; - public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) { - this.provider = provider; - } + public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) { + this.provider = provider; + } - @Override - public void close() { - IOUtils.closeQuietly(this.provider); - } + @Override + public void close() { + IOUtils.closeQuietly(this.provider); + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker + * failure scenarios + */ + protected void clusterFailover(CircuitBreaker circuitBreaker) { + lock.lock(); - /** - * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios - */ - protected void clusterFailover(CircuitBreaker circuitBreaker) { - lock.lock(); - - try { - // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent - if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { + try { + // Check state to handle race conditions since incrementActiveMultiClusterIndex() is + // non-idempotent + if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { - // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. - // To recover/transition from this forced state the user will need to manually failback - circuitBreaker.transitionToForcedOpenState(); + // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and + // event publishing. + // To recover/transition from this forced state the user will need to manually failback + circuitBreaker.transitionToForcedOpenState(); - // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() - // to use the next cluster's connection pool - according to the configuration's prioritization/order - int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); + // Incrementing the activeMultiClusterIndex will allow subsequent calls to the + // executeCommand() + // to use the next cluster's connection pool - according to the configuration's + // prioritization/order + int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); - // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging - provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); - } + // Implementation is optionally provided during configuration. Typically, used for + // activeMultiClusterIndex persistence or custom logging + provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); + } - // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail - else if (provider.isLastClusterCircuitBreakerForcedOpen()) { - throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + - "provided with an additional cluster/database endpoint according to its prioritized sequence. " + - "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); - } - } finally { - lock.unlock(); - } + // Once the priority list is exhausted only a manual failback can open the circuit breaker so + // all subsequent operations will fail + else if (provider.isLastClusterCircuitBreakerForcedOpen()) { + throw new JedisConnectionException( + "Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + + "provided with an additional cluster/database endpoint according to its prioritized sequence. " + + "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + } + } finally { + lock.unlock(); } + } } diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java index cd0e91015a..91e2ee772d 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java @@ -10,48 +10,52 @@ import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster; /** - * ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database endpoint. - * With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s) - * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs + * ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database + * endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and + * Active-Active cluster(s) by using simple configuration which is passed through from Resilience4j + * - https://resilience4j.readme.io/docs */ @Experimental public class CircuitBreakerFailoverConnectionProvider extends CircuitBreakerFailoverBase { - public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) { - super(provider); - } - - public Connection getConnection() { - Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety - - DecorateSupplier supplier = Decorators.ofSupplier(() -> this.handleGetConnection(cluster)); - - supplier.withRetry(cluster.getRetry()); - supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(provider.getFallbackExceptionList(), - e -> this.handleClusterFailover(cluster.getCircuitBreaker())); - - return supplier.decorate().get(); - } - - /** - * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios - */ - private Connection handleGetConnection(Cluster cluster) { - Connection connection = cluster.getConnection(); - connection.ping(); - return connection; - } - - /** - * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios - */ - private Connection handleClusterFailover(CircuitBreaker circuitBreaker) { - - clusterFailover(circuitBreaker); - - // Recursive call to the initiating method so the operation can be retried on the next cluster connection - return getConnection(); - } + public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) { + super(provider); + } + + public Connection getConnection() { + Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety + + DecorateSupplier supplier = Decorators + .ofSupplier(() -> this.handleGetConnection(cluster)); + + supplier.withRetry(cluster.getRetry()); + supplier.withCircuitBreaker(cluster.getCircuitBreaker()); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(cluster.getCircuitBreaker())); + + return supplier.decorate().get(); + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios + */ + private Connection handleGetConnection(Cluster cluster) { + Connection connection = cluster.getConnection(); + connection.ping(); + return connection; + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker + * failure scenarios + */ + private Connection handleClusterFailover(CircuitBreaker circuitBreaker) { + + clusterFailover(circuitBreaker); + + // Recursive call to the initiating method so the operation can be retried on the next cluster + // connection + return getConnection(); + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java index 2f21700b8f..25791f67b6 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java @@ -12,7 +12,8 @@ /** * This is high memory dependent solution as all the appending commands will be hold in memory until - * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called. + * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets + * called. */ @Experimental public class MultiClusterPipeline extends PipelineBase implements Closeable { @@ -32,7 +33,8 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) } } - public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) { + public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, + CommandObjects commandObjects) { super(commandObjects); this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); } @@ -52,8 +54,9 @@ public void close() { } /** - * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to get return values - * from pipelined commands, capture the different Response<?> of the commands you execute. + * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to + * get return values from pipelined commands, capture the different Response<?> of the + * commands you execute. */ @Override public void sync() { diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java index f4ca7a627f..51f8c49c89 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java @@ -24,7 +24,7 @@ public class MultiClusterTransaction extends TransactionBase { private static final Builder NO_OP_BUILDER = BuilderFactory.RAW_OBJECT; - + private static final String GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE = "Graph commands are not supported."; private final CircuitBreakerFailoverConnectionProvider failoverProvider; @@ -47,7 +47,6 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) { /** * A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should * be {@code doMulti=false}. - * * @param provider * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI */ @@ -66,12 +65,12 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo /** * A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should * be {@code doMulti=false}. - * * @param provider * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI * @param commandObjects command objects */ - public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) { + public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, + CommandObjects commandObjects) { super(commandObjects); this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider); @@ -169,7 +168,7 @@ public final List exec() { } List formatted = new ArrayList<>(unformatted.size() - extraCommandCount.get()); - for (Object rawReply: unformatted) { + for (Object rawReply : unformatted) { try { Response response = commands.poll().getValue(); response.set(rawReply); diff --git a/src/main/java/redis/clients/jedis/mcf/package-info.java b/src/main/java/redis/clients/jedis/mcf/package-info.java index 60b1f9c123..c79cb52383 100644 --- a/src/main/java/redis/clients/jedis/mcf/package-info.java +++ b/src/main/java/redis/clients/jedis/mcf/package-info.java @@ -1,5 +1,6 @@ /** - * This package contains the classes that are related to Active-Active cluster(s) and Multi-Cluster failover. + * This package contains the classes that are related to Active-Active cluster(s) and Multi-Cluster + * failover. */ @Experimental package redis.clients.jedis.mcf; diff --git a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java index fe833d6097..e24ab0f764 100644 --- a/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/failover/FailoverIntegrationTest.java @@ -139,9 +139,9 @@ public void setup() throws IOException { MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig.Builder( getClusterConfigs(clientConfig, endpoint1, endpoint2)).retryMaxAttempts(1) - .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) - .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) - .circuitBreakerSlidingWindowMinCalls(1).build(); + .retryWaitDuration(1).circuitBreakerSlidingWindowType(COUNT_BASED) + .circuitBreakerSlidingWindowSize(1).circuitBreakerFailureRateThreshold(100) + .circuitBreakerSlidingWindowMinCalls(1).build(); provider = new MultiClusterPooledConnectionProvider(failoverConfig); failoverClient = new UnifiedJedis(provider); @@ -171,14 +171,14 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception redisProxy1.disable(); // Endpoint 1 not available - // 1. First call should should throw JedisConnectionException and trigger failover - // 1.1. Endpoint1 CB transitions to open + // 1. First call should should throw JedisConnectionException and trigger failover + // 1.1. Endpoint1 CB transitions to open // 2. Subsequent calls should be routed to Endpoint2 assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); // Check that the circuit breaker for Endpoint 1 is open assertThat(provider.getCluster(1).getCircuitBreaker().getState(), - equalTo(CircuitBreaker.State.OPEN)); + equalTo(CircuitBreaker.State.OPEN)); // Check that the failoverClient is now using Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); @@ -189,7 +189,7 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception // Endpoint1 and Endpoint2 are not available, assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); assertThat(provider.getCluster(2).getCircuitBreaker().getState(), - equalTo(CircuitBreaker.State.OPEN)); + equalTo(CircuitBreaker.State.OPEN)); // and since no other nodes are available, it should throw an exception for subsequent calls assertThrows(JedisConnectionException.class, () -> failoverClient.info("server")); @@ -232,10 +232,10 @@ public void testManualFailoverInflightCommandsCompleteGracefully() // new command should be executed against the new endpoint assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID)); - //Since failover was manually triggered and there were no errors + // Since failover was manually triggered and there were no errors // previous endpoint CB should be still in CLOSED assertThat(provider.getCluster(1).getCircuitBreaker().getState(), - equalTo(CircuitBreaker.State.CLOSED)); + equalTo(CircuitBreaker.State.CLOSED)); jedis1.rpush("test-list", "somevalue"); @@ -266,7 +266,7 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws // Check that the circuit breaker for Endpoint 1 is open after the error assertThat(provider.getCluster(1).getCircuitBreaker().getState(), - equalTo(CircuitBreaker.State.OPEN)); + equalTo(CircuitBreaker.State.OPEN)); // Ensure that active cluster is still Endpoint 2 assertThat(getNodeId(failoverClient.info("server")), equalTo(JEDIS2_ID));