diff --git a/.github/workflows/test-on-docker.yml b/.github/workflows/test-on-docker.yml
index de4dc71d06..da45bd0504 100644
--- a/.github/workflows/test-on-docker.yml
+++ b/.github/workflows/test-on-docker.yml
@@ -1,6 +1,6 @@
---
-name: Build and Test using containerized environment
+name: Build and Test using a containerized environment
on:
push:
@@ -11,10 +11,12 @@ on:
branches:
- master
- '[0-9].*'
+ - 'feature/**'
pull_request:
branches:
- master
- '[0-9].*'
+ - 'feature/**'
schedule:
- cron: '0 1 * * *' # nightly build
workflow_dispatch:
diff --git a/pom.xml b/pom.xml
index b5f4083e82..5dc119ef0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -334,6 +334,8 @@
${project.basedir}/hbase-formatter.xml${project.basedir}/src/main/java/redis/clients/jedis/annots
+ ${project.basedir}/src/main/java/redis/clients/jedis/mcf
+ ${project.basedir}/src/test/java/redis/clients/jedis/failover
diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java
index 5d54b67d17..16b8fd7efb 100644
--- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java
+++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java
@@ -13,51 +13,57 @@
/**
* @author Allen Terleto (aterleto)
- *
- * CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database endpoint.
- * With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
- * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
- *
+ *
+ * CommandExecutor with built-in retry, circuit-breaker, and failover to another
+ * cluster/database endpoint. With this executor users can seamlessly failover to Disaster
+ * Recovery (DR), Backup, and Active-Active cluster(s) by using simple configuration which
+ * is passed through from Resilience4j - https://resilience4j.readme.io/docs
+ *
*/
@Experimental
-public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {
+public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase
+ implements CommandExecutor {
- public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
- super(provider);
- }
+ public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
+ super(provider);
+ }
- @Override
- public T executeCommand(CommandObject commandObject) {
- Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety
+ @Override
+ public T executeCommand(CommandObject commandObject) {
+ Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety
- DecorateSupplier supplier = Decorators.ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster));
+ DecorateSupplier supplier = Decorators
+ .ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster));
- supplier.withRetry(cluster.getRetry());
- supplier.withCircuitBreaker(cluster.getCircuitBreaker());
- supplier.withFallback(provider.getFallbackExceptionList(),
- e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
+ supplier.withRetry(cluster.getRetry());
+ supplier.withCircuitBreaker(cluster.getCircuitBreaker());
+ supplier.withFallback(provider.getFallbackExceptionList(),
+ e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
- return supplier.decorate().get();
- }
+ return supplier.decorate().get();
+ }
- /**
- * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
- */
- private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) {
- try (Connection connection = cluster.getConnection()) {
- return connection.executeCommand(commandObject);
- }
+ /**
+ * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
+ */
+ private T handleExecuteCommand(CommandObject commandObject, Cluster cluster) {
+ try (Connection connection = cluster.getConnection()) {
+ return connection.executeCommand(commandObject);
}
+ }
- /**
- * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
- */
- private T handleClusterFailover(CommandObject commandObject, CircuitBreaker circuitBreaker) {
+ /**
+ * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
+ * failure scenarios
+ */
+ private T handleClusterFailover(CommandObject commandObject,
+ CircuitBreaker circuitBreaker) {
- clusterFailover(circuitBreaker);
+ clusterFailover(circuitBreaker);
- // Recursive call to the initiating method so the operation can be retried on the next cluster connection
- return executeCommand(commandObject);
- }
+ // Recursive call to the initiating method so the operation can be retried on the next cluster
+ // connection
+ return executeCommand(commandObject);
+ }
}
\ No newline at end of file
diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java
index 91ad29d9a0..50e8e827be 100644
--- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java
+++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java
@@ -10,59 +10,68 @@
/**
* @author Allen Terleto (aterleto)
- *
- * Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database
- * endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active
- * cluster(s) by using simple configuration which is passed through from
- * Resilience4j - https://resilience4j.readme.io/docs
- *
+ *
+ * Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to
+ * another cluster/database endpoint. With this executor users can seamlessly failover to
+ * Disaster Recovery (DR), Backup, and Active-Active cluster(s) by using simple
+ * configuration which is passed through from Resilience4j -
+ * https://resilience4j.readme.io/docs
+ *
*/
@Experimental
public class CircuitBreakerFailoverBase implements AutoCloseable {
- private final Lock lock = new ReentrantLock(true);
+ private final Lock lock = new ReentrantLock(true);
- protected final MultiClusterPooledConnectionProvider provider;
+ protected final MultiClusterPooledConnectionProvider provider;
- public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) {
- this.provider = provider;
- }
+ public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) {
+ this.provider = provider;
+ }
- @Override
- public void close() {
- IOUtils.closeQuietly(this.provider);
- }
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(this.provider);
+ }
+
+ /**
+ * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
+ * failure scenarios
+ */
+ protected void clusterFailover(CircuitBreaker circuitBreaker) {
+ lock.lock();
- /**
- * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
- */
- protected void clusterFailover(CircuitBreaker circuitBreaker) {
- lock.lock();
-
- try {
- // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
- if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
+ try {
+ // Check state to handle race conditions since incrementActiveMultiClusterIndex() is
+ // non-idempotent
+ if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
- // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
- // To recover/transition from this forced state the user will need to manually failback
- circuitBreaker.transitionToForcedOpenState();
+ // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and
+ // event publishing.
+ // To recover/transition from this forced state the user will need to manually failback
+ circuitBreaker.transitionToForcedOpenState();
- // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
- // to use the next cluster's connection pool - according to the configuration's prioritization/order
- int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
+ // Incrementing the activeMultiClusterIndex will allow subsequent calls to the
+ // executeCommand()
+ // to use the next cluster's connection pool - according to the configuration's
+ // prioritization/order
+ int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
- // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
- provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
- }
+ // Implementation is optionally provided during configuration. Typically, used for
+ // activeMultiClusterIndex persistence or custom logging
+ provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
+ }
- // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
- else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
- throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
- "provided with an additional cluster/database endpoint according to its prioritized sequence. " +
- "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
- }
- } finally {
- lock.unlock();
- }
+ // Once the priority list is exhausted only a manual failback can open the circuit breaker so
+ // all subsequent operations will fail
+ else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
+ throw new JedisConnectionException(
+ "Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
+ + "provided with an additional cluster/database endpoint according to its prioritized sequence. "
+ + "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
+ }
+ } finally {
+ lock.unlock();
}
+ }
}
diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java
index cd0e91015a..91e2ee772d 100644
--- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java
+++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java
@@ -10,48 +10,52 @@
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
/**
- * ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database endpoint.
- * With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
- * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
+ * ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database
+ * endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and
+ * Active-Active cluster(s) by using simple configuration which is passed through from Resilience4j
+ * - https://resilience4j.readme.io/docs
*/
@Experimental
public class CircuitBreakerFailoverConnectionProvider extends CircuitBreakerFailoverBase {
- public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) {
- super(provider);
- }
-
- public Connection getConnection() {
- Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety
-
- DecorateSupplier supplier = Decorators.ofSupplier(() -> this.handleGetConnection(cluster));
-
- supplier.withRetry(cluster.getRetry());
- supplier.withCircuitBreaker(cluster.getCircuitBreaker());
- supplier.withFallback(provider.getFallbackExceptionList(),
- e -> this.handleClusterFailover(cluster.getCircuitBreaker()));
-
- return supplier.decorate().get();
- }
-
- /**
- * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
- */
- private Connection handleGetConnection(Cluster cluster) {
- Connection connection = cluster.getConnection();
- connection.ping();
- return connection;
- }
-
- /**
- * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
- */
- private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {
-
- clusterFailover(circuitBreaker);
-
- // Recursive call to the initiating method so the operation can be retried on the next cluster connection
- return getConnection();
- }
+ public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) {
+ super(provider);
+ }
+
+ public Connection getConnection() {
+ Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety
+
+ DecorateSupplier supplier = Decorators
+ .ofSupplier(() -> this.handleGetConnection(cluster));
+
+ supplier.withRetry(cluster.getRetry());
+ supplier.withCircuitBreaker(cluster.getCircuitBreaker());
+ supplier.withFallback(provider.getFallbackExceptionList(),
+ e -> this.handleClusterFailover(cluster.getCircuitBreaker()));
+
+ return supplier.decorate().get();
+ }
+
+ /**
+ * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
+ */
+ private Connection handleGetConnection(Cluster cluster) {
+ Connection connection = cluster.getConnection();
+ connection.ping();
+ return connection;
+ }
+
+ /**
+ * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
+ * failure scenarios
+ */
+ private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {
+
+ clusterFailover(circuitBreaker);
+
+ // Recursive call to the initiating method so the operation can be retried on the next cluster
+ // connection
+ return getConnection();
+ }
}
\ No newline at end of file
diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java
index 2f21700b8f..25791f67b6 100644
--- a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java
+++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java
@@ -12,7 +12,8 @@
/**
* This is high memory dependent solution as all the appending commands will be hold in memory until
- * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called.
+ * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets
+ * called.
*/
@Experimental
public class MultiClusterPipeline extends PipelineBase implements Closeable {
@@ -32,7 +33,8 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider)
}
}
- public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) {
+ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider,
+ CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);
}
@@ -52,8 +54,9 @@ public void close() {
}
/**
- * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to get return values
- * from pipelined commands, capture the different Response<?> of the commands you execute.
+ * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
+ * get return values from pipelined commands, capture the different Response<?> of the
+ * commands you execute.
*/
@Override
public void sync() {
diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java
index f4ca7a627f..51f8c49c89 100644
--- a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java
+++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java
@@ -24,7 +24,7 @@
public class MultiClusterTransaction extends TransactionBase {
private static final Builder> NO_OP_BUILDER = BuilderFactory.RAW_OBJECT;
-
+
private static final String GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE = "Graph commands are not supported.";
private final CircuitBreakerFailoverConnectionProvider failoverProvider;
@@ -47,7 +47,6 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
- *
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
@@ -66,12 +65,12 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
- *
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param commandObjects command objects
*/
- public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) {
+ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti,
+ CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);
@@ -169,7 +168,7 @@ public final List