Skip to content

[automatic-failover] Enforce formatting for automatic failover codebase #4176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/test-on-docker.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---

name: Build and Test using containerized environment
name: Build and Test using a containerized environment

on:
push:
Expand All @@ -11,10 +11,12 @@ on:
branches:
- master
- '[0-9].*'
- 'feature/**'
pull_request:
branches:
- master
- '[0-9].*'
- 'feature/**'
schedule:
- cron: '0 1 * * *' # nightly build
workflow_dispatch:
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@
<configFile>${project.basedir}/hbase-formatter.xml</configFile>
<directories>
<directory>${project.basedir}/src/main/java/redis/clients/jedis/annots</directory>
<directory>${project.basedir}/src/main/java/redis/clients/jedis/mcf</directory>
<directory>${project.basedir}/src/test/java/redis/clients/jedis/failover</directory>
</directories>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,57 @@

/**
* @author Allen Terleto (aterleto)
* <p>
* CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database endpoint.
* With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
* <p>
* CommandExecutor with built-in retry, circuit-breaker, and failover to another
* cluster/database endpoint. With this executor users can seamlessly failover to Disaster
* Recovery (DR), Backup, and Active-Active cluster(s) by using simple configuration which
* is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
@Experimental
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase
implements CommandExecutor {

public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
super(provider);
}
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
super(provider);
}

@Override
public <T> T executeCommand(CommandObject<T> commandObject) {
Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety
@Override
public <T> T executeCommand(CommandObject<T> commandObject) {
Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety

DecorateSupplier<T> supplier = Decorators.ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster));
DecorateSupplier<T> supplier = Decorators
.ofSupplier(() -> this.handleExecuteCommand(commandObject, cluster));

supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(provider.getFallbackExceptionList(),
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(provider.getFallbackExceptionList(),
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));

return supplier.decorate().get();
}
return supplier.decorate().get();
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster cluster) {
try (Connection connection = cluster.getConnection()) {
return connection.executeCommand(commandObject);
}
/**
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster cluster) {
try (Connection connection = cluster.getConnection()) {
return connection.executeCommand(commandObject);
}
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
private <T> T handleClusterFailover(CommandObject<T> commandObject, CircuitBreaker circuitBreaker) {
/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
* failure scenarios
*/
private <T> T handleClusterFailover(CommandObject<T> commandObject,
CircuitBreaker circuitBreaker) {

clusterFailover(circuitBreaker);
clusterFailover(circuitBreaker);

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return executeCommand(commandObject);
}
// Recursive call to the initiating method so the operation can be retried on the next cluster
// connection
return executeCommand(commandObject);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,59 +10,68 @@

/**
* @author Allen Terleto (aterleto)
* <p>
* Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database
* endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active
* cluster(s) by using simple configuration which is passed through from
* Resilience4j - https://resilience4j.readme.io/docs
* <p>
* <p>
* Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to
* another cluster/database endpoint. With this executor users can seamlessly failover to
* Disaster Recovery (DR), Backup, and Active-Active cluster(s) by using simple
* configuration which is passed through from Resilience4j -
* https://resilience4j.readme.io/docs
* <p>
*/
@Experimental
public class CircuitBreakerFailoverBase implements AutoCloseable {
private final Lock lock = new ReentrantLock(true);
private final Lock lock = new ReentrantLock(true);

protected final MultiClusterPooledConnectionProvider provider;
protected final MultiClusterPooledConnectionProvider provider;

public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
}
public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
}

@Override
public void close() {
IOUtils.closeQuietly(this.provider);
}
@Override
public void close() {
IOUtils.closeQuietly(this.provider);
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
* failure scenarios
*/
protected void clusterFailover(CircuitBreaker circuitBreaker) {
lock.lock();

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
protected void clusterFailover(CircuitBreaker circuitBreaker) {
lock.lock();

try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is
// non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();
// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and
// event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
// Incrementing the activeMultiClusterIndex will allow subsequent calls to the
// executeCommand()
// to use the next cluster's connection pool - according to the configuration's
// prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}
// Implementation is optionally provided during configuration. Typically, used for
// activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
} finally {
lock.unlock();
}
// Once the priority list is exhausted only a manual failback can open the circuit breaker so
// all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException(
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
} finally {
lock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,52 @@
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;

/**
* ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database endpoint.
* With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database
* endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and
* Active-Active cluster(s) by using simple configuration which is passed through from Resilience4j
* - https://resilience4j.readme.io/docs
*/
@Experimental
public class CircuitBreakerFailoverConnectionProvider extends CircuitBreakerFailoverBase {

public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) {
super(provider);
}

public Connection getConnection() {
Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety

DecorateSupplier<Connection> supplier = Decorators.ofSupplier(() -> this.handleGetConnection(cluster));

supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(provider.getFallbackExceptionList(),
e -> this.handleClusterFailover(cluster.getCircuitBreaker()));

return supplier.decorate().get();
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private Connection handleGetConnection(Cluster cluster) {
Connection connection = cluster.getConnection();
connection.ping();
return connection;
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {

clusterFailover(circuitBreaker);

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return getConnection();
}
public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) {
super(provider);
}

public Connection getConnection() {
Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety

DecorateSupplier<Connection> supplier = Decorators
.ofSupplier(() -> this.handleGetConnection(cluster));

supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(provider.getFallbackExceptionList(),
e -> this.handleClusterFailover(cluster.getCircuitBreaker()));

return supplier.decorate().get();
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private Connection handleGetConnection(Cluster cluster) {
Connection connection = cluster.getConnection();
connection.ping();
return connection;
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
* failure scenarios
*/
private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {

clusterFailover(circuitBreaker);

// Recursive call to the initiating method so the operation can be retried on the next cluster
// connection
return getConnection();
}

}
11 changes: 7 additions & 4 deletions src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

/**
* This is high memory dependent solution as all the appending commands will be hold in memory until
* {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called.
* {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets
* called.
*/
@Experimental
public class MultiClusterPipeline extends PipelineBase implements Closeable {
Expand All @@ -32,7 +33,8 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider)
}
}

public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) {
public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider,
CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);
}
Expand All @@ -52,8 +54,9 @@ public void close() {
}

/**
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to get return values
* from pipelined commands, capture the different Response&lt;?&gt; of the commands you execute.
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
* get return values from pipelined commands, capture the different Response&lt;?&gt; of the
* commands you execute.
*/
@Override
public void sync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class MultiClusterTransaction extends TransactionBase {

private static final Builder<?> NO_OP_BUILDER = BuilderFactory.RAW_OBJECT;

private static final String GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE = "Graph commands are not supported.";

private final CircuitBreakerFailoverConnectionProvider failoverProvider;
Expand All @@ -47,7 +47,6 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
Expand All @@ -66,12 +65,12 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param commandObjects command objects
*/
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) {
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti,
CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);

Expand Down Expand Up @@ -169,7 +168,7 @@ public final List<Object> exec() {
}

List<Object> formatted = new ArrayList<>(unformatted.size() - extraCommandCount.get());
for (Object rawReply: unformatted) {
for (Object rawReply : unformatted) {
try {
Response<?> response = commands.poll().getValue();
response.set(rawReply);
Expand Down
Loading
Loading