Skip to content

Commit 1d283bb

Browse files
authored
Merge pull request #1305 from eclipse-vertx/connect-options-supplier
Connect options supplier support
2 parents 603e323 + 757e2f2 commit 1d283bb

File tree

60 files changed

+1082
-823
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1082
-823
lines changed

vertx-db2-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -407,15 +407,11 @@ public static void poolSharing3(Vertx vertx, DB2ConnectOptions database, int max
407407
.setEventLoopSize(4));
408408
}
409409

410-
public void dynamicPoolConfig(Vertx vertx, DB2Pool pool) {
411-
pool.connectionProvider(ctx -> {
412-
Future<DB2ConnectOptions> fut = retrieveOptions();
413-
return fut.compose(connectOptions -> {
414-
// Do not forget to close later
415-
ConnectionFactory factory = DB2Driver.INSTANCE.createConnectionFactory(vertx, connectOptions);
416-
return factory.connect(ctx);
417-
});
418-
});
410+
public void dynamicPoolConfig(Vertx vertx, PoolOptions poolOptions) {
411+
DB2Pool pool = DB2Pool.pool(vertx, () -> {
412+
Future<DB2ConnectOptions> connectOptions = retrieveOptions();
413+
return connectOptions;
414+
}, poolOptions);
419415
}
420416

421417
private Future<DB2ConnectOptions> retrieveOptions() {

vertx-db2-client/src/main/java/io/vertx/db2client/DB2Pool.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.vertx.db2client;
1717

1818
import io.vertx.codegen.annotations.Fluent;
19+
import io.vertx.codegen.annotations.GenIgnore;
1920
import io.vertx.codegen.annotations.VertxGen;
2021
import io.vertx.core.Context;
2122
import io.vertx.core.Future;
@@ -31,6 +32,7 @@
3132
import java.util.Collections;
3233
import java.util.List;
3334
import java.util.function.Function;
35+
import java.util.function.Supplier;
3436

3537
import static io.vertx.db2client.DB2ConnectOptions.fromUri;
3638

@@ -109,6 +111,28 @@ static DB2Pool pool(Vertx vertx, List<DB2ConnectOptions> databases, PoolOptions
109111
return (DB2Pool) DB2Driver.INSTANCE.createPool(vertx, databases, options);
110112
}
111113

114+
/**
115+
* Create a connection pool to the DB2 {@code databases}. The supplier is called
116+
* to provide the options when a new connection is created by the pool.
117+
*
118+
* @param databases the databases supplier
119+
* @param poolOptions the options for creating the pool
120+
* @return the connection pool
121+
*/
122+
@GenIgnore(GenIgnore.PERMITTED_TYPE)
123+
static DB2Pool pool(Supplier<Future<DB2ConnectOptions>> databases, PoolOptions poolOptions) {
124+
return pool(null, databases, poolOptions);
125+
}
126+
127+
128+
/**
129+
* Like {@link #pool(Supplier, PoolOptions)} with a specific {@link Vertx} instance.
130+
*/
131+
@GenIgnore(GenIgnore.PERMITTED_TYPE)
132+
static DB2Pool pool(Vertx vertx, Supplier<Future<DB2ConnectOptions>> databases, PoolOptions poolOptions) {
133+
return (DB2Pool) DB2Driver.INSTANCE.createPool(vertx, databases, poolOptions);
134+
}
135+
112136
/**
113137
* Like {@link #client(String, PoolOptions)} with default options.
114138
*/

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,53 +21,57 @@
2121
import io.vertx.core.impl.ContextInternal;
2222
import io.vertx.core.impl.EventLoopContext;
2323
import io.vertx.core.impl.VertxInternal;
24-
import io.vertx.core.net.NetClientOptions;
24+
import io.vertx.core.net.NetClient;
2525
import io.vertx.core.net.SocketAddress;
2626
import io.vertx.core.net.impl.NetSocketInternal;
27+
import io.vertx.core.spi.metrics.ClientMetrics;
28+
import io.vertx.core.spi.metrics.VertxMetrics;
2729
import io.vertx.db2client.DB2ConnectOptions;
2830
import io.vertx.sqlclient.SqlConnectOptions;
2931
import io.vertx.sqlclient.SqlConnection;
3032
import io.vertx.sqlclient.impl.Connection;
3133
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
32-
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3334

34-
public class DB2ConnectionFactory extends ConnectionFactoryBase {
35+
import java.util.Map;
36+
import java.util.function.Predicate;
37+
import java.util.function.Supplier;
3538

36-
private int pipeliningLimit;
39+
public class DB2ConnectionFactory extends ConnectionFactoryBase {
3740

38-
public DB2ConnectionFactory(VertxInternal vertx, DB2ConnectOptions options) {
41+
public DB2ConnectionFactory(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> options) {
3942
super(vertx, options);
4043
}
4144

4245
@Override
43-
protected void initializeConfiguration(SqlConnectOptions connectOptions) {
44-
DB2ConnectOptions options = (DB2ConnectOptions) connectOptions;
45-
this.pipeliningLimit = options.getPipeliningLimit();
46-
}
47-
48-
@Override
49-
protected void configureNetClientOptions(NetClientOptions netClientOptions) {
50-
// currently no-op
51-
}
52-
53-
@Override
54-
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
46+
protected Future<Connection> doConnectInternal(SqlConnectOptions options, EventLoopContext context) {
47+
DB2ConnectOptions db2Options = DB2ConnectOptions.wrap(options);
48+
SocketAddress server = options.getSocketAddress();
49+
boolean cachePreparedStatements = options.getCachePreparedStatements();
50+
int preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize();
51+
Predicate<String> preparedStatementCacheSqlFilter = options.getPreparedStatementCacheSqlFilter();
52+
String username = options.getUser();
53+
String password = options.getPassword();
54+
String database = options.getDatabase();
55+
Map<String, String> properties = options.getProperties();
56+
int pipeliningLimit = db2Options.getPipeliningLimit();
57+
NetClient netClient = netClient(options);
5558
return netClient.connect(server).flatMap(so -> {
56-
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, cachePreparedStatements,
59+
VertxMetrics vertxMetrics = vertx.metricsSPI();
60+
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(db2Options.getSocketAddress(), "sql", db2Options.getMetricsName()) : null;
61+
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, metrics, db2Options, cachePreparedStatements,
5762
preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
5863
conn.init();
5964
return Future.future(p -> conn.sendStartupMessage(username, password, database, properties, p));
6065
});
6166
}
6267

6368
@Override
64-
public Future<SqlConnection> connect(Context context) {
69+
public Future<SqlConnection> connect(Context context, SqlConnectOptions options) {
6570
ContextInternal contextInternal = (ContextInternal) context;
66-
QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options);
6771
Promise<SqlConnection> promise = contextInternal.promise();
68-
connect(asEventLoopContext(contextInternal))
72+
connect(asEventLoopContext(contextInternal), options)
6973
.map(conn -> {
70-
DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(contextInternal, this, conn, tracer, null);
74+
DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(contextInternal, this, conn);
7175
conn.init(db2Connection);
7276
return (SqlConnection)db2Connection;
7377
}).onComplete(promise);

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import io.vertx.core.Handler;
2121
import io.vertx.core.Vertx;
2222
import io.vertx.core.impl.ContextInternal;
23-
import io.vertx.core.spi.metrics.ClientMetrics;
2423
import io.vertx.db2client.DB2ConnectOptions;
2524
import io.vertx.db2client.DB2Connection;
2625
import io.vertx.db2client.impl.command.PingCommand;
2726
import io.vertx.db2client.spi.DB2Driver;
2827
import io.vertx.sqlclient.impl.Connection;
28+
import io.vertx.sqlclient.impl.SingletonSupplier;
2929
import io.vertx.sqlclient.impl.SqlConnectionBase;
30-
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3130
import io.vertx.sqlclient.spi.ConnectionFactory;
3231

3332
public class DB2ConnectionImpl extends SqlConnectionBase<DB2ConnectionImpl> implements DB2Connection {
@@ -36,16 +35,16 @@ public static Future<DB2Connection> connect(Vertx vertx, DB2ConnectOptions optio
3635
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
3736
DB2ConnectionFactory client;
3837
try {
39-
client = new DB2ConnectionFactory(ctx.owner(), options);
38+
client = new DB2ConnectionFactory(ctx.owner(), SingletonSupplier.wrap(options));
4039
} catch (Exception e) {
4140
return ctx.failedFuture(e);
4241
}
4342
ctx.addCloseHook(client);
4443
return (Future) client.connect(ctx);
4544
}
4645

47-
public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
48-
super(context, factory, conn, DB2Driver.INSTANCE, tracer, metrics);
46+
public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) {
47+
super(context, factory, conn, DB2Driver.INSTANCE);
4948
}
5049

5150
@Override

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.vertx.core.AsyncResult;
2323
import io.vertx.core.Handler;
2424
import io.vertx.core.Promise;
25-
import io.vertx.core.impl.ContextInternal;
2625
import io.vertx.core.impl.EventLoopContext;
2726
import io.vertx.core.net.impl.NetSocketInternal;
27+
import io.vertx.core.spi.metrics.ClientMetrics;
28+
import io.vertx.db2client.DB2ConnectOptions;
2829
import io.vertx.db2client.impl.codec.DB2Codec;
2930
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
3031
import io.vertx.db2client.impl.drda.ConnectionMetaData;
32+
import io.vertx.sqlclient.SqlConnectOptions;
3133
import io.vertx.sqlclient.impl.Connection;
3234
import io.vertx.sqlclient.impl.QueryResultHandler;
3335
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -40,17 +42,21 @@
4042

4143
public class DB2SocketConnection extends SocketConnectionBase {
4244

45+
private final DB2ConnectOptions connectOptions;
4346
private DB2Codec codec;
4447
private Handler<Void> closeHandler;
4548
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
4649

4750
public DB2SocketConnection(NetSocketInternal socket,
51+
ClientMetrics clientMetrics,
52+
DB2ConnectOptions connectOptions,
4853
boolean cachePreparedStatements,
4954
int preparedStatementCacheSize,
5055
Predicate<String> preparedStatementCacheSqlFilter,
5156
int pipeliningLimit,
5257
EventLoopContext context) {
53-
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
58+
super(socket, clientMetrics, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
59+
this.connectOptions = connectOptions;
5460
}
5561

5662
// TODO RETURN FUTURE ???
@@ -63,6 +69,11 @@ void sendStartupMessage(String username,
6369
schedule(context, cmd).onComplete(completionHandler);
6470
}
6571

72+
@Override
73+
protected SqlConnectOptions connectOptions() {
74+
return connectOptions;
75+
}
76+
6677
@Override
6778
public void init() {
6879
codec = new DB2Codec(this);

vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,25 @@
1515
*/
1616
package io.vertx.db2client.spi;
1717

18+
import io.vertx.core.Future;
1819
import io.vertx.core.Vertx;
1920
import io.vertx.core.impl.CloseFuture;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.core.impl.VertxInternal;
2223
import io.vertx.core.json.JsonObject;
23-
import io.vertx.core.spi.metrics.ClientMetrics;
24-
import io.vertx.core.spi.metrics.VertxMetrics;
2524
import io.vertx.db2client.DB2ConnectOptions;
26-
import io.vertx.db2client.DB2Pool;
2725
import io.vertx.db2client.impl.*;
26+
import io.vertx.sqlclient.Pool;
2827
import io.vertx.sqlclient.PoolOptions;
2928
import io.vertx.sqlclient.SqlConnectOptions;
3029
import io.vertx.sqlclient.impl.Connection;
3130
import io.vertx.sqlclient.impl.PoolImpl;
31+
import io.vertx.sqlclient.impl.SingletonSupplier;
3232
import io.vertx.sqlclient.impl.SqlConnectionInternal;
33-
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3433
import io.vertx.sqlclient.spi.ConnectionFactory;
3534
import io.vertx.sqlclient.spi.Driver;
3635

37-
import java.util.List;
38-
import java.util.stream.Collectors;
36+
import java.util.function.Supplier;
3937

4038
public class DB2Driver implements Driver {
4139

@@ -44,7 +42,7 @@ public class DB2Driver implements Driver {
4442
public static final DB2Driver INSTANCE = new DB2Driver();
4543

4644
@Override
47-
public DB2Pool newPool(Vertx vertx, List<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
45+
public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
4846
VertxInternal vx = (VertxInternal) vertx;
4947
PoolImpl pool;
5048
if (options.isShared()) {
@@ -55,16 +53,10 @@ public DB2Pool newPool(Vertx vertx, List<? extends SqlConnectOptions> databases,
5553
return new DB2PoolImpl(vx, closeFuture, pool);
5654
}
5755

58-
private PoolImpl newPoolImpl(VertxInternal vertx, List<? extends SqlConnectOptions> databases, PoolOptions options, CloseFuture closeFuture) {
59-
DB2ConnectOptions baseConnectOptions = DB2ConnectOptions.wrap(databases.get(0));
60-
QueryTracer tracer = vertx.tracer() == null ? null : new QueryTracer(vertx.tracer(), baseConnectOptions);
61-
VertxMetrics vertxMetrics = vertx.metricsSPI();
62-
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(baseConnectOptions.getSocketAddress(), "sql", baseConnectOptions.getMetricsName()) : null;
56+
private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
6357
boolean pipelinedPool = options instanceof Db2PoolOptions && ((Db2PoolOptions) options).isPipelined();
64-
int pipeliningLimit = pipelinedPool ? baseConnectOptions.getPipeliningLimit() : 1;
65-
PoolImpl pool = new PoolImpl(vertx, this, tracer, metrics, pipeliningLimit, options, null, null, closeFuture);
66-
List<ConnectionFactory> lst = databases.stream().map(o -> createConnectionFactory(vertx, o)).collect(Collectors.toList());
67-
ConnectionFactory factory = ConnectionFactory.roundRobinSelector(lst);
58+
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture);
59+
ConnectionFactory factory = createConnectionFactory(vertx, databases);
6860
pool.connectionProvider(factory::connect);
6961
pool.init();
7062
closeFuture.add(factory);
@@ -84,11 +76,16 @@ public boolean acceptsOptions(SqlConnectOptions options) {
8476

8577
@Override
8678
public ConnectionFactory createConnectionFactory(Vertx vertx, SqlConnectOptions database) {
87-
return new DB2ConnectionFactory((VertxInternal) vertx, DB2ConnectOptions.wrap(database));
79+
return new DB2ConnectionFactory((VertxInternal) vertx, SingletonSupplier.wrap(database));
8880
}
8981

9082
@Override
91-
public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory factory, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
92-
return new DB2ConnectionImpl(context, factory, conn, tracer, metrics);
83+
public ConnectionFactory createConnectionFactory(Vertx vertx, Supplier<? extends Future<? extends SqlConnectOptions>> database) {
84+
return new DB2ConnectionFactory((VertxInternal) vertx, database);
85+
}
86+
87+
@Override
88+
public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory factory, Connection conn) {
89+
return new DB2ConnectionImpl(context, factory, conn);
9390
}
9491
}

vertx-mssql-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -416,15 +416,11 @@ public static void poolSharing3(Vertx vertx, MSSQLConnectOptions database, int m
416416
.setEventLoopSize(4));
417417
}
418418

419-
public void dynamicPoolConfig(Vertx vertx, MSSQLPool pool) {
420-
pool.connectionProvider(ctx -> {
421-
Future<MSSQLConnectOptions> fut = retrieveOptions();
422-
return fut.compose(connectOptions -> {
423-
// Do not forget to close later
424-
ConnectionFactory factory = MSSQLDriver.INSTANCE.createConnectionFactory(vertx, connectOptions);
425-
return factory.connect(ctx);
426-
});
427-
});
419+
public void dynamicPoolConfig(Vertx vertx, PoolOptions poolOptions) {
420+
MSSQLPool pool = MSSQLPool.pool(vertx, () -> {
421+
Future<MSSQLConnectOptions> connectOptions = retrieveOptions();
422+
return connectOptions;
423+
}, poolOptions);
428424
}
429425

430426
private Future<MSSQLConnectOptions> retrieveOptions() {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/MSSQLPool.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@
1212
package io.vertx.mssqlclient;
1313

1414
import io.vertx.codegen.annotations.Fluent;
15+
import io.vertx.codegen.annotations.GenIgnore;
1516
import io.vertx.codegen.annotations.VertxGen;
1617
import io.vertx.core.Context;
1718
import io.vertx.core.Future;
1819
import io.vertx.core.Handler;
1920
import io.vertx.core.Vertx;
2021
import io.vertx.mssqlclient.spi.MSSQLDriver;
2122
import io.vertx.sqlclient.*;
23+
import io.vertx.sqlclient.impl.SingletonSupplier;
2224

23-
import java.util.Collections;
2425
import java.util.List;
2526
import java.util.function.Function;
27+
import java.util.function.Supplier;
2628

2729
import static io.vertx.mssqlclient.MSSQLConnectOptions.fromUri;
2830

@@ -75,7 +77,7 @@ static MSSQLPool pool(MSSQLConnectOptions database, PoolOptions options) {
7577
* Like {@link #pool(MSSQLConnectOptions, PoolOptions)} with a specific {@link Vertx} instance.
7678
*/
7779
static MSSQLPool pool(Vertx vertx, MSSQLConnectOptions database, PoolOptions options) {
78-
return pool(vertx, Collections.singletonList(database), options);
80+
return pool(vertx, SingletonSupplier.wrap(database), options);
7981
}
8082

8183
/**
@@ -97,6 +99,27 @@ static MSSQLPool pool(Vertx vertx, List<MSSQLConnectOptions> databases, PoolOpti
9799
return (MSSQLPool) MSSQLDriver.INSTANCE.createPool(vertx, databases, options);
98100
}
99101

102+
/**
103+
* Create a connection pool to the SQL Server {@code databases}. The supplier is called
104+
* to provide the options when a new connection is created by the pool.
105+
*
106+
* @param databases the databases supplier
107+
* @param options the options for creating the pool
108+
* @return the connection pool
109+
*/
110+
@GenIgnore(GenIgnore.PERMITTED_TYPE)
111+
static MSSQLPool pool(Supplier<Future<MSSQLConnectOptions>> databases, PoolOptions options) {
112+
return pool(null, databases, options);
113+
}
114+
115+
/**
116+
* Like {@link #pool(Supplier, PoolOptions)} with a specific {@link Vertx} instance.
117+
*/
118+
@GenIgnore(GenIgnore.PERMITTED_TYPE)
119+
static MSSQLPool pool(Vertx vertx, Supplier<Future<MSSQLConnectOptions>> databases, PoolOptions options) {
120+
return (MSSQLPool) MSSQLDriver.INSTANCE.createPool(vertx, databases, options);
121+
}
122+
100123
@Override
101124
MSSQLPool connectHandler(Handler<SqlConnection> handler);
102125

0 commit comments

Comments
 (0)