diff --git a/vertx-pg-client/src/main/generated/io/vertx/pgclient/PgConnectOptionsConverter.java b/vertx-pg-client/src/main/generated/io/vertx/pgclient/PgConnectOptionsConverter.java index 2f7b5ffbb..6a1bf27d0 100644 --- a/vertx-pg-client/src/main/generated/io/vertx/pgclient/PgConnectOptionsConverter.java +++ b/vertx-pg-client/src/main/generated/io/vertx/pgclient/PgConnectOptionsConverter.java @@ -25,6 +25,11 @@ public static void fromJson(Iterable> json, obj.setPipeliningLimit(((Number)member.getValue()).intValue()); } break; + case "shouldQueryServerType": + if (member.getValue() instanceof Boolean) { + obj.setShouldQueryServerType((Boolean)member.getValue()); + } + break; case "sslMode": if (member.getValue() instanceof String) { obj.setSslMode(io.vertx.pgclient.SslMode.valueOf((String)member.getValue())); @@ -42,6 +47,7 @@ public static void toJson(PgConnectOptions obj, JsonObject json) { public static void toJson(PgConnectOptions obj, java.util.Map json) { json.put("pipeliningLimit", obj.getPipeliningLimit()); + json.put("shouldQueryServerType", obj.getShouldQueryServerType()); if (obj.getSslMode() != null) { json.put("sslMode", obj.getSslMode().name()); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java index 649704824..99fa350f7 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java @@ -111,6 +111,7 @@ public static PgConnectOptions fromEnv() { public static final String DEFAULT_PASSWORD = "pass"; public static final int DEFAULT_PIPELINING_LIMIT = 256; public static final SslMode DEFAULT_SSLMODE = SslMode.DISABLE; + public static final boolean DEFAULT_SHOULD_QUERY_SERVER_TYPE = false; public static final Map DEFAULT_PROPERTIES; static { @@ -123,6 +124,7 @@ public static PgConnectOptions fromEnv() { } private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT; + private boolean shouldQueryServerType = DEFAULT_SHOULD_QUERY_SERVER_TYPE; private SslMode sslMode = DEFAULT_SSLMODE; public PgConnectOptions() { @@ -224,6 +226,13 @@ public SslMode getSslMode() { return sslMode; } + /** + * @return the value of current shouldQueryServerType + */ + public boolean getShouldQueryServerType() { + return shouldQueryServerType; + } + /** * Set {@link SslMode} for the client, this option can be used to provide different levels of secure protection. * @@ -235,6 +244,19 @@ public PgConnectOptions setSslMode(SslMode sslmode) { return this; } + /** + * Set whether the client should query server type, + * If true, connection should issue implementation specific query + * to read {@link io.vertx.sqlclient.ServerType} of host being connected to + * + * @param shouldQueryServerType the value of shouldQueryServerType + * @return a reference to this, so the API can be used fluently + */ + public PgConnectOptions setShouldQueryServerType(boolean shouldQueryServerType) { + this.shouldQueryServerType = shouldQueryServerType; + return this; + } + @Override public PgConnectOptions setSendBufferSize(int sendBufferSize) { return (PgConnectOptions)super.setSendBufferSize(sendBufferSize); @@ -464,6 +486,7 @@ protected void init() { this.setUser(DEFAULT_USER); this.setPassword(DEFAULT_PASSWORD); this.setDatabase(DEFAULT_DATABASE); + this.setShouldQueryServerType(DEFAULT_SHOULD_QUERY_SERVER_TYPE); this.setProperties(new HashMap<>(DEFAULT_PROPERTIES)); } @@ -493,6 +516,7 @@ public boolean equals(Object o) { if (pipeliningLimit != that.pipeliningLimit) return false; if (sslMode != that.sslMode) return false; + if (shouldQueryServerType != that.shouldQueryServerType) return false; return true; } @@ -502,6 +526,7 @@ public int hashCode() { int result = super.hashCode(); result = 31 * result + pipeliningLimit; result = 31 * result + sslMode.hashCode(); + result = 31 * result + (shouldQueryServerType ? 1 : 0); return result; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index 1b684fa3c..ddbbd4a78 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -19,6 +19,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.pgclient.impl.PgConnectionImpl; +import io.vertx.sqlclient.ServerType; import io.vertx.sqlclient.PreparedStatement; import io.vertx.sqlclient.SqlConnection; import io.vertx.codegen.annotations.Fluent; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java index 17fa28afb..2fef0ec05 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java @@ -32,12 +32,18 @@ import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.SslMode; +import io.vertx.sqlclient.ServerType; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.ConnectionFactoryBase; import io.vertx.sqlclient.impl.tracing.QueryTracer; +import java.util.Collections; +import java.util.stream.Stream; + +import static io.vertx.sqlclient.ServerType.*; + /** * @author Julien Viet */ @@ -45,6 +51,8 @@ public class PgConnectionFactory extends ConnectionFactoryBase { private SslMode sslMode; private int pipeliningLimit; + private boolean shouldQueryServerType; + private ServerType serverType = UNDEFINED; public PgConnectionFactory(VertxInternal context, PgConnectOptions options) { super(context, options); @@ -55,6 +63,7 @@ protected void initializeConfiguration(SqlConnectOptions connectOptions) { PgConnectOptions options = (PgConnectOptions) connectOptions; this.pipeliningLimit = options.getPipeliningLimit(); this.sslMode = options.isUsingDomainSocket() ? SslMode.DISABLE : options.getSslMode(); + this.shouldQueryServerType = options.getShouldQueryServerType(); // check ssl mode here switch (sslMode) { @@ -151,16 +160,35 @@ public Future connect(Context context) { ContextInternal contextInternal = (ContextInternal) context; PromiseInternal promise = contextInternal.promise(); connect(asEventLoopContext(contextInternal)) - .map(conn -> { + .flatMap(conn -> { QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options); PgConnectionImpl pgConn = new PgConnectionImpl(this, contextInternal, conn, tracer, null); conn.init(pgConn); - return (SqlConnection)pgConn; + serverType = ((PgSocketConnection) conn).serverType; + if (serverType == UNDEFINED && shouldQueryServerType) { + String paramStatus = conn.getDatabaseMetaData().majorVersion() >= 14 ? "in_hot_standby" : "transaction_read_only"; + return pgConn.query(String.format("SHOW %s;", paramStatus)).execute().map(pgRowSet -> { + pgRowSet.forEach(row -> + serverType = "off".equalsIgnoreCase(row.getString(paramStatus)) ? PRIMARY : REPLICA + ); + return pgConn; + }); + } else { + return Future.succeededFuture((SqlConnection) pgConn); + } }) .onComplete(promise); return promise.future(); } + /** + * {@inheritDoc} + */ + @Override + public ServerType getServerType() { + return serverType; + } + private PgSocketConnection newSocketConnection(EventLoopContext context, NetSocketInternal socket) { return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 5d9e1b6c2..1e6b2de5c 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -21,6 +21,7 @@ import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgNotification; +import io.vertx.sqlclient.ServerType; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; import io.vertx.sqlclient.impl.SqlConnectionImpl; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java index 4db91c730..dff572ae0 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java @@ -28,6 +28,7 @@ import io.vertx.pgclient.*; import io.vertx.pgclient.spi.PgDriver; import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.ServerRequirement; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.impl.Connection; @@ -71,8 +72,12 @@ public static PgPoolImpl create(final VertxInternal vertx, boolean pipelined, Li int pipeliningLimit = pipelined ? baseConnectOptions.getPipeliningLimit() : 1; PgPoolImpl pool = new PgPoolImpl(vx, baseConnectOptions, tracer, metrics, pipeliningLimit, poolOptions); PgDriver driver = new PgDriver(); - List lst = servers.stream().map(options -> driver.createConnectionFactory(vx, options)).collect(Collectors.toList()); - ConnectionFactory factory = ConnectionFactory.roundRobinSelector(lst); + List lst = servers.stream() + .map(options -> driver.createConnectionFactory(vx, PgConnectOptions.wrap(options) + .setShouldQueryServerType(poolOptions.getServerRequirement() != ServerRequirement.ANY)) + ) + .collect(Collectors.toList()); + ConnectionFactory factory = ConnectionFactory.selector(lst, poolOptions.getServerRequirement()); pool.connectionProvider(factory::connect); pool.init(); CloseFuture closeFuture = pool.closeFuture(); diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java index fbbd6d2e1..634ece21d 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java @@ -28,6 +28,7 @@ import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.pgclient.PgException; import io.vertx.pgclient.impl.codec.PgCodec; +import io.vertx.sqlclient.ServerType; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.sqlclient.impl.*; import io.vertx.sqlclient.impl.command.*; @@ -36,6 +37,8 @@ import java.util.Map; import java.util.function.Predicate; +import static io.vertx.sqlclient.ServerType.UNDEFINED; + /** * @author Julien Viet */ @@ -45,6 +48,8 @@ public class PgSocketConnection extends SocketConnectionBase { public int processId; public int secretKey; public PgDatabaseMetadata dbMetaData; + // TODO: consider defining getter on SocketConnectionBase + public ServerType serverType = UNDEFINED; public PgSocketConnection(NetSocketInternal socket, boolean cachePreparedStatements, diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java index 522d9b38d..ff9b803ac 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java @@ -27,6 +27,9 @@ import io.vertx.sqlclient.impl.command.CommandResponse; import io.vertx.sqlclient.impl.command.InitCommand; +import static io.vertx.sqlclient.ServerType.PRIMARY; +import static io.vertx.sqlclient.ServerType.REPLICA; + class InitCommandCodec extends PgCommandCodec { private PgEncoder encoder; @@ -87,6 +90,9 @@ public void handleParameterStatus(String key, String value) { if(key.equals("server_version")) { ((PgSocketConnection)cmd.connection()).dbMetaData = new PgDatabaseMetadata(value); } + if(key.equals("in_hot_standby")) { + ((PgSocketConnection)cmd.connection()).serverType = "off".equalsIgnoreCase(value) ? PRIMARY : REPLICA; + } } @Override diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolWithRequirementTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolWithRequirementTest.java new file mode 100644 index 000000000..5021121aa --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolWithRequirementTest.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.pgclient; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.pgclient.junit.ContainerPgRule; +import io.vertx.sqlclient.PoolOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.HashSet; +import java.util.Set; + +import static io.vertx.sqlclient.ServerRequirement.*; + +@RunWith(VertxUnitRunner.class) +public class PgPoolWithRequirementTest { + + @ClassRule + public static ContainerPgRule pg14Rule = new ContainerPgRule().setPostgresVersion("14"); + @ClassRule + public static ContainerPgRule pg13Rule = new ContainerPgRule().setPostgresVersion("13"); + + protected PgConnectOptions pg14Options; + protected PgConnectOptions pg13Options; + + @Before + public void setup() throws Exception { + pg14Options = pg14Rule.options(); + pg13Options = pg13Rule.options(); + } + + private Set pools = new HashSet<>(); + + @After + public void tearDown(TestContext ctx) { + int size = pools.size(); + if (size > 0) { + Async async = ctx.async(size); + Set pools = this.pools; + this.pools = new HashSet<>(); + pools.forEach(pool -> { + pool.close(ar -> { + async.countDown(); + }); + }); + async.awaitSuccess(20_000); + } + } + + protected PgPool createPool(PgConnectOptions connectOptions, PoolOptions poolOptions) { + PgPool pool = PgPool.pool(Vertx.vertx(), connectOptions, poolOptions); + pools.add(pool); + return pool; + } + + @Test + public void testPrimaryRequirementForSinglePrimaryFactory14(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg14Options), new PoolOptions().setServerRequirement(PRIMARY)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } + + @Test + public void testPrimaryRequirementForSinglePrimaryFactory13(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg13Options), new PoolOptions().setServerRequirement(PRIMARY)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } + + @Test + public void testExplicitAnyRequirementForSinglePrimaryFactory14(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg14Options), new PoolOptions().setServerRequirement(ANY)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } + + @Test + public void testExplicitAnyRequirementForSinglePrimaryFactory13(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg13Options), new PoolOptions().setServerRequirement(ANY)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } + + @Test + public void testReplicaRequirementForSinglePrimaryFactory14(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg14Options), new PoolOptions().setServerRequirement(REPLICA)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertFailure(v -> { + ctx.assertEquals("No suitable server of type REPLICA was found", v.getMessage()); + async.complete(); + })); + async.await(4000); + } + + @Test + public void testReplicaRequirementForSinglePrimaryFactory13(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg13Options), new PoolOptions().setServerRequirement(REPLICA)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertFailure(v -> { + ctx.assertEquals("No suitable server of type REPLICA was found", v.getMessage()); + async.complete(); + })); + async.await(4000); + } + + @Test + public void testExplicitPreferReplicaRequirementForSinglePrimaryFactory14(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg14Options), new PoolOptions().setServerRequirement(PREFER_REPLICA)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } + + @Test + public void testExplicitPreferReplicaRequirementForSinglePrimaryFactory13(TestContext ctx) { + Async async = ctx.async(); + PgPool pool = createPool(new PgConnectOptions(pg13Options), new PoolOptions().setServerRequirement(PREFER_REPLICA)); + pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> { + async.complete(); + })); + async.await(4000); + } +} diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/junit/ContainerPgRule.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/junit/ContainerPgRule.java index e1e97210d..f5993624f 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/junit/ContainerPgRule.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/junit/ContainerPgRule.java @@ -55,6 +55,11 @@ public ContainerPgRule ssl(boolean ssl) { return this; } + public ContainerPgRule setPostgresVersion(String databaseVersion) { + this.databaseVersion = databaseVersion; + return this; + } + public PgConnectOptions options() { return new PgConnectOptions(options); } @@ -166,7 +171,9 @@ protected void before() throws Throwable { return; } - this.databaseVersion = getPostgresVersion(); + if (this.databaseVersion == null) { + this.databaseVersion = getPostgresVersion(); + } options = startServer(databaseVersion); } diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index 44272cbd3..f355482a2 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -55,6 +55,11 @@ public static void fromJson(Iterable> json, obj.setPoolCleanerPeriod(((Number)member.getValue()).intValue()); } break; + case "serverRequirement": + if (member.getValue() instanceof String) { + obj.setServerRequirement(io.vertx.sqlclient.ServerRequirement.valueOf((String)member.getValue())); + } + break; } } } @@ -75,5 +80,8 @@ public static void toJson(PoolOptions obj, java.util.Map json) { json.put("maxSize", obj.getMaxSize()); json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize()); json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod()); + if (obj.getServerRequirement() != null) { + json.put("serverRequirement", obj.getServerRequirement().name()); + } } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index ece6b3bfc..3ade5c31c 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -69,6 +69,11 @@ public class PoolOptions { */ public static final TimeUnit DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS; + /** + * Default server requirement in the pool + */ + public static final ServerRequirement DEFAULT_SERVER_REQUIREMENT = ServerRequirement.ANY; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -76,6 +81,7 @@ public class PoolOptions { private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD; private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT; + private ServerRequirement serverRequirement = DEFAULT_SERVER_REQUIREMENT; public PoolOptions() { } @@ -89,6 +95,7 @@ public PoolOptions(PoolOptions other) { maxWaitQueueSize = other.maxWaitQueueSize; idleTimeout = other.idleTimeout; idleTimeoutUnit = other.idleTimeoutUnit; + serverRequirement = other.serverRequirement; } /** @@ -215,7 +222,7 @@ public int getConnectionTimeout() { * Set the amount of time a client will wait for a connection from the pool. If the time is exceeded * without a connection available, an exception is provided. * - * @param timeout the pool connection idle time unitq + * @param timeout the pool connection idle time unit * @return a reference to this, so the API can be used fluently */ public PoolOptions setConnectionTimeout(int timeout) { @@ -223,6 +230,24 @@ public PoolOptions setConnectionTimeout(int timeout) { return this; } + /** + * @return the pool connection server requirement. See {@link #setServerRequirement(ServerRequirement)} + */ + public ServerRequirement getServerRequirement() { + return serverRequirement; + } + + /** + * Set the server requirement + * + * @param serverRequirement the pool connection server requirement + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setServerRequirement(ServerRequirement serverRequirement) { + this.serverRequirement = serverRequirement; + return this; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); PoolOptionsConverter.toJson(this, json); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerRequirement.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerRequirement.java new file mode 100644 index 000000000..c1cc784c8 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerRequirement.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.sqlclient; + +import io.vertx.core.Vertx; +import io.vertx.sqlclient.spi.Driver; + +import java.util.List; + +/** + * This option determines whether the session must have certain properties to be acceptable. + * It's typically used in combination with {@link Driver#createPool(Vertx, List, PoolOptions)} overload + * to select the first acceptable alternative among several hosts. + */ +public enum ServerRequirement { + /** + * Any successful connection is acceptable (default) + */ + ANY, + /** + * Server must not be in hot standby mode, usually but not necessary such server allows read-write connections + */ + PRIMARY, + /** + * Server must be in hot standby mode, only read-only connections are allowed + */ + REPLICA, + /** + * First try to find a standby server, but if none of the listed hosts is a standby server, try again in any mode + */ + PREFER_REPLICA; +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerType.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerType.java new file mode 100644 index 000000000..5bfb3efd8 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/ServerType.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.sqlclient; + +/** + * Indicates a particular property of a session + */ +public enum ServerType { + /** + * No certain properties are known about server yet (default) + */ + UNDEFINED, + /** + * Server is in hot standby mode, usually but not necessary such server allows read-write connections + */ + PRIMARY, + /** + * Server is in hot standby mode, only read-only connections are allowed + */ + REPLICA +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 1492f9d29..074930cc6 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -116,7 +116,7 @@ public void connect(EventLoopContext context, PoolConnector.Listener listener, H Future future = connectionProvider.apply(context); future.onComplete(ar -> { if (ar.succeeded()) { - SqlConnectionImpl res = (SqlConnectionImpl) ar.result(); + SqlConnectionImpl res = (SqlConnectionImpl) ar.result(); Connection conn = res.unwrap(); if (conn.isValid()) { PooledConnection pooled = new PooledConnection(res.factory(), conn, listener); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java index de84b0279..6bc0ace24 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java @@ -1,20 +1,31 @@ package io.vertx.sqlclient.spi; -import io.vertx.core.Closeable; -import io.vertx.core.CompositeFuture; -import io.vertx.core.Context; -import io.vertx.core.Future; -import io.vertx.core.Promise; +import io.vertx.core.*; +import io.vertx.sqlclient.ServerRequirement; +import io.vertx.sqlclient.ServerType; import io.vertx.sqlclient.SqlConnection; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * A connection factory, can be obtained from {@link Driver#createConnectionFactory} */ public interface ConnectionFactory extends Closeable { + default void close(Promise promise, List factories) { + List list = new ArrayList<>(factories.size()); + for (ConnectionFactory factory : factories) { + Promise p = Promise.promise(); + factory.close(p); + list.add(p.future()); + } + CompositeFuture.all(list) + .mapEmpty() + .onComplete(promise); + } + /** * @return a connection factory that connects with a round-robin policy */ @@ -32,20 +43,149 @@ public Future connect(Context context) { } @Override public void close(Promise promise) { - List list = new ArrayList<>(factories.size()); - for (ConnectionFactory factory : factories) { - Promise p = Promise.promise(); - factory.close(p); - list.add(p.future()); - } - CompositeFuture.all(list) - .mapEmpty() - .onComplete(promise); + close(promise, factories); } }; } } + /** + * @return a connection factory that connects with respect to server requirement + */ + static ConnectionFactory selector(List factories, ServerRequirement serverRequirement) { + switch (serverRequirement) { + case ANY: + return roundRobinSelector(factories); + case PRIMARY: + return constrainedSelector(factories, ServerType.PRIMARY); + case REPLICA: + return constrainedSelector(factories, ServerType.REPLICA); + case PREFER_REPLICA: + return prioritySelector(factories, ServerType.REPLICA); + default: + throw new IllegalStateException("Unknown server requirement: " + serverRequirement); + } + } + + /** + * {@link ServerType} of a host to which particular factory opens connections is usually not known + * upon factory construction. First opportunity to reveal such server property is available when + * first {@link io.vertx.sqlclient.impl.SocketConnectionBase} is made. For instance Postgres 14 and newer + * has dedicated `GUC_REPORT` mechanism that reports a property `in_hot_standby` when connection being established. + * If there is no server-sent status mechanism available implementation specific `SHOW xxx` command could be used. + * + * If connection was established and its server type appeared to be mot compatible with requested one + * then the connection is closed right away and next factory is probed. If all factories are probed and no + * connection with desired server type found, IllegalStateException is thrown. + * + * @param factories list of factories to filter based on server type requirement + * @param serverType requirement for the host being connected through a factory + * @return a connection factory that load balances connections + * to a subset of hosts which satisfying requested server type + * @throws IllegalStateException when no suitable connection could be established + */ + static ConnectionFactory constrainedSelector(List factories, ServerType serverType) { + return new ConnectionFactory() { + private int idx = 0; + + @Override + public Future connect(Context context) { + int oldIdx = idx; + idx = (idx + 1) % factories.size(); + return connectingRound(context, oldIdx, factories.size()); + } + + private Future connectingRound(Context context, int roundIdx, int attemptsLeft) { + if (attemptsLeft == 0) { + throw new IllegalStateException(String.format("No suitable server of type %s was found", serverType)); + } + ConnectionFactory f = factories.get(roundIdx); + if (f.getServerType() == serverType) { + return f.connect(context); + } else if (f.getServerType() == ServerType.UNDEFINED) { + return f.connect(context).flatMap(conn -> { + if (f.getServerType() == serverType) { + return Future.succeededFuture(conn); + } else { + return conn.close().flatMap(__ -> + connectingRound(context, (roundIdx + 1) % factories.size(), attemptsLeft - 1) + ); + } + }); + } else { + return connectingRound(context, (roundIdx + 1) % factories.size(), attemptsLeft - 1); + } + } + + @Override + public void close(Promise promise) { + close(promise, factories); + } + }; + } + + /** + * {@link ServerType} of a host to which particular factory opens connections is usually not known + * upon factory construction. First opportunity to reveal such server property is available when + * first {@link io.vertx.sqlclient.impl.SocketConnectionBase} is made. For instance Postgres 14 and newer + * has dedicated `GUC_REPORT` mechanism that reports a property `in_hot_standby` when connection being established. + * If there is no server-sent status mechanism available implementation specific `SHOW xxx` command could be used. + * + * Upon first connection to host its server type is examinated and corresponding factory is marked either + * prioritized one or fallback. Connection factory is returned no matter of underlying host type. + * After all factories were probed, subsequent requests are optimistically fulfilled by the subset of prioritized + * factories. If prioritized factory could not establish the connection, fallback is made to a "fallback" factory. + * If a "fallback" factory fails as well, next pair of factories is chosen in round-robin manner. + * + * @param factories list of factories to filter based on server type requirement + * @param serverType preferred property of the host being connected through a factory + * @return a connection factory that load balances connections + * to a subset of hosts with bias towards requested server type + */ + static ConnectionFactory prioritySelector(List factories, ServerType serverType) { + return new ConnectionFactory() { + private int idx = 0; + private CopyOnWriteArrayList prioritized = new CopyOnWriteArrayList<>(); + private CopyOnWriteArrayList fallback = new CopyOnWriteArrayList<>(); + + @Override + public Future connect(Context context) { + if (prioritized.size() + fallback.size() != factories.size()) { + return connectAndPrioritize(context); + } else { + return connectByPriority(context); + } + } + + private Future connectAndPrioritize(Context context) { + ConnectionFactory f = factories.get(idx); + idx = (idx + 1) % factories.size(); + if (f.getServerType() == serverType) { + prioritized.addIfAbsent(f); + return f.connect(context).recover(__ -> connect(context)); + } else if (f.getServerType() == ServerType.UNDEFINED) { + // this factory will be added to prioritized/fallback during some later invocation of this method + return f.connect(context).recover(__ -> connect(context)); + } else { + fallback.addIfAbsent(f); + return f.connect(context).recover(__ -> connect(context)); + } + } + + private Future connectByPriority(Context context) { + ConnectionFactory pf = prioritized.get(idx % prioritized.size()); + ConnectionFactory ff = fallback.get(idx % fallback.size()); + idx = (idx + 1) % factories.size(); + return pf.connect(context).recover(__ -> ff.connect(context)).recover(__ -> connect(context)); + } + + @Override + public void close(Promise promise) { + close(promise, factories); + } + }; + } + /** * Create a connection using the given {@code context}. * @@ -54,4 +194,18 @@ public void close(Promise promise) { */ Future connect(Context context); + + /** + * Server type could be updated asynchronously: for instance, in Postgres case + * server type can be reported as ParamStatus message in response to explicit 'SHOW xxx' command + * as well as in response to Simple/Extended query if dedicated GUC_REPORT is supported. + * Given that, there could be situations when concurrent write to particular factory's serverType is happening + * when this method is being called or just after this method has returned. + * Users should be aware that serverType nature is dynamic. + * + * @return server type of host this factory connected to. + */ + default ServerType getServerType() { + return ServerType.UNDEFINED; + } } diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/spi/ConnectionFactoryTest.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/spi/ConnectionFactoryTest.java new file mode 100644 index 000000000..5328ac16b --- /dev/null +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/spi/ConnectionFactoryTest.java @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.sqlclient.spi; + +import io.vertx.core.*; +import io.vertx.sqlclient.*; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import static io.vertx.sqlclient.ServerType.*; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class ConnectionFactoryTest { + Context context = Vertx.vertx().getOrCreateContext(); + + static class DummySqlConnection implements SqlConnection { + + @Override + public Query> query(String sql) { + return null; + } + + @Override + public PreparedQuery> preparedQuery(String sql) { + return null; + } + + @Override + public PreparedQuery> preparedQuery(String sql, PrepareOptions options) { + return null; + } + + @Override + public Future close() { + return Future.succeededFuture(); + } + + @Override + public SqlConnection prepare(String sql, Handler> handler) { + return null; + } + + @Override + public Future prepare(String sql) { + return null; + } + + @Override + public SqlConnection prepare(String sql, PrepareOptions options, Handler> handler) { + return null; + } + + @Override + public Future prepare(String sql, PrepareOptions options) { + return null; + } + + @Override + public SqlConnection exceptionHandler(Handler handler) { + return null; + } + + @Override + public SqlConnection closeHandler(Handler handler) { + return null; + } + + @Override + public void begin(Handler> handler) { + + } + + @Override + public Future begin() { + return null; + } + + @Override + public boolean isSSL() { + return false; + } + + @Override + public void close(Handler> handler) { + + } + + @Override + public DatabaseMetadata databaseMetadata() { + return null; + } + } + + static class StaticFactory implements ConnectionFactory { + + public StaticFactory(ServerType serverType, SqlConnection connection) { + this(serverType, null, connection, false); + } + + public StaticFactory(ServerType serverType, SqlConnection connection, boolean failing) { + this(serverType, null, connection, failing); + } + + public StaticFactory(ServerType initialServerType, ServerType promotedServerType, SqlConnection connection) { + this(initialServerType, promotedServerType, connection, false); + } + + public StaticFactory(ServerType initialServerType, ServerType promotedServerType, SqlConnection connection, boolean failing) { + this.serverType = initialServerType; + this.promotedServerType = promotedServerType; + this.connection = connection; + this.failing = failing; + } + + private ServerType serverType; + private ServerType promotedServerType; + private final SqlConnection connection; + private final boolean failing; + + @Override + public ServerType getServerType() { + return serverType; + } + + @Override + public Future connect(Context context) { + if (promotedServerType != null) { + serverType = promotedServerType; + } + if (failing) { + return Future.failedFuture("unable to connect"); + } else { + return Future.succeededFuture(connection); + } + } + + @Override + public void close(Promise promise) {} + } + + @Test + public void testRoundRobinSelector() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + SqlConnection conn3 = new DummySqlConnection(); + ConnectionFactory factory = ConnectionFactory.roundRobinSelector( + Stream.of(conn1, conn2, conn3).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toList()) + ); + assertEquals(conn1, factory.connect(context).result()); + assertEquals(conn2, factory.connect(context).result()); + assertEquals(conn3, factory.connect(context).result()); + assertEquals(conn1, factory.connect(context).result()); + } + + @Test + public void testAnySelector() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + SqlConnection conn3 = new DummySqlConnection(); + ConnectionFactory factory = ConnectionFactory.selector( + Stream.of(conn1, conn2, conn3).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toList()), + ServerRequirement.ANY + ); + assertEquals(conn1, factory.connect(context).result()); + assertEquals(conn2, factory.connect(context).result()); + assertEquals(conn3, factory.connect(context).result()); + assertEquals(conn1, factory.connect(context).result()); + } + + @Test + public void testPrimarySelectorAmongUndefinedFactories() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1, conn2).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toCollection(() -> factories)); + SqlConnection primaryConn = new DummySqlConnection(); + factories.add(new StaticFactory(PRIMARY, primaryConn)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PRIMARY); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + } + + @Test + public void testPrimarySelectorAmongReplicaFactories() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1, conn2).map(conn -> new StaticFactory(REPLICA, conn)).collect(toCollection(() -> factories)); + SqlConnection primaryConn = new DummySqlConnection(); + factories.add(new StaticFactory(PRIMARY, primaryConn)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PRIMARY); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + } + + @Test + public void testPrimarySelectorAmongReplicaAndUndefinedFactories() { + SqlConnection conn1Replica = new DummySqlConnection(); + SqlConnection conn2Replica = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1Replica, conn2Replica).map(conn -> new StaticFactory(REPLICA, conn)).collect(toCollection(() -> factories)); + SqlConnection conn1Undefined = new DummySqlConnection(); + SqlConnection conn2Undefined = new DummySqlConnection(); + Stream.of(conn1Undefined, conn2Undefined).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toCollection(() -> factories)); + SqlConnection primaryConn = new DummySqlConnection(); + factories.add(new StaticFactory(PRIMARY, primaryConn)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PRIMARY); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + } + + @Test + public void testPrimarySelectorAmongUndefinedFactoriesWithOnePromotedToPrimary() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1, conn2).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toCollection(() -> factories)); + SqlConnection primaryConn = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, PRIMARY, primaryConn)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PRIMARY); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + } + + @Test + public void testReplicaSelectorAmongUndefinedFactoriesWithOnePromotedToReplica() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1, conn2).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toCollection(() -> factories)); + SqlConnection replicaConn = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, REPLICA, replicaConn)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.REPLICA); + assertEquals(replicaConn, factory.connect(context).result()); + assertEquals(replicaConn, factory.connect(context).result()); + assertEquals(replicaConn, factory.connect(context).result()); + assertEquals(replicaConn, factory.connect(context).result()); + } + + @Test + public void testReplicaSelectorAmongUndefinedFactories() { + List factories = new ArrayList<>(); + SqlConnection conn1Undefined = new DummySqlConnection(); + SqlConnection conn2Undefined = new DummySqlConnection(); + Stream.of(conn1Undefined, conn2Undefined).map(conn -> new StaticFactory(UNDEFINED, conn)).collect(toCollection(() -> factories)); + SqlConnection replicaConn1 = new DummySqlConnection(); + SqlConnection replicaConn2 = new DummySqlConnection(); + factories.add(new StaticFactory(REPLICA, replicaConn1)); + factories.add(new StaticFactory(REPLICA, replicaConn2)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.REPLICA); + // This is a pathological case for round-robin selector with filtering + // Due to multiple fallbacks first server with target type will be chosen more often + // Selector algorithm should be optimized. + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + } + + @Test + public void testPreferReplicaSelectorAmongUndefinedFactories() { + SqlConnection conn1 = new DummySqlConnection(); + SqlConnection conn2 = new DummySqlConnection(); + List factories = new ArrayList<>(); + Stream.of(conn1, conn2).map(conn -> new StaticFactory(UNDEFINED, PRIMARY, conn)).collect(toCollection(() -> factories)); + SqlConnection replicaConn1 = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, REPLICA, replicaConn1)); + SqlConnection replicaConn2 = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, REPLICA, replicaConn2)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PREFER_REPLICA); + // first full round of probing of unknown hosts + assertEquals(conn1, factory.connect(context).result()); + assertEquals(conn2, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + // then round of prioritizing + assertEquals(conn1, factory.connect(context).result()); + assertEquals(conn2, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + // then always return prioritized connections + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + assertEquals(replicaConn1, factory.connect(context).result()); + assertEquals(replicaConn2, factory.connect(context).result()); + } + + @Test + public void testPreferReplicaSelectorAmongPrimaryAndUndefinedFactoriesWhenReplicaFails() { + List factories = new ArrayList<>(); + SqlConnection replicaConn1 = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, REPLICA, replicaConn1, true)); + SqlConnection replicaConn2 = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, REPLICA, replicaConn2, true)); + SqlConnection primaryConn = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, PRIMARY, primaryConn, false)); + SqlConnection primaryConnFailing = new DummySqlConnection(); + factories.add(new StaticFactory(UNDEFINED, PRIMARY, primaryConnFailing, true)); + ConnectionFactory factory = ConnectionFactory.selector(factories, ServerRequirement.PREFER_REPLICA); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + assertEquals(primaryConn, factory.connect(context).result()); + } +}