Skip to content

Commit 21f1c0b

Browse files
authored
Fix memory leak with one-shot connection factories (#1311)
Fixes #1302 After creating an unpooled connection, add it as a close hook to the creating context. Also, in this case, make sure the connection factory is closed after usage. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 54b808c commit 21f1c0b

File tree

9 files changed

+119
-64
lines changed

9 files changed

+119
-64
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public static Future<DB2Connection> connect(Vertx vertx, DB2ConnectOptions optio
3939
} catch (Exception e) {
4040
return ctx.failedFuture(e);
4141
}
42-
ctx.addCloseHook(client);
43-
return (Future) client.connect(ctx);
42+
return prepareForClose(ctx, client.connect(ctx)).map(DB2Connection::cast);
4443
}
4544

4645
public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License 2.0 which is available at
@@ -36,8 +36,7 @@ public MSSQLConnectionImpl(ContextInternal context, ConnectionFactory factory, C
3636
public static Future<MSSQLConnection> connect(Vertx vertx, MSSQLConnectOptions options) {
3737
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
3838
MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx.owner(), SingletonSupplier.wrap(options));
39-
ctx.addCloseHook(client);
40-
return (Future)client.connect(ctx);
39+
return prepareForClose(ctx, client.connect(ctx)).map(MSSQLConnection::cast);
4140
}
4241

4342
@Override

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License 2.0 which is available at
@@ -39,8 +39,7 @@ public static Future<MySQLConnection> connect(ContextInternal ctx, MySQLConnectO
3939
} catch (Exception e) {
4040
return ctx.failedFuture(e);
4141
}
42-
ctx.addCloseHook(client);
43-
return (Future)client.connect(ctx);
42+
return prepareForClose(ctx, client.connect(ctx)).map(MySQLConnection::cast);
4443
}
4544

4645
public MySQLConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) {

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License 2.0 which is available at
@@ -31,7 +31,6 @@ public OracleConnectionImpl(ContextInternal context, ConnectionFactory factory,
3131
public static Future<OracleConnection> connect(Vertx vertx, OracleConnectOptions options) {
3232
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
3333
OracleConnectionFactory client = new OracleConnectionFactory(ctx.owner(), SingletonSupplier.wrap(options));
34-
ctx.addCloseHook(client);
35-
return (Future) client.connect(ctx);
34+
return prepareForClose(ctx, client.connect(ctx)).map(OracleConnection::cast);
3635
}
3736
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,19 @@
1616
*/
1717
package io.vertx.pgclient.impl;
1818

19+
import io.vertx.core.*;
1920
import io.vertx.core.impl.ContextInternal;
20-
import io.vertx.core.spi.metrics.ClientMetrics;
2121
import io.vertx.pgclient.PgConnectOptions;
2222
import io.vertx.pgclient.PgConnection;
2323
import io.vertx.pgclient.PgNotice;
2424
import io.vertx.pgclient.PgNotification;
2525
import io.vertx.pgclient.impl.codec.NoticeResponse;
26+
import io.vertx.pgclient.impl.codec.TxFailedEvent;
2627
import io.vertx.pgclient.spi.PgDriver;
2728
import io.vertx.sqlclient.impl.Connection;
2829
import io.vertx.sqlclient.impl.Notification;
2930
import io.vertx.sqlclient.impl.SocketConnectionBase;
3031
import io.vertx.sqlclient.impl.SqlConnectionBase;
31-
import io.vertx.core.AsyncResult;
32-
import io.vertx.core.Context;
33-
import io.vertx.core.Future;
34-
import io.vertx.core.Handler;
35-
import io.vertx.core.Vertx;
36-
import io.vertx.pgclient.impl.codec.TxFailedEvent;
3732

3833
import java.util.function.Supplier;
3934

@@ -46,8 +41,7 @@ public static Future<PgConnection> connect(ContextInternal context, Supplier<PgC
4641
} catch (Exception e) {
4742
return context.failedFuture(e);
4843
}
49-
context.addCloseHook(client);
50-
return (Future) client.connect(context);
44+
return prepareForClose(context, client.connect(context)).map(PgConnection::cast);
5145
}
5246

5347
private volatile Handler<PgNotification> notificationHandler;

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionTestBase.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,11 @@
1717

1818
package io.vertx.pgclient;
1919

20-
import io.vertx.core.AbstractVerticle;
2120
import io.vertx.core.AsyncResult;
22-
import io.vertx.core.Promise;
2321
import io.vertx.core.buffer.Buffer;
2422
import io.vertx.ext.unit.Async;
2523
import io.vertx.ext.unit.TestContext;
26-
import io.vertx.sqlclient.ProxyServer;
27-
import io.vertx.sqlclient.Row;
28-
import io.vertx.sqlclient.RowSet;
29-
import io.vertx.sqlclient.SqlConnection;
30-
import io.vertx.sqlclient.TransactionRollbackException;
31-
import io.vertx.sqlclient.Tuple;
24+
import io.vertx.sqlclient.*;
3225
import org.junit.Test;
3326

3427
import java.util.ArrayList;
@@ -313,24 +306,6 @@ public void testBatchInsertError(TestContext ctx) throws Exception {
313306
}));
314307
}
315308

316-
@Test
317-
public void testCloseOnUndeploy(TestContext ctx) {
318-
Async done = ctx.async();
319-
vertx.deployVerticle(new AbstractVerticle() {
320-
@Override
321-
public void start(Promise<Void> startPromise) throws Exception {
322-
connector.accept(ctx.asyncAssertSuccess(conn -> {
323-
conn.closeHandler(v -> {
324-
done.complete();
325-
});
326-
startPromise.complete();
327-
}));
328-
}
329-
}, ctx.asyncAssertSuccess(id -> {
330-
vertx.undeploy(id);
331-
}));
332-
}
333-
334309
@Test
335310
public void testTransactionCommit(TestContext ctx) {
336311
testTransactionCommit(ctx, Runnable::run);

vertx-pg-client/src/test/java/io/vertx/pgclient/PgPooledConnectionTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,7 @@ public void tearDown(TestContext ctx) {
4343
if (pool != null) {
4444
PgPool p = pool;
4545
pool = null;
46-
try {
47-
p.close();
48-
} catch (IllegalStateException e) {
49-
// Might be already closed because of testCloseOnUndeploy
50-
}
46+
p.close();
5147
}
5248
super.tearDown(ctx);
5349
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
package io.vertx.sqlclient.impl;
1919

20+
import io.vertx.core.*;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.core.impl.future.PromiseInternal;
2223
import io.vertx.core.spi.metrics.ClientMetrics;
2324
import io.vertx.core.spi.tracing.VertxTracer;
2425
import io.vertx.sqlclient.PrepareOptions;
2526
import io.vertx.sqlclient.PreparedStatement;
27+
import io.vertx.sqlclient.SqlConnection;
2628
import io.vertx.sqlclient.Transaction;
27-
import io.vertx.sqlclient.impl.command.*;
28-
import io.vertx.core.*;
29+
import io.vertx.sqlclient.impl.command.CommandBase;
30+
import io.vertx.sqlclient.impl.command.PrepareStatementCommand;
31+
import io.vertx.sqlclient.impl.command.QueryCommandBase;
2932
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3033
import io.vertx.sqlclient.impl.tracing.QueryReporter;
3134
import io.vertx.sqlclient.spi.ConnectionFactory;
@@ -35,10 +38,11 @@
3538
/**
3639
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
3740
*/
38-
public class SqlConnectionBase<C extends SqlConnectionBase<C>> extends SqlClientBase implements SqlConnectionInternal {
41+
public class SqlConnectionBase<C extends SqlConnectionBase<C>> extends SqlClientBase implements SqlConnectionInternal, Closeable {
3942

4043
private volatile Handler<Throwable> exceptionHandler;
4144
private volatile Handler<Void> closeHandler;
45+
private volatile boolean closeFactoryAfterUsage;
4246
protected TransactionImpl tx;
4347
protected final ContextInternal context;
4448
protected final ConnectionFactory factory;
@@ -208,7 +212,16 @@ public void close(Handler<AsyncResult<Void>> handler) {
208212
close(promise(handler));
209213
}
210214

211-
private void close(Promise<Void> promise) {
215+
@Override
216+
public void close(Promise<Void> completion) {
217+
doClose(completion);
218+
if (closeFactoryAfterUsage) {
219+
completion.future().onComplete(v -> factory.close(Promise.promise()));
220+
context.removeCloseHook(this);
221+
}
222+
}
223+
224+
private void doClose(Promise<Void> promise) {
212225
context.execute(promise, p -> {
213226
if (tx != null) {
214227
tx.rollback(ar -> conn.close(this, p));
@@ -218,4 +231,14 @@ private void close(Promise<Void> promise) {
218231
}
219232
});
220233
}
234+
235+
protected static Future<SqlConnection> prepareForClose(ContextInternal ctx, Future<SqlConnection> future) {
236+
return future.andThen(ar -> {
237+
if (ar.succeeded()) {
238+
SqlConnectionBase<?> base = (SqlConnectionBase<?>) ar.result();
239+
base.closeFactoryAfterUsage = true;
240+
ctx.addCloseHook(base);
241+
}
242+
});
243+
}
221244
}

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/ConnectionTestBase.java

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License 2.0 which is available at
@@ -11,19 +11,24 @@
1111

1212
package io.vertx.sqlclient.tck;
1313

14-
import io.vertx.core.AsyncResult;
15-
import io.vertx.core.Handler;
16-
import io.vertx.core.Vertx;
14+
import io.vertx.core.*;
1715
import io.vertx.ext.unit.Async;
1816
import io.vertx.ext.unit.TestContext;
1917
import io.vertx.sqlclient.SqlConnectOptions;
2018
import io.vertx.sqlclient.SqlConnection;
19+
import io.vertx.sqlclient.impl.SqlConnectionBase;
20+
import io.vertx.sqlclient.spi.ConnectionFactory;
2121
import io.vertx.sqlclient.spi.DatabaseMetadata;
22-
2322
import org.junit.After;
2423
import org.junit.Before;
2524
import org.junit.Test;
2625

26+
import java.util.Collections;
27+
import java.util.Set;
28+
import java.util.WeakHashMap;
29+
30+
import static java.util.concurrent.TimeUnit.SECONDS;
31+
2732
public abstract class ConnectionTestBase {
2833
protected Vertx vertx;
2934
protected Connector<SqlConnection> connector;
@@ -46,7 +51,73 @@ public void tearDown(TestContext ctx) {
4651

4752
@Test
4853
public void testConnect(TestContext ctx) {
49-
connect(ctx.asyncAssertSuccess(conn -> {
54+
connect(ctx.asyncAssertSuccess());
55+
}
56+
57+
@Test
58+
public void testConnectNoLeak(TestContext ctx) throws Exception {
59+
Set<SqlConnectionBase<?>> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
60+
Set<ConnectionFactory> factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
61+
Async async = ctx.async(100);
62+
for (int i = 0; i < 100; i++) {
63+
connect(ctx.asyncAssertSuccess(conn -> {
64+
SqlConnectionBase<?> base = (SqlConnectionBase<?>) conn;
65+
connections.add(base);
66+
factories.add(base.factory());
67+
conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown()));
68+
}));
69+
}
70+
async.awaitSuccess();
71+
for (int c = 0; c < 5; c++) {
72+
System.gc();
73+
SECONDS.sleep(1);
74+
}
75+
ctx.assertEquals(0, connections.size());
76+
ctx.assertEquals(0, factories.size());
77+
}
78+
79+
@Test
80+
public void testConnectNoLeakInVerticle(TestContext ctx) throws Exception {
81+
Set<SqlConnectionBase<?>> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
82+
Set<ConnectionFactory> factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
83+
Async async = ctx.async(100);
84+
vertx.deployVerticle(new AbstractVerticle() {
85+
@Override
86+
public void start() throws Exception {
87+
for (int i = 0; i < 100; i++) {
88+
connect(ctx.asyncAssertSuccess(conn -> {
89+
SqlConnectionBase<?> base = (SqlConnectionBase<?>) conn;
90+
connections.add(base);
91+
factories.add(base.factory());
92+
conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown()));
93+
}));
94+
}
95+
}
96+
});
97+
async.awaitSuccess();
98+
for (int c = 0; c < 5; c++) {
99+
System.gc();
100+
SECONDS.sleep(1);
101+
}
102+
ctx.assertEquals(0, connections.size());
103+
ctx.assertEquals(0, factories.size());
104+
}
105+
106+
@Test
107+
public void testCloseOnUndeploy(TestContext ctx) {
108+
Async done = ctx.async();
109+
vertx.deployVerticle(new AbstractVerticle() {
110+
@Override
111+
public void start(Promise<Void> startPromise) throws Exception {
112+
connect(ctx.asyncAssertSuccess(conn -> {
113+
conn.closeHandler(v -> {
114+
done.complete();
115+
});
116+
startPromise.complete();
117+
}));
118+
}
119+
}).onComplete(ctx.asyncAssertSuccess(id -> {
120+
vertx.undeploy(id);
50121
}));
51122
}
52123

@@ -119,7 +190,7 @@ public void testCloseWithQueryInProgress(TestContext ctx) {
119190
}));
120191
async.await();
121192
}
122-
193+
123194
@Test
124195
public void testDatabaseMetaData(TestContext ctx) {
125196
connect(ctx.asyncAssertSuccess(conn -> {
@@ -132,7 +203,7 @@ public void testDatabaseMetaData(TestContext ctx) {
132203
validateDatabaseMetaData(ctx, md);
133204
}));
134205
}
135-
206+
136207
protected abstract void validateDatabaseMetaData(TestContext ctx, DatabaseMetadata md);
137-
208+
138209
}

0 commit comments

Comments
 (0)