Skip to content

Commit e2c4587

Browse files
authored
Merge pull request #1318 from kdubb/max-lifetime-4.x
Max lifetime 4.x
2 parents 21f1c0b + 50e36a1 commit e2c4587

File tree

5 files changed

+154
-20
lines changed

5 files changed

+154
-20
lines changed

vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ public void testPipeliningDistribution(TestContext ctx) {
338338
public void testPoolIdleTimeout(TestContext ctx) {
339339
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
340340
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
341-
int pooleCleanerPeriod = 100;
341+
int poolCleanerPeriod = 100;
342342
int idleTimeout = 3000;
343343
Async latch = ctx.async();
344344
proxy.proxyHandler(conn -> {
@@ -347,8 +347,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
347347
conn.clientCloseHandler(v -> {
348348
long lifetime = System.currentTimeMillis() - now;
349349
int delta = 500;
350-
int lowerBound = idleTimeout - pooleCleanerPeriod - delta;
351-
int upperBound = idleTimeout + pooleCleanerPeriod + delta;
350+
int lowerBound = idleTimeout - poolCleanerPeriod - delta;
351+
int upperBound = idleTimeout + poolCleanerPeriod + delta;
352352
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
353353
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
354354
latch.complete();
@@ -362,7 +362,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
362362
listenLatch.awaitSuccess(20_000);
363363

364364
poolOptions
365-
.setPoolCleanerPeriod(pooleCleanerPeriod)
365+
.setPoolCleanerPeriod(poolCleanerPeriod)
366+
.setMaxLifetime(0)
366367
.setIdleTimeout(idleTimeout)
367368
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
368369
options.setPort(8080);
@@ -376,6 +377,49 @@ public void testPoolIdleTimeout(TestContext ctx) {
376377
.onComplete(ctx.asyncAssertSuccess());
377378
}
378379

380+
@Test
381+
public void testPoolMaxLifetime(TestContext ctx) {
382+
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
383+
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
384+
int poolCleanerPeriod = 100;
385+
int maxLifetime = 3000;
386+
Async latch = ctx.async();
387+
proxy.proxyHandler(conn -> {
388+
proxyConn.set(conn);
389+
long now = System.currentTimeMillis();
390+
conn.clientCloseHandler(v -> {
391+
long lifetime = System.currentTimeMillis() - now;
392+
int delta = 500;
393+
int lowerBound = maxLifetime - poolCleanerPeriod - delta;
394+
int upperBound = maxLifetime + poolCleanerPeriod + delta;
395+
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
396+
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
397+
latch.complete();
398+
});
399+
conn.connect();
400+
});
401+
402+
// Start proxy
403+
Async listenLatch = ctx.async();
404+
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
405+
listenLatch.awaitSuccess(20_000);
406+
407+
poolOptions
408+
.setPoolCleanerPeriod(poolCleanerPeriod)
409+
.setIdleTimeout(0)
410+
.setMaxLifetime(maxLifetime)
411+
.setMaxLifetimeUnit(TimeUnit.MILLISECONDS);
412+
options.setPort(8080);
413+
options.setHost("localhost");
414+
PgPool pool = createPool(options, poolOptions);
415+
416+
// Create a connection that remains in the pool
417+
pool
418+
.getConnection()
419+
.flatMap(SqlClient::close)
420+
.onComplete(ctx.asyncAssertSuccess());
421+
}
422+
379423
@Test
380424
public void testPoolConnectTimeout(TestContext ctx) {
381425
Async async = ctx.async(2);
@@ -416,9 +460,9 @@ public void testPoolConnectTimeout(TestContext ctx) {
416460
public void testNoConnectionLeaks(TestContext ctx) {
417461
Async killConnections = ctx.async();
418462
PgConnection.connect(vertx, options, ctx.asyncAssertSuccess(conn -> {
419-
Collector<Row, ?, List<Integer>> collector = mapping(row -> row.getInteger(0), toList());
463+
Collector<Row, ?, List<Boolean>> collector = mapping(row -> row.getBoolean(0), toList());
420464
String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1";
421-
PreparedQuery<SqlResult<List<Integer>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
465+
PreparedQuery<SqlResult<List<Boolean>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
422466
Tuple params = Tuple.of(options.getDatabase());
423467
preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete()));
424468
}));

vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
4545
obj.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
4646
}
4747
break;
48+
case "maxLifetime":
49+
if (member.getValue() instanceof Number) {
50+
obj.setMaxLifetime(((Number)member.getValue()).intValue());
51+
}
52+
break;
53+
case "maxLifetimeUnit":
54+
if (member.getValue() instanceof String) {
55+
obj.setMaxLifetimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
56+
}
57+
break;
4858
case "maxSize":
4959
if (member.getValue() instanceof Number) {
5060
obj.setMaxSize(((Number)member.getValue()).intValue());
@@ -88,6 +98,10 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
8898
if (obj.getIdleTimeoutUnit() != null) {
8999
json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name());
90100
}
101+
json.put("maxLifetime", obj.getMaxLifetime());
102+
if (obj.getMaxLifetimeUnit() != null) {
103+
json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name());
104+
}
91105
json.put("maxSize", obj.getMaxSize());
92106
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
93107
if (obj.getName() != null) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package io.vertx.sqlclient;
1919

2020
import io.vertx.codegen.annotations.DataObject;
21-
import io.vertx.core.http.HttpClientOptions;
2221
import io.vertx.core.impl.Arguments;
2322
import io.vertx.core.json.JsonObject;
2423

@@ -48,11 +47,21 @@ public class PoolOptions {
4847
*/
4948
public static final int DEFAULT_IDLE_TIMEOUT = 0;
5049

50+
/**
51+
* Default maximum pooled connection lifetime = 0 (no maximum)
52+
*/
53+
public static final int DEFAULT_MAXIMUM_LIFETIME = 0;
54+
5155
/**
5256
* Default connection idle time unit in the pool = seconds
5357
*/
5458
public static final TimeUnit DEFAULT_IDLE_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;
5559

60+
/**
61+
* Default maximum pooled connection lifetime unit = seconds
62+
*/
63+
public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS;
64+
5665
/**
5766
* Default pool cleaner period = 1000 ms (1 second)
5867
*/
@@ -87,6 +96,8 @@ public class PoolOptions {
8796
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
8897
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
8998
private TimeUnit idleTimeoutUnit = DEFAULT_IDLE_TIMEOUT_TIME_UNIT;
99+
private int maxLifetime = DEFAULT_MAXIMUM_LIFETIME;
100+
private TimeUnit maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT;
90101
private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD;
91102
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
92103
private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT;
@@ -177,16 +188,54 @@ public int getIdleTimeout() {
177188
}
178189

179190
/**
180-
* Establish an idle timeout for pooled connections.
191+
* Establish an idle timeout for pooled connections, a value of zero disables the idle timeout.
181192
*
182-
* @param idleTimeout the pool connection idle time unitq
193+
* @param idleTimeout the pool connection idle timeout
183194
* @return a reference to this, so the API can be used fluently
184195
*/
185196
public PoolOptions setIdleTimeout(int idleTimeout) {
197+
Arguments.require(idleTimeout >= 0, "idleTimeout must be >= 0");
186198
this.idleTimeout = idleTimeout;
187199
return this;
188200
}
189201

202+
/**
203+
* @return the pooled connection max lifetime unit
204+
*/
205+
public TimeUnit getMaxLifetimeUnit() {
206+
return maxLifetimeUnit;
207+
}
208+
209+
/**
210+
* Establish a max lifetime unit for pooled connections.
211+
*
212+
* @param maxLifetimeUnit pooled connection max lifetime unit
213+
* @return a reference to this, so the API can be used fluently
214+
*/
215+
public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) {
216+
this.maxLifetimeUnit = maxLifetimeUnit;
217+
return this;
218+
}
219+
220+
/**
221+
* @return pooled connection max lifetime
222+
*/
223+
public int getMaxLifetime() {
224+
return maxLifetime;
225+
}
226+
227+
/**
228+
* Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime.
229+
*
230+
* @param maxLifetime the pool connection max lifetime
231+
* @return a reference to this, so the API can be used fluently
232+
*/
233+
public PoolOptions setMaxLifetime(int maxLifetime) {
234+
Arguments.require(maxLifetime >= 0, "maxLifetime must be >= 0");
235+
this.maxLifetime = maxLifetime;
236+
return this;
237+
}
238+
190239
/**
191240
* @return the connection pool cleaner period in ms.
192241
*/

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
4343
private final CloseFuture closeFuture;
4444
private final long idleTimeout;
4545
private final long connectionTimeout;
46+
private final long maxLifetime;
4647
private final long cleanerPeriod;
4748
private final boolean pipelined;
4849
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
@@ -62,20 +63,23 @@ public PoolImpl(VertxInternal vertx,
6263

6364
this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit());
6465
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
66+
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
6567
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
6668
this.timerID = -1L;
6769
this.pipelined = pipelined;
6870
this.vertx = vertx;
69-
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
71+
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer,
72+
afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined,
73+
poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
7074
this.closeFuture = closeFuture;
7175
}
7276

7377
public Pool init() {
7478
closeFuture.add(this);
75-
if (idleTimeout > 0 && cleanerPeriod > 0) {
79+
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
7680
synchronized (this) {
7781
timerID = vertx.setTimer(cleanerPeriod, id -> {
78-
checkExpired();
82+
runEviction();
7983
});
8084
}
8185
}
@@ -90,17 +94,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
9094
return this;
9195
}
9296

93-
private void checkExpired() {
97+
private void runEviction() {
9498
synchronized (this) {
9599
if (timerID == -1) {
96100
// Cancelled
97101
return;
98102
}
99103
timerID = vertx.setTimer(cleanerPeriod, id -> {
100-
checkExpired();
104+
runEviction();
101105
});
102106
}
103-
pool.checkExpired();
107+
pool.evict();
104108
}
105109

106110
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class SqlConnectionPool {
5353
private final Function<Connection, Future<Void>> beforeRecycle;
5454
private final boolean pipelined;
5555
private final long idleTimeout;
56+
private final long maxLifetime;
5657
private final int maxSize;
5758

5859
public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
@@ -61,6 +62,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
6162
Function<Connection, Future<Void>> beforeRecycle,
6263
VertxInternal vertx,
6364
long idleTimeout,
65+
long maxLifetime,
6466
int maxSize,
6567
boolean pipelined,
6668
int maxWaitQueueSize,
@@ -75,6 +77,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
7577
this.vertx = vertx;
7678
this.pipelined = pipelined;
7779
this.idleTimeout = idleTimeout;
80+
this.maxLifetime = maxLifetime;
7881
this.maxSize = maxSize;
7982
this.hook = hook;
8083
this.connectionProvider = connectionProvider;
@@ -144,9 +147,9 @@ public int size() {
144147
return pool.size();
145148
}
146149

147-
public void checkExpired() {
150+
public void evict() {
148151
long now = System.currentTimeMillis();
149-
pool.evict(conn -> conn.expirationTimestamp < now, ar -> {
152+
pool.evict(conn -> conn.shouldEvict(now), ar -> {
150153
if (ar.succeeded()) {
151154
List<PooledConnection> res = ar.result();
152155
for (PooledConnection conn : res) {
@@ -171,7 +174,7 @@ public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
171174
future = pooled.schedule(context, cmd);
172175
}
173176
return future.andThen(ar -> {
174-
pooled.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
177+
pooled.refresh();
175178
lease.recycle();
176179
});
177180
});
@@ -266,12 +269,15 @@ public class PooledConnection implements Connection, Connection.Holder {
266269
private Holder holder;
267270
private Handler<AsyncResult<ConnectResult<PooledConnection>>> poolResultHandler;
268271
private Lease<PooledConnection> lease;
269-
public long expirationTimestamp;
272+
public long idleEvictionTimestamp;
273+
public long lifetimeEvictionTimestamp;
270274

271275
PooledConnection(ConnectionFactory factory, Connection conn, PoolConnector.Listener listener) {
272276
this.factory = factory;
273277
this.conn = conn;
274278
this.listener = listener;
279+
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
280+
refresh();
275281
}
276282

277283
@Override
@@ -348,6 +354,10 @@ private void close(Promise<Void> promise) {
348354
conn.close(this, promise);
349355
}
350356

357+
private void refresh() {
358+
this.idleEvictionTimestamp = idleTimeout > 0 ? System.currentTimeMillis() + idleTimeout : Long.MAX_VALUE;
359+
}
360+
351361
@Override
352362
public void init(Holder holder) {
353363
if (this.holder != null) {
@@ -391,7 +401,7 @@ private void doClose(Holder holder, Promise<Void> promise) {
391401
private void cleanup(Promise<Void> promise) {
392402
Lease<PooledConnection> l = this.lease;
393403
this.lease = null;
394-
this.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
404+
refresh();
395405
l.recycle();
396406
promise.complete();
397407
}
@@ -437,5 +447,18 @@ public int getSecretKey() {
437447
public Connection unwrap() {
438448
return conn;
439449
}
450+
451+
private boolean hasIdleExpired(long now) {
452+
return idleEvictionTimestamp < now;
453+
}
454+
455+
private boolean hasLifetimeExpired(long now) {
456+
return lifetimeEvictionTimestamp < now;
457+
}
458+
459+
private boolean shouldEvict(long now) {
460+
return hasIdleExpired(now) || hasLifetimeExpired(now);
461+
}
462+
440463
}
441464
}

0 commit comments

Comments
 (0)