From 61d0455c308badabfb9bc8ecb69b7786de934f89 Mon Sep 17 00:00:00 2001 From: priyanshu-d11 Date: Tue, 11 Mar 2025 15:38:33 +0530 Subject: [PATCH 1/4] feat: add jitter to max lifetime in connection pool Signed-off-by: priyanshu-d11 --- .../vertx/sqlclient/PoolOptionsConverter.java | 6 ++++ .../java/io/vertx/sqlclient/PoolOptions.java | 29 +++++++++++++++++++ .../impl/pool/SqlConnectionPool.java | 7 ++++- .../sqlclient/internal/pool/PoolImpl.java | 4 ++- 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index ee783f734..d03727559 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -44,6 +44,11 @@ static void fromJson(Iterable> json, PoolOpt obj.setMaxLifetime(((Number)member.getValue()).intValue()); } break; + case "jitter": + if (member.getValue() instanceof Number) { + obj.setJitter(((Number)member.getValue()).intValue()); + } + break; case "poolCleanerPeriod": if (member.getValue() instanceof Number) { obj.setPoolCleanerPeriod(((Number)member.getValue()).intValue()); @@ -93,6 +98,7 @@ static void toJson(PoolOptions obj, java.util.Map json) { json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name()); } json.put("maxLifetime", obj.getMaxLifetime()); + json.put("jitter", obj.getJitter()); json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod()); if (obj.getConnectionTimeoutUnit() != null) { json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name()); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index dff64bb35..1e3ddbe73 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -93,6 +93,11 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + /** + * Default jitter value = 0 + */ + public static final int DEFAULT_JITTER = 0; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -105,6 +110,7 @@ public class PoolOptions { private boolean shared = DEFAULT_SHARED_POOL; private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; + private int jitter = DEFAULT_JITTER; public PoolOptions() { } @@ -241,6 +247,29 @@ public PoolOptions setMaxLifetime(int maxLifetime) { return this; } + /** + * Get the jitter value that will be applied to maxLifetime. + * + * @return the jitter value in milliseconds + */ + public int getJitter() { + return this.jitter; + } + + /** + * Set the jitter value, to be applied to maxLifetime. + * + * @param jitter the jitter value + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setJitter(int jitter) { + if (jitter < 0) { + throw new IllegalArgumentException("jitter must be >= 0"); + } + this.jitter = jitter; + return this; + } + /** * @return the connection pool cleaner period in ms. */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index cc66f6daf..266a120a3 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -32,6 +32,7 @@ import io.vertx.sqlclient.spi.DatabaseMetadata; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,6 +57,7 @@ public class SqlConnectionPool { private final boolean pipelined; private final long idleTimeout; private final long maxLifetime; + private final long jitter; private final int maxSize; public SqlConnectionPool(Function> connectionProvider, @@ -66,6 +68,7 @@ public SqlConnectionPool(Function> connectionProv VertxInternal vertx, long idleTimeout, long maxLifetime, + long jitter, int maxSize, boolean pipelined, int maxWaitQueueSize, @@ -82,6 +85,7 @@ public SqlConnectionPool(Function> connectionProv this.pipelined = pipelined; this.idleTimeout = idleTimeout; this.maxLifetime = maxLifetime; + this.jitter = jitter; this.maxSize = maxSize; this.hook = hook; this.connectionProvider = connectionProvider; @@ -309,7 +313,8 @@ public class PooledConnection implements Connection, Connection.Holder { this.factory = factory; this.conn = conn; this.listener = listener; - this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE; + long randomizedJitter = jitter > 0 ? ThreadLocalRandom.current().nextLong(-jitter, -jitter + 1) : 0; + this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + randomizedJitter : Long.MAX_VALUE; refresh(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index bf269a080..e21b15590 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private final long idleTimeout; private final long connectionTimeout; private final long maxLifetime; + private final long jitter; private final long cleanerPeriod; private final boolean pipelined; private final Handler connectionInitializer; @@ -80,11 +81,12 @@ public PoolImpl(VertxInternal vertx, this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit()); this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); + this.jitter = MILLISECONDS.convert(poolOptions.getJitter(), poolOptions.getMaxLifetimeUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; - this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, jitter, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; this.connectionInitializer = connectionInitializer; } From 204faa95ab935a954e52e872b5033853795e47e1 Mon Sep 17 00:00:00 2001 From: priyanshu-d11 Date: Wed, 19 Mar 2025 18:15:51 +0530 Subject: [PATCH 2/4] test: improve connection jitter test with timing verification and assertions Signed-off-by: priyanshu-d11 --- .../io/vertx/tests/pgclient/PgPoolTest.java | 78 +++++++++++++++++-- .../impl/pool/SqlConnectionPool.java | 2 +- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java index 3cb1c56f0..46b5727f2 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java @@ -34,17 +34,17 @@ import io.vertx.tests.sqlclient.ProxyServer; import org.junit.Rule; import org.junit.Test; +import org.testcontainers.shaded.com.trilead.ssh2.ConnectionInfo; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collector; +import java.util.function.Consumer; +import java.util.stream.Collectors; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -59,6 +59,17 @@ public class PgPoolTest extends PgPoolTestBase { private Set pools = new HashSet<>(); + // Static inner class to store connection info with timestamp + private static class ConnectionRecord { + final int pid; + final long timestamp; + + ConnectionRecord(int pid, long timestamp) { + this.pid = pid; + this.timestamp = timestamp; + } + } + @Override public void tearDown(TestContext ctx) { int size = pools.size(); @@ -619,4 +630,61 @@ public void testConnectionClosedInHook(TestContext ctx) { })); })); } + + @Test + @Repeat(2) + public void testConnectionJitter(TestContext ctx) { + poolOptions + .setMaxSize(1) + .setMaxLifetime(2000) + .setMaxLifetimeUnit(TimeUnit.MILLISECONDS) + .setJitter(400) + .setPoolCleanerPeriod(50); + + Pool pool = createPool(options, poolOptions); + Async latch = ctx.async(); + + List pids = Collections.synchronizedList(new ArrayList<>()); + List times = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger lastPid = new AtomicInteger(-1); + AtomicLong timerId = new AtomicLong(); + + Consumer checkPid = testCtx -> { + pool.query("SELECT pg_backend_pid() as pid") + .execute() + .onComplete(testCtx.asyncAssertSuccess(rs -> { + int currentPid = rs.iterator().next().getInteger("pid"); + if (lastPid.get() != currentPid) { + pids.add(currentPid); + times.add(System.currentTimeMillis()); + lastPid.set(currentPid); + + if (pids.size() == 3) { + vertx.cancelTimer(timerId.get()); + long diff1to2 = times.get(1) - times.get(0); + long diff2to3 = times.get(2) - times.get(1); + + // Verify time ranges + int maxLifetime = 2000; + int jitter = 400; + int lowerBound = maxLifetime - jitter; + int upperBound = maxLifetime + jitter; + + ctx.assertTrue(diff1to2 >= lowerBound && diff1to2 <= upperBound, + String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms", + pids.get(0), pids.get(1), diff1to2, lowerBound, upperBound)); + + ctx.assertTrue(diff2to3 >= lowerBound && diff2to3 <= upperBound, + String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms", + pids.get(1), pids.get(2), diff2to3, lowerBound, upperBound)); + + pool.close().onComplete(ctx.asyncAssertSuccess(v -> latch.complete())); + } + } + })); + }; + + timerId.set(vertx.setPeriodic(30, id -> checkPid.accept(ctx))); + latch.awaitSuccess(20000); + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 266a120a3..265624173 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -313,7 +313,7 @@ public class PooledConnection implements Connection, Connection.Holder { this.factory = factory; this.conn = conn; this.listener = listener; - long randomizedJitter = jitter > 0 ? ThreadLocalRandom.current().nextLong(-jitter, -jitter + 1) : 0; + long randomizedJitter = jitter > 0 ? ThreadLocalRandom.current().nextLong(-jitter, jitter + 1) : 0; this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + randomizedJitter : Long.MAX_VALUE; refresh(); } From cbb4fad53569284fa5e3a08424f740ba53949f1c Mon Sep 17 00:00:00 2001 From: priyanshu-d11 Date: Thu, 20 Mar 2025 15:26:04 +0530 Subject: [PATCH 3/4] Added timeunit for jitter --- .../io/vertx/tests/pgclient/PgPoolTest.java | 33 ++++++++----------- .../vertx/sqlclient/PoolOptionsConverter.java | 8 +++++ .../java/io/vertx/sqlclient/PoolOptions.java | 28 +++++++++++++++- .../impl/pool/SqlConnectionPool.java | 2 +- .../sqlclient/internal/pool/PoolImpl.java | 2 +- 5 files changed, 51 insertions(+), 22 deletions(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java index 46b5727f2..4fd798c5b 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java @@ -36,7 +36,11 @@ import org.junit.Test; import org.testcontainers.shaded.com.trilead.ssh2.ConnectionInfo; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -59,17 +63,6 @@ public class PgPoolTest extends PgPoolTestBase { private Set pools = new HashSet<>(); - // Static inner class to store connection info with timestamp - private static class ConnectionRecord { - final int pid; - final long timestamp; - - ConnectionRecord(int pid, long timestamp) { - this.pid = pid; - this.timestamp = timestamp; - } - } - @Override public void tearDown(TestContext ctx) { int size = pools.size(); @@ -634,11 +627,12 @@ public void testConnectionClosedInHook(TestContext ctx) { @Test @Repeat(2) public void testConnectionJitter(TestContext ctx) { - poolOptions + PoolOptions poolOptions = new PoolOptions() .setMaxSize(1) - .setMaxLifetime(2000) + .setMaxLifetime(3000) .setMaxLifetimeUnit(TimeUnit.MILLISECONDS) - .setJitter(400) + .setJitter(1) + .setJitterUnit(TimeUnit.SECONDS) .setPoolCleanerPeriod(50); Pool pool = createPool(options, poolOptions); @@ -665,10 +659,11 @@ public void testConnectionJitter(TestContext ctx) { long diff2to3 = times.get(2) - times.get(1); // Verify time ranges - int maxLifetime = 2000; - int jitter = 400; - int lowerBound = maxLifetime - jitter; - int upperBound = maxLifetime + jitter; + int maxLifetime = 3000; + int jitter = 1000; + int buffer = 100; + int lowerBound = maxLifetime - jitter + buffer; + int upperBound = maxLifetime + jitter + buffer; ctx.assertTrue(diff1to2 >= lowerBound && diff1to2 <= upperBound, String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms", diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index d03727559..7ee68ecec 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -49,6 +49,11 @@ static void fromJson(Iterable> json, PoolOpt obj.setJitter(((Number)member.getValue()).intValue()); } break; + case "jitterUnit": + if (member.getValue() instanceof String) { + obj.setJitterUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; case "poolCleanerPeriod": if (member.getValue() instanceof Number) { obj.setPoolCleanerPeriod(((Number)member.getValue()).intValue()); @@ -99,6 +104,9 @@ static void toJson(PoolOptions obj, java.util.Map json) { } json.put("maxLifetime", obj.getMaxLifetime()); json.put("jitter", obj.getJitter()); + if (obj.getJitterUnit() != null) { + json.put("jitterUnit", obj.getJitterUnit().name()); + } json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod()); if (obj.getConnectionTimeoutUnit() != null) { json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name()); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index 1e3ddbe73..d5531889a 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -98,6 +98,11 @@ public class PoolOptions { */ public static final int DEFAULT_JITTER = 0; + /** + * Default jitter unit = milliseconds + */ + public static final TimeUnit DEFAULT_JITTER_UNIT = TimeUnit.SECONDS; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -111,7 +116,8 @@ public class PoolOptions { private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; private int jitter = DEFAULT_JITTER; - + private TimeUnit jitterUnit = DEFAULT_JITTER_UNIT; + public PoolOptions() { } @@ -270,6 +276,26 @@ public PoolOptions setJitter(int jitter) { return this; } + /** + * Get the time unit for the jitter value. + * + * @return the jitter time unit + */ + public TimeUnit getJitterUnit() { + return this.jitterUnit; + } + + /** + * Set the time unit for the jitter value. + * + * @param jitterUnit the jitter time unit + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setJitterUnit(TimeUnit jitterUnit) { + this.jitterUnit = jitterUnit; + return this; + } + /** * @return the connection pool cleaner period in ms. */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 265624173..610604677 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -313,7 +313,7 @@ public class PooledConnection implements Connection, Connection.Holder { this.factory = factory; this.conn = conn; this.listener = listener; - long randomizedJitter = jitter > 0 ? ThreadLocalRandom.current().nextLong(-jitter, jitter + 1) : 0; + long randomizedJitter = (maxLifetime > 0 && jitter > 0 && maxLifetime > jitter) ? ThreadLocalRandom.current().nextLong(-jitter, jitter + 1) : 0; this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + randomizedJitter : Long.MAX_VALUE; refresh(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index e21b15590..604fe8380 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -81,7 +81,7 @@ public PoolImpl(VertxInternal vertx, this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit()); this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); - this.jitter = MILLISECONDS.convert(poolOptions.getJitter(), poolOptions.getMaxLifetimeUnit()); + this.jitter = MILLISECONDS.convert(poolOptions.getJitter(), poolOptions.getJitterUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); this.timerID = -1L; this.pipelined = pipelined; From f9dc3c846c1ac48b5a495d34c031cf02e5e869dc Mon Sep 17 00:00:00 2001 From: priyanshu-d11 Date: Wed, 26 Mar 2025 14:47:24 +0530 Subject: [PATCH 4/4] Addded test case for jitter --- .../io/vertx/tests/pgclient/PgPoolTest.java | 132 +++++++++++++++++- .../impl/pool/SqlConnectionPool.java | 3 +- 2 files changed, 132 insertions(+), 3 deletions(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java index 4fd798c5b..9f83a075f 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java @@ -18,8 +18,11 @@ package io.vertx.tests.pgclient; import io.netty.channel.EventLoop; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.Repeat; @@ -38,9 +41,13 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +60,8 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import java.time.OffsetDateTime; + /** * @author Julien Viet */ @@ -625,7 +634,6 @@ public void testConnectionClosedInHook(TestContext ctx) { } @Test - @Repeat(2) public void testConnectionJitter(TestContext ctx) { PoolOptions poolOptions = new PoolOptions() .setMaxSize(1) @@ -682,4 +690,126 @@ public void testConnectionJitter(TestContext ctx) { timerId.set(vertx.setPeriodic(30, id -> checkPid.accept(ctx))); latch.awaitSuccess(20000); } + + @Test + public void testConnectionCloseTimingParallel(TestContext ctx) { + // Configure pool options. + PoolOptions poolOptions = new PoolOptions() + .setMaxSize(20) // Allow parallel tasks. + .setMaxLifetime(5000) // Maximum lifetime = 5000 ms. + .setMaxLifetimeUnit(TimeUnit.MILLISECONDS) + .setJitter(1) // Jitter = 1 second. + .setJitterUnit(TimeUnit.SECONDS) + .setPoolCleanerPeriod(50); + + Pool pool = createPool(options, poolOptions); + Async latch = ctx.async(); + int totalConnections = 50; + List> futures = new ArrayList<>(); + List pids = Collections.synchronizedList(new ArrayList<>()); + ConcurrentMap startTimes = new ConcurrentHashMap<>(); + ConcurrentMap endTimes = new ConcurrentHashMap<>(); + + // Launch connection tasks in parallel. + for (int i = 0; i < totalConnections; i++) { + futures.add(processSingleConnection(pool, i, ctx, pids, startTimes, endTimes)); + } + + Future.all(futures).onComplete(ctx.asyncAssertSuccess(ar -> { + pool.close().onComplete(ctx.asyncAssertSuccess(v -> { + // Wait 3 seconds after pool closure. + vertx.setTimer(3000, timerId -> { + PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + // (Optional) Query to verify that none of our recorded PIDs are still active. + String pidList = pids.stream().map(String::valueOf).collect(Collectors.joining(",")); + String sql = "SELECT pid FROM pg_stat_activity WHERE pid IN (" + pidList + ")"; + conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> { + // Compute durations for each PID. + List durations = pids.stream() + .map(pid -> endTimes.get(pid) - startTimes.get(pid)) + .collect(Collectors.toList()); + long maxLifetime = 5000; + long jitterMs = 1000; + long bucketWidth = jitterMs / 5; + // Bucket the durations based on an offset of maxLifetime. + Map> bucketMap = new HashMap<>(); + for (Integer pid : pids) { + long duration = endTimes.get(pid) - startTimes.get(pid); + int bucket = (int) ((duration - maxLifetime) / bucketWidth); + bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()).add(pid); + } + + ctx.assertTrue(bucketMap.size() >= 5, "Bucket Size should be 5"); + bucketMap.forEach((bucket, bucketPids) -> { + ctx.assertTrue(!bucketPids.isEmpty(), "Bucket " + bucket + " should not be empty"); + }); + // Print one line per PID with its duration. + for (Integer pid : pids) { + long duration = endTimes.get(pid) - startTimes.get(pid); + } + conn.close(); + latch.complete(); + })); + })); + }); + })); + })); + latch.awaitSuccess(60000); + } + + /** + * Acquires a connection, retrieves its PID and backend_start time, + * closes the connection, and polls for its closure. + * Returns a Future with a JsonObject containing the PID, start time, and end time. + */ + private Future processSingleConnection(Pool pool, int index, TestContext ctx, + List pids, + ConcurrentMap startTimes, + ConcurrentMap endTimes) { + Promise promise = Promise.promise(); + pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> { + conn.query("SELECT pg_backend_pid() AS pid") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + int pid = rs.iterator().next().getInteger("pid"); + pids.add(pid); + String sql = "SELECT backend_start FROM pg_stat_activity WHERE pid = " + pid; + conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> { + Row row = rs2.iterator().next(); + OffsetDateTime backendStart = row.getOffsetDateTime("backend_start"); + long startMillis = backendStart.toInstant().toEpochMilli(); + startTimes.put(pid, startMillis); + pollForClose(pool, pid, closeTime -> { + endTimes.put(pid, closeTime); + JsonObject res = new JsonObject() + .put("pid", pid) + .put("start", startMillis) + .put("end", closeTime); + promise.complete(res); + }); + conn.close().onComplete(x -> {}); + })); + })); + })); + return promise.future(); + } + + /** + * Polls pg_stat_activity periodically for the given PID. + * Once the PID is no longer found (i.e. the connection is closed), + * returns the current system time via the resultHandler. + */ + private void pollForClose(Pool pool, int pid, Handler resultHandler) { + long timerId = vertx.setPeriodic(50, id -> { + pool.query("SELECT 1 FROM pg_stat_activity WHERE pid = " + pid) + .execute() + .onComplete(ar -> { + if (ar.succeeded() && !ar.result().iterator().hasNext()) { + long closeTime = System.currentTimeMillis(); + vertx.cancelTimer(id); + resultHandler.handle(closeTime); + } + }); + }); + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 610604677..fba95043d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -313,8 +313,7 @@ public class PooledConnection implements Connection, Connection.Holder { this.factory = factory; this.conn = conn; this.listener = listener; - long randomizedJitter = (maxLifetime > 0 && jitter > 0 && maxLifetime > jitter) ? ThreadLocalRandom.current().nextLong(-jitter, jitter + 1) : 0; - this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + randomizedJitter : Long.MAX_VALUE; + this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + Math.max(0, ThreadLocalRandom.current().nextLong(0, jitter + 1)) : Long.MAX_VALUE; refresh(); }