Skip to content

Commit a7657dc

Browse files
committed
Client async close - closes #648
1 parent 4e508a9 commit a7657dc

File tree

27 files changed

+230
-114
lines changed

27 files changed

+230
-114
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.vertx.core.Future;
2222
import io.vertx.core.Promise;
23-
import io.vertx.core.Vertx;
2423
import io.vertx.core.impl.ContextInternal;
2524
import io.vertx.core.net.NetClient;
2625
import io.vertx.core.net.NetClientOptions;
@@ -45,7 +44,7 @@ public class DB2ConnectionFactory implements ConnectionFactory {
4544
private final int preparedStatementCacheSqlLimit;
4645
private final int pipeliningLimit;
4746

48-
public DB2ConnectionFactory(Vertx vertx, ContextInternal context, DB2ConnectOptions options) {
47+
public DB2ConnectionFactory(ContextInternal context, DB2ConnectOptions options) {
4948
NetClientOptions netClientOptions = new NetClientOptions(options);
5049

5150
this.context = context;
@@ -62,11 +61,11 @@ public DB2ConnectionFactory(Vertx vertx, ContextInternal context, DB2ConnectOpti
6261
this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit();
6362
this.pipeliningLimit = options.getPipeliningLimit();
6463

65-
this.netClient = vertx.createNetClient(netClientOptions);
64+
this.netClient = context.owner().createNetClient(netClientOptions);
6665
}
6766

68-
public void close() {
69-
netClient.close();
67+
public Future<Void> close() {
68+
return netClient.close();
7069
}
7170

7271
@Override

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2PoolImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.vertx.core.AsyncResult;
1919
import io.vertx.core.Handler;
20+
import io.vertx.core.impl.CloseFuture;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.db2client.DB2ConnectOptions;
2223
import io.vertx.db2client.DB2Pool;
@@ -29,13 +30,20 @@ public class DB2PoolImpl extends PoolBase<DB2PoolImpl> implements DB2Pool {
2930

3031
public static DB2PoolImpl create(ContextInternal context, boolean closeVertx, DB2ConnectOptions connectOptions,
3132
PoolOptions poolOptions) {
32-
return new DB2PoolImpl(context, closeVertx, poolOptions, new DB2ConnectionFactory(context.owner(), context, connectOptions));
33+
DB2PoolImpl pool = new DB2PoolImpl(context, poolOptions, new DB2ConnectionFactory(context, connectOptions));
34+
CloseFuture closeFuture = pool.closeFuture();
35+
if (closeVertx) {
36+
closeFuture.onComplete(ar -> context.owner().close());
37+
} else {
38+
context.addCloseHook(closeFuture);
39+
}
40+
return pool;
3341
}
3442

3543
private final DB2ConnectionFactory factory;
3644

37-
private DB2PoolImpl(ContextInternal context, boolean closeVertx, PoolOptions poolOptions, DB2ConnectionFactory factory) {
38-
super(context, factory, poolOptions, closeVertx);
45+
private DB2PoolImpl(ContextInternal context, PoolOptions poolOptions, DB2ConnectionFactory factory) {
46+
super(context, factory, poolOptions);
3947
this.factory = factory;
4048
}
4149

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class MSSQLConnectionFactory implements ConnectionFactory {
3535
private final String database;
3636
private final Map<String, String> properties;
3737

38-
MSSQLConnectionFactory(Vertx vertx, ContextInternal context, MSSQLConnectOptions options) {
38+
MSSQLConnectionFactory(ContextInternal context, MSSQLConnectOptions options) {
3939
NetClientOptions netClientOptions = new NetClientOptions(options);
4040

4141
this.context = context;
@@ -45,7 +45,7 @@ class MSSQLConnectionFactory implements ConnectionFactory {
4545
this.password = options.getPassword();
4646
this.database = options.getDatabase();
4747
this.properties = new HashMap<>(options.getProperties());
48-
this.netClient = vertx.createNetClient(netClientOptions);
48+
this.netClient = context.owner().createNetClient(netClientOptions);
4949
}
5050

5151
@Override
@@ -81,7 +81,7 @@ private void close(Handler<AsyncResult<Void>> completionHandler) {
8181
completionHandler.handle(Future.succeededFuture());
8282
}
8383

84-
public void close() {
85-
netClient.close();
84+
public Future<Void> close() {
85+
return netClient.close();
8686
}
8787
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLPoolImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,37 @@
1212
package io.vertx.mssqlclient.impl;
1313

1414
import io.vertx.core.Future;
15+
import io.vertx.core.impl.CloseFuture;
1516
import io.vertx.core.impl.ContextInternal;
1617
import io.vertx.mssqlclient.MSSQLConnectOptions;
1718
import io.vertx.mssqlclient.MSSQLPool;
1819
import io.vertx.core.AsyncResult;
1920
import io.vertx.core.Handler;
2021
import io.vertx.sqlclient.PoolOptions;
2122
import io.vertx.sqlclient.SqlClient;
22-
import io.vertx.sqlclient.Transaction;
2323
import io.vertx.sqlclient.impl.Connection;
2424
import io.vertx.sqlclient.impl.PoolBase;
2525
import io.vertx.sqlclient.impl.SqlConnectionImpl;
26-
import io.vertx.sqlclient.impl.pool.ConnectionPool;
2726

2827
import java.util.function.Function;
2928

3029
public class MSSQLPoolImpl extends PoolBase<MSSQLPoolImpl> implements MSSQLPool {
3130

3231
public static MSSQLPoolImpl create(ContextInternal context, boolean closeVertx, MSSQLConnectOptions connectOptions, PoolOptions poolOptions) {
33-
return new MSSQLPoolImpl(context, closeVertx, new MSSQLConnectionFactory(context.owner(), context, connectOptions), poolOptions);
32+
MSSQLPoolImpl pool = new MSSQLPoolImpl(context, new MSSQLConnectionFactory(context, connectOptions), poolOptions);
33+
CloseFuture closeFuture = pool.closeFuture();
34+
if (closeVertx) {
35+
closeFuture.onComplete(ar -> context.owner().close());
36+
} else {
37+
context.addCloseHook(closeFuture);
38+
}
39+
return pool;
3440
}
3541

3642
private final MSSQLConnectionFactory connectionFactory;
3743

38-
private MSSQLPoolImpl(ContextInternal context, boolean closeVertx, MSSQLConnectionFactory factory, PoolOptions poolOptions) {
39-
super(context, factory, poolOptions, closeVertx);
44+
private MSSQLPoolImpl(ContextInternal context, MSSQLConnectionFactory factory, PoolOptions poolOptions) {
45+
super(context, factory, poolOptions);
4046
this.connectionFactory = factory;
4147
}
4248

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class MySQLConnectionFactory implements ConnectionFactory {
4949
private final int preparedStatementCacheSqlLimit;
5050
private final int initialCapabilitiesFlags;
5151

52-
public MySQLConnectionFactory(Vertx vertx, ContextInternal context, MySQLConnectOptions options) {
52+
public MySQLConnectionFactory(ContextInternal context, MySQLConnectOptions options) {
5353
NetClientOptions netClientOptions = new NetClientOptions(options);
5454

5555
this.context = context;
@@ -88,7 +88,7 @@ public MySQLConnectionFactory(Vertx vertx, ContextInternal context, MySQLConnect
8888
serverRsaPublicKey = options.getServerRsaPublicKeyValue();
8989
} else {
9090
if (options.getServerRsaPublicKeyPath() != null) {
91-
serverRsaPublicKey = vertx.fileSystem().readFileBlocking(options.getServerRsaPublicKeyPath());
91+
serverRsaPublicKey = context.owner().fileSystem().readFileBlocking(options.getServerRsaPublicKeyPath());
9292
}
9393
}
9494
this.serverRsaPublicKey = serverRsaPublicKey;
@@ -113,7 +113,7 @@ public MySQLConnectionFactory(Vertx vertx, ContextInternal context, MySQLConnect
113113
this.preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize();
114114
this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit();
115115

116-
this.netClient = vertx.createNetClient(netClientOptions);
116+
this.netClient = context.owner().createNetClient(netClientOptions);
117117
}
118118

119119
// Called by hook
@@ -122,8 +122,8 @@ private void close(Handler<AsyncResult<Void>> completionHandler) {
122122
completionHandler.handle(Future.succeededFuture());
123123
}
124124

125-
public void close() {
126-
netClient.close();
125+
public Future<Void> close() {
126+
return netClient.close();
127127
}
128128

129129
@Override

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLPoolImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import io.vertx.core.AsyncResult;
1515
import io.vertx.core.Handler;
16+
import io.vertx.core.impl.CloseFuture;
1617
import io.vertx.core.impl.ContextInternal;
1718
import io.vertx.mysqlclient.MySQLConnectOptions;
1819
import io.vertx.mysqlclient.MySQLPool;
@@ -24,13 +25,20 @@
2425
public class MySQLPoolImpl extends PoolBase<MySQLPoolImpl> implements MySQLPool {
2526

2627
public static MySQLPoolImpl create(ContextInternal context, boolean closeVertx, MySQLConnectOptions connectOptions, PoolOptions poolOptions) {
27-
return new MySQLPoolImpl(context, closeVertx, new MySQLConnectionFactory(context.owner(), context, connectOptions), poolOptions);
28+
MySQLPoolImpl pool = new MySQLPoolImpl(context, new MySQLConnectionFactory(context, connectOptions), poolOptions);
29+
CloseFuture closeFuture = pool.closeFuture();
30+
if (closeVertx) {
31+
closeFuture.onComplete(ar -> context.owner().close());
32+
} else {
33+
context.addCloseHook(closeFuture);
34+
}
35+
return pool;
2836
}
2937

3038
private final MySQLConnectionFactory factory;
3139

32-
private MySQLPoolImpl(ContextInternal context, boolean closeVertx, MySQLConnectionFactory factory, PoolOptions poolOptions) {
33-
super(context, factory, poolOptions, closeVertx);
40+
private MySQLPoolImpl(ContextInternal context, MySQLConnectionFactory factory, PoolOptions poolOptions) {
41+
super(context, factory, poolOptions);
3442
this.factory = factory;
3543
}
3644

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ class PgConnectionFactory implements ConnectionFactory {
7777
this.client = vertx.createNetClient(netClientOptions);
7878
}
7979

80-
public void close() {
81-
client.close();
80+
public Future<Void> close() {
81+
return client.close();
8282
}
8383

8484
public void cancelRequest(int processId, int secretKey, Handler<AsyncResult<Void>> handler) {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.pgclient.impl;
1919

20+
import io.vertx.core.impl.CloseFuture;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.pgclient.*;
2223
import io.vertx.sqlclient.PoolOptions;
@@ -37,13 +38,20 @@
3738
public class PgPoolImpl extends PoolBase<PgPoolImpl> implements PgPool {
3839

3940
public static PgPoolImpl create(ContextInternal context, boolean closeVertx, PgConnectOptions connectOptions, PoolOptions poolOptions) {
40-
return new PgPoolImpl(context, closeVertx, new PgConnectionFactory(context.owner(), context, connectOptions), poolOptions);
41+
PgPoolImpl pool = new PgPoolImpl(context, new PgConnectionFactory(context.owner(), context, connectOptions), poolOptions);
42+
CloseFuture closeFuture = pool.closeFuture();
43+
if (closeVertx) {
44+
closeFuture.onComplete(ar -> context.owner().close());
45+
} else {
46+
context.addCloseHook(closeFuture);
47+
}
48+
return pool;
4149
}
4250

4351
private final PgConnectionFactory factory;
4452

45-
private PgPoolImpl(ContextInternal context, boolean closeVertx, PgConnectionFactory factory, PoolOptions poolOptions) {
46-
super(context, factory, poolOptions, closeVertx);
53+
private PgPoolImpl(ContextInternal context, PgConnectionFactory factory, PoolOptions poolOptions) {
54+
super(context, factory, poolOptions);
4755
this.factory = factory;
4856
}
4957

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionCommandCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ private CloseConnectionCommandCodec() {
2929
@Override
3030
void encode(PgEncoder encoder) {
3131
encoder.writeTerminate();
32+
encoder.close();
3233
}
3334

3435
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package io.vertx.pgclient.impl.codec;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
2021
import io.netty.channel.ChannelHandlerContext;
2122
import io.netty.channel.ChannelOutboundHandlerAdapter;
2223
import io.netty.channel.ChannelPromise;
24+
import io.netty.channel.socket.SocketChannel;
2325
import io.vertx.sqlclient.Tuple;
2426
import io.vertx.pgclient.impl.util.Util;
2527
import io.vertx.sqlclient.impl.ParamDesc;
@@ -119,10 +121,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
119121
}
120122

121123
@Override
122-
public void flush(ChannelHandlerContext ctx) throws Exception {
124+
public void flush(ChannelHandlerContext ctx) {
123125
flush();
124126
}
125127

128+
void close() {
129+
ByteBuf buff;
130+
if (out != null) {
131+
buff = out;
132+
out = null;
133+
} else {
134+
buff = Unpooled.EMPTY_BUFFER;
135+
}
136+
SocketChannel channel = (SocketChannel) channelHandlerContext().channel();
137+
ctx.writeAndFlush(buff).addListener(v -> channel.shutdownOutput());
138+
}
139+
126140
void flush() {
127141
if (out != null) {
128142
ByteBuf buff = out;

0 commit comments

Comments
 (0)