Skip to content

Commit 8446541

Browse files
committed
Refactor the connection factory design to be decoupled from the connect options.
1 parent 603e323 commit 8446541

File tree

26 files changed

+559
-220
lines changed

26 files changed

+559
-220
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.impl.ContextInternal;
2222
import io.vertx.core.impl.EventLoopContext;
2323
import io.vertx.core.impl.VertxInternal;
24+
import io.vertx.core.net.NetClient;
2425
import io.vertx.core.net.NetClientOptions;
2526
import io.vertx.core.net.SocketAddress;
2627
import io.vertx.core.net.impl.NetSocketInternal;
@@ -31,18 +32,19 @@
3132
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
3233
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3334

34-
public class DB2ConnectionFactory extends ConnectionFactoryBase {
35+
import java.util.Map;
36+
import java.util.function.Predicate;
37+
import java.util.function.Supplier;
3538

36-
private int pipeliningLimit;
39+
public class DB2ConnectionFactory extends ConnectionFactoryBase {
3740

38-
public DB2ConnectionFactory(VertxInternal vertx, DB2ConnectOptions options) {
41+
public DB2ConnectionFactory(VertxInternal vertx, Supplier<DB2ConnectOptions> options) {
3942
super(vertx, options);
4043
}
4144

4245
@Override
4346
protected void initializeConfiguration(SqlConnectOptions connectOptions) {
4447
DB2ConnectOptions options = (DB2ConnectOptions) connectOptions;
45-
this.pipeliningLimit = options.getPipeliningLimit();
4648
}
4749

4850
@Override
@@ -51,7 +53,17 @@ protected void configureNetClientOptions(NetClientOptions netClientOptions) {
5153
}
5254

5355
@Override
54-
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
56+
protected Future<Connection> doConnectInternal(SqlConnectOptions options, EventLoopContext context) {
57+
SocketAddress server = options.getSocketAddress();
58+
boolean cachePreparedStatements = options.getCachePreparedStatements();
59+
int preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize();
60+
Predicate<String> preparedStatementCacheSqlFilter = options.getPreparedStatementCacheSqlFilter();
61+
String username = options.getUser();
62+
String password = options.getPassword();
63+
String database = options.getDatabase();
64+
Map<String, String> properties = options.getProperties();
65+
int pipeliningLimit = ((DB2ConnectOptions) options).getPipeliningLimit();
66+
NetClient netClient = netClient(options);
5567
return netClient.connect(server).flatMap(so -> {
5668
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, cachePreparedStatements,
5769
preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
@@ -61,11 +73,11 @@ protected Future<Connection> doConnectInternal(SocketAddress server, String user
6173
}
6274

6375
@Override
64-
public Future<SqlConnection> connect(Context context) {
76+
public Future<SqlConnection> connect(Context context, SqlConnectOptions options) {
6577
ContextInternal contextInternal = (ContextInternal) context;
6678
QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options);
6779
Promise<SqlConnection> promise = contextInternal.promise();
68-
connect(asEventLoopContext(contextInternal))
80+
connect(asEventLoopContext(contextInternal), options)
6981
.map(conn -> {
7082
DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(contextInternal, this, conn, tracer, null);
7183
conn.init(db2Connection);

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static Future<DB2Connection> connect(Vertx vertx, DB2ConnectOptions optio
3636
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
3737
DB2ConnectionFactory client;
3838
try {
39-
client = new DB2ConnectionFactory(ctx.owner(), options);
39+
client = new DB2ConnectionFactory(ctx.owner(), () -> options);
4040
} catch (Exception e) {
4141
return ctx.failedFuture(e);
4242
}

vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.vertx.db2client.DB2ConnectOptions;
2626
import io.vertx.db2client.DB2Pool;
2727
import io.vertx.db2client.impl.*;
28+
import io.vertx.sqlclient.Pool;
2829
import io.vertx.sqlclient.PoolOptions;
2930
import io.vertx.sqlclient.SqlConnectOptions;
3031
import io.vertx.sqlclient.impl.Connection;
@@ -35,6 +36,7 @@
3536
import io.vertx.sqlclient.spi.Driver;
3637

3738
import java.util.List;
39+
import java.util.function.Supplier;
3840
import java.util.stream.Collectors;
3941

4042
public class DB2Driver implements Driver {
@@ -43,6 +45,33 @@ public class DB2Driver implements Driver {
4345

4446
public static final DB2Driver INSTANCE = new DB2Driver();
4547

48+
@Override
49+
public Pool newPool(Vertx vertx, Supplier<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
50+
VertxInternal vx = (VertxInternal) vertx;
51+
PoolImpl pool;
52+
if (options.isShared()) {
53+
pool = vx.createSharedClient(SHARED_CLIENT_KEY, options.getName(), closeFuture, cf -> newPoolImpl(vx, databases, options, cf));
54+
} else {
55+
pool = newPoolImpl(vx, databases, options, closeFuture);
56+
}
57+
return new DB2PoolImpl(vx, closeFuture, pool);
58+
}
59+
60+
private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
61+
DB2ConnectOptions baseConnectOptions = DB2ConnectOptions.wrap(databases.get());
62+
QueryTracer tracer = vertx.tracer() == null ? null : new QueryTracer(vertx.tracer(), baseConnectOptions);
63+
VertxMetrics vertxMetrics = vertx.metricsSPI();
64+
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(baseConnectOptions.getSocketAddress(), "sql", baseConnectOptions.getMetricsName()) : null;
65+
boolean pipelinedPool = options instanceof Db2PoolOptions && ((Db2PoolOptions) options).isPipelined();
66+
int pipeliningLimit = pipelinedPool ? baseConnectOptions.getPipeliningLimit() : 1;
67+
PoolImpl pool = new PoolImpl(vertx, this, tracer, metrics, pipeliningLimit, options, null, null, closeFuture);
68+
ConnectionFactory factory = createConnectionFactory(vertx, databases);
69+
pool.connectionProvider(factory::connect);
70+
pool.init();
71+
closeFuture.add(factory);
72+
return pool;
73+
}
74+
4675
@Override
4776
public DB2Pool newPool(Vertx vertx, List<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
4877
VertxInternal vx = (VertxInternal) vertx;
@@ -84,7 +113,12 @@ public boolean acceptsOptions(SqlConnectOptions options) {
84113

85114
@Override
86115
public ConnectionFactory createConnectionFactory(Vertx vertx, SqlConnectOptions database) {
87-
return new DB2ConnectionFactory((VertxInternal) vertx, DB2ConnectOptions.wrap(database));
116+
return new DB2ConnectionFactory((VertxInternal) vertx, () -> DB2ConnectOptions.wrap(database));
117+
}
118+
119+
@Override
120+
public ConnectionFactory createConnectionFactory(Vertx vertx, Supplier<? extends SqlConnectOptions> database) {
121+
return new DB2ConnectionFactory((VertxInternal) vertx, () -> DB2ConnectOptions.wrap(database.get()));
88122
}
89123

90124
@Override

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.vertx.core.impl.ContextInternal;
1818
import io.vertx.core.impl.EventLoopContext;
1919
import io.vertx.core.impl.VertxInternal;
20+
import io.vertx.core.net.NetClient;
2021
import io.vertx.core.net.NetClientOptions;
2122
import io.vertx.core.net.NetSocket;
2223
import io.vertx.core.net.SocketAddress;
@@ -28,17 +29,15 @@
2829
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
2930
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3031

32+
import java.util.Map;
33+
import java.util.function.Supplier;
34+
3135
import static io.vertx.mssqlclient.impl.codec.EncryptionLevel.*;
3236

3337
public class MSSQLConnectionFactory extends ConnectionFactoryBase {
3438

35-
private final int desiredPacketSize;
36-
private final boolean clientConfigSsl;
37-
38-
public MSSQLConnectionFactory(VertxInternal vertx, MSSQLConnectOptions options) {
39+
public MSSQLConnectionFactory(VertxInternal vertx, Supplier<MSSQLConnectOptions> options) {
3940
super(vertx, options);
40-
desiredPacketSize = options.getPacketSize();
41-
clientConfigSsl = options.isSsl();
4241
}
4342

4443
@Override
@@ -48,23 +47,27 @@ protected void initializeConfiguration(SqlConnectOptions options) {
4847

4948
@Override
5049
protected void configureNetClientOptions(NetClientOptions netClientOptions) {
51-
// Always start unencrypted, the connection will be upgraded if client and server agree
5250
netClientOptions.setSsl(false);
5351
}
5452

5553
@Override
56-
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
57-
return connectOrRedirect(server, username, password, database, context, 0);
54+
protected Future<Connection> doConnectInternal(SqlConnectOptions options, EventLoopContext context) {
55+
return connectOrRedirect((MSSQLConnectOptions) options, context, 0);
5856
}
5957

60-
private Future<Connection> connectOrRedirect(SocketAddress server, String username, String password, String database, EventLoopContext context, int redirections) {
58+
private Future<Connection> connectOrRedirect(MSSQLConnectOptions options, EventLoopContext context, int redirections) {
6159
if (redirections > 1) {
6260
return context.failedFuture("The client can be redirected only once");
6361
}
62+
SocketAddress server = options.getSocketAddress();
63+
boolean clientSslConfig = options.isSsl();
64+
int desiredPacketSize = options.getPacketSize();
65+
// Always start unencrypted, the connection will be upgraded if client and server agree
66+
NetClient netClient = netClient(new NetClientOptions(options).setSsl(false));
6467
return netClient.connect(server)
65-
.map(so -> createSocketConnection(so, context))
66-
.compose(conn -> conn.sendPreLoginMessage(clientConfigSsl)
67-
.compose(encryptionLevel -> login(conn, username, password, database, encryptionLevel, context))
68+
.map(so -> createSocketConnection(so, desiredPacketSize, context))
69+
.compose(conn -> conn.sendPreLoginMessage(clientSslConfig)
70+
.compose(encryptionLevel -> login(conn, options, encryptionLevel, context))
6871
)
6972
.compose(connBase -> {
7073
MSSQLSocketConnection conn = (MSSQLSocketConnection) connBase;
@@ -74,39 +77,44 @@ private Future<Connection> connectOrRedirect(SocketAddress server, String userna
7477
}
7578
Promise<Void> closePromise = context.promise();
7679
conn.close(null, closePromise);
77-
return closePromise.future().transform(v -> connectOrRedirect(alternateServer, username, password, database, context, redirections + 1));
80+
return closePromise.future().transform(v -> connectOrRedirect(options, context, redirections + 1));
7881
});
7982
}
8083

81-
private MSSQLSocketConnection createSocketConnection(NetSocket so, EventLoopContext context) {
84+
private MSSQLSocketConnection createSocketConnection(NetSocket so, int desiredPacketSize, EventLoopContext context) {
8285
MSSQLSocketConnection conn = new MSSQLSocketConnection((NetSocketInternal) so, desiredPacketSize, false, 0, sql -> true, 1, context);
8386
conn.init();
8487
return conn;
8588
}
8689

87-
private Future<Connection> login(MSSQLSocketConnection conn, String username, String password, String database, Byte encryptionLevel, EventLoopContext context) {
88-
if (clientConfigSsl && encryptionLevel != ENCRYPT_ON && encryptionLevel != ENCRYPT_REQ) {
90+
private Future<Connection> login(MSSQLSocketConnection conn, MSSQLConnectOptions options, Byte encryptionLevel, EventLoopContext context) {
91+
boolean clientSslConfig = options.isSsl();
92+
if (clientSslConfig && encryptionLevel != ENCRYPT_ON && encryptionLevel != ENCRYPT_REQ) {
8993
Promise<Void> closePromise = context.promise();
9094
conn.close(null, closePromise);
9195
return closePromise.future().transform(v -> context.failedFuture("The client is configured for encryption but the server does not support it"));
9296
}
9397
Future<Void> future;
9498
if (encryptionLevel != ENCRYPT_NOT_SUP) {
9599
// Start connection encryption ...
96-
future = conn.enableSsl(clientConfigSsl, encryptionLevel, (MSSQLConnectOptions) options);
100+
future = conn.enableSsl(clientSslConfig, encryptionLevel, (MSSQLConnectOptions) options);
97101
} else {
98102
// ... unless the client did not require encryption and the server does not support it
99103
future = context.succeededFuture();
100104
}
105+
String username = options.getUser();
106+
String password = options.getPassword();
107+
String database = options.getDatabase();
108+
Map<String, String> properties = options.getProperties();
101109
return future.compose(v -> conn.sendLoginMessage(username, password, database, properties));
102110
}
103111

104112
@Override
105-
public Future<SqlConnection> connect(Context context) {
113+
public Future<SqlConnection> connect(Context context, SqlConnectOptions options) {
106114
ContextInternal ctx = (ContextInternal) context;
107115
QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options);
108116
Promise<SqlConnection> promise = ctx.promise();
109-
connect(asEventLoopContext(ctx))
117+
connect(asEventLoopContext(ctx), options)
110118
.map(conn -> {
111119
MSSQLConnectionImpl msConn = new MSSQLConnectionImpl(ctx, this, conn, tracer, null);
112120
conn.init(msConn);

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public MSSQLConnectionImpl(ContextInternal context, ConnectionFactory factory, C
3636

3737
public static Future<MSSQLConnection> connect(Vertx vertx, MSSQLConnectOptions options) {
3838
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
39-
MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx.owner(), options);
39+
MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx.owner(), () -> options);
4040
ctx.addCloseHook(client);
4141
return (Future)client.connect(ctx);
4242
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/spi/MSSQLDriver.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.vertx.mssqlclient.impl.MSSQLConnectionImpl;
2929
import io.vertx.mssqlclient.impl.MSSQLConnectionUriParser;
3030
import io.vertx.mssqlclient.impl.MSSQLPoolImpl;
31+
import io.vertx.sqlclient.Pool;
3132
import io.vertx.sqlclient.PoolOptions;
3233
import io.vertx.sqlclient.SqlConnectOptions;
3334
import io.vertx.sqlclient.impl.Connection;
@@ -38,6 +39,7 @@
3839
import io.vertx.sqlclient.spi.Driver;
3940

4041
import java.util.List;
42+
import java.util.function.Supplier;
4143
import java.util.stream.Collectors;
4244

4345
public class MSSQLDriver implements Driver {
@@ -46,6 +48,24 @@ public class MSSQLDriver implements Driver {
4648

4749
public static final MSSQLDriver INSTANCE = new MSSQLDriver();
4850

51+
@Override
52+
public Pool newPool(Vertx vertx, Supplier<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
53+
return null;
54+
}
55+
56+
private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
57+
MSSQLConnectOptions baseConnectOptions = MSSQLConnectOptions.wrap(databases.get());
58+
QueryTracer tracer = vertx.tracer() == null ? null : new QueryTracer(vertx.tracer(), baseConnectOptions);
59+
VertxMetrics vertxMetrics = vertx.metricsSPI();
60+
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(baseConnectOptions.getSocketAddress(), "sql", baseConnectOptions.getMetricsName()) : null;
61+
PoolImpl pool = new PoolImpl(vertx, this, tracer, metrics, 1, options, null, null, closeFuture);
62+
ConnectionFactory factory = createConnectionFactory(vertx, databases);
63+
pool.connectionProvider(factory::connect);
64+
pool.init();
65+
closeFuture.add(factory);
66+
return pool;
67+
}
68+
4969
@Override
5070
public MSSQLPool newPool(Vertx vertx, List<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
5171
VertxInternal vx = (VertxInternal) vertx;
@@ -85,7 +105,12 @@ public boolean acceptsOptions(SqlConnectOptions options) {
85105

86106
@Override
87107
public ConnectionFactory createConnectionFactory(Vertx vertx, SqlConnectOptions database) {
88-
return new MSSQLConnectionFactory((VertxInternal) vertx, MSSQLConnectOptions.wrap(database));
108+
return new MSSQLConnectionFactory((VertxInternal) vertx, () -> MSSQLConnectOptions.wrap(database));
109+
}
110+
111+
@Override
112+
public ConnectionFactory createConnectionFactory(Vertx vertx, Supplier<? extends SqlConnectOptions> database) {
113+
return new MSSQLConnectionFactory((VertxInternal) vertx, () -> MSSQLConnectOptions.wrap(database.get()));
89114
}
90115

91116
@Override

0 commit comments

Comments
 (0)