diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java index 3cb1c56f0..9f83a075f 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java @@ -18,8 +18,11 @@ package io.vertx.tests.pgclient; import io.netty.channel.EventLoop; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.Repeat; @@ -34,21 +37,31 @@ import io.vertx.tests.sqlclient.ProxyServer; import org.junit.Rule; import org.junit.Test; +import org.testcontainers.shaded.com.trilead.ssh2.ConnectionInfo; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collector; +import java.util.function.Consumer; +import java.util.stream.Collectors; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import java.time.OffsetDateTime; + /** * @author Julien Viet */ @@ -619,4 +632,184 @@ public void testConnectionClosedInHook(TestContext ctx) { })); })); } + + @Test + public void testConnectionJitter(TestContext ctx) { + PoolOptions poolOptions = new PoolOptions() + .setMaxSize(1) + .setMaxLifetime(3000) + .setMaxLifetimeUnit(TimeUnit.MILLISECONDS) + .setJitter(1) + .setJitterUnit(TimeUnit.SECONDS) + .setPoolCleanerPeriod(50); + + Pool pool = createPool(options, poolOptions); + Async latch = ctx.async(); + + List pids = Collections.synchronizedList(new ArrayList<>()); + List times = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger lastPid = new AtomicInteger(-1); + AtomicLong timerId = new AtomicLong(); + + Consumer checkPid = testCtx -> { + pool.query("SELECT pg_backend_pid() as pid") + .execute() + .onComplete(testCtx.asyncAssertSuccess(rs -> { + int currentPid = rs.iterator().next().getInteger("pid"); + if (lastPid.get() != currentPid) { + pids.add(currentPid); + times.add(System.currentTimeMillis()); + lastPid.set(currentPid); + + if (pids.size() == 3) { + vertx.cancelTimer(timerId.get()); + long diff1to2 = times.get(1) - times.get(0); + long diff2to3 = times.get(2) - times.get(1); + + // Verify time ranges + int maxLifetime = 3000; + int jitter = 1000; + int buffer = 100; + int lowerBound = maxLifetime - jitter + buffer; + int upperBound = maxLifetime + jitter + buffer; + + ctx.assertTrue(diff1to2 >= lowerBound && diff1to2 <= upperBound, + String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms", + pids.get(0), pids.get(1), diff1to2, lowerBound, upperBound)); + + ctx.assertTrue(diff2to3 >= lowerBound && diff2to3 <= upperBound, + String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms", + pids.get(1), pids.get(2), diff2to3, lowerBound, upperBound)); + + pool.close().onComplete(ctx.asyncAssertSuccess(v -> latch.complete())); + } + } + })); + }; + + timerId.set(vertx.setPeriodic(30, id -> checkPid.accept(ctx))); + latch.awaitSuccess(20000); + } + + @Test + public void testConnectionCloseTimingParallel(TestContext ctx) { + // Configure pool options. + PoolOptions poolOptions = new PoolOptions() + .setMaxSize(20) // Allow parallel tasks. + .setMaxLifetime(5000) // Maximum lifetime = 5000 ms. + .setMaxLifetimeUnit(TimeUnit.MILLISECONDS) + .setJitter(1) // Jitter = 1 second. + .setJitterUnit(TimeUnit.SECONDS) + .setPoolCleanerPeriod(50); + + Pool pool = createPool(options, poolOptions); + Async latch = ctx.async(); + int totalConnections = 50; + List> futures = new ArrayList<>(); + List pids = Collections.synchronizedList(new ArrayList<>()); + ConcurrentMap startTimes = new ConcurrentHashMap<>(); + ConcurrentMap endTimes = new ConcurrentHashMap<>(); + + // Launch connection tasks in parallel. + for (int i = 0; i < totalConnections; i++) { + futures.add(processSingleConnection(pool, i, ctx, pids, startTimes, endTimes)); + } + + Future.all(futures).onComplete(ctx.asyncAssertSuccess(ar -> { + pool.close().onComplete(ctx.asyncAssertSuccess(v -> { + // Wait 3 seconds after pool closure. + vertx.setTimer(3000, timerId -> { + PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + // (Optional) Query to verify that none of our recorded PIDs are still active. + String pidList = pids.stream().map(String::valueOf).collect(Collectors.joining(",")); + String sql = "SELECT pid FROM pg_stat_activity WHERE pid IN (" + pidList + ")"; + conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> { + // Compute durations for each PID. + List durations = pids.stream() + .map(pid -> endTimes.get(pid) - startTimes.get(pid)) + .collect(Collectors.toList()); + long maxLifetime = 5000; + long jitterMs = 1000; + long bucketWidth = jitterMs / 5; + // Bucket the durations based on an offset of maxLifetime. + Map> bucketMap = new HashMap<>(); + for (Integer pid : pids) { + long duration = endTimes.get(pid) - startTimes.get(pid); + int bucket = (int) ((duration - maxLifetime) / bucketWidth); + bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()).add(pid); + } + + ctx.assertTrue(bucketMap.size() >= 5, "Bucket Size should be 5"); + bucketMap.forEach((bucket, bucketPids) -> { + ctx.assertTrue(!bucketPids.isEmpty(), "Bucket " + bucket + " should not be empty"); + }); + // Print one line per PID with its duration. + for (Integer pid : pids) { + long duration = endTimes.get(pid) - startTimes.get(pid); + } + conn.close(); + latch.complete(); + })); + })); + }); + })); + })); + latch.awaitSuccess(60000); + } + + /** + * Acquires a connection, retrieves its PID and backend_start time, + * closes the connection, and polls for its closure. + * Returns a Future with a JsonObject containing the PID, start time, and end time. + */ + private Future processSingleConnection(Pool pool, int index, TestContext ctx, + List pids, + ConcurrentMap startTimes, + ConcurrentMap endTimes) { + Promise promise = Promise.promise(); + pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> { + conn.query("SELECT pg_backend_pid() AS pid") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + int pid = rs.iterator().next().getInteger("pid"); + pids.add(pid); + String sql = "SELECT backend_start FROM pg_stat_activity WHERE pid = " + pid; + conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> { + Row row = rs2.iterator().next(); + OffsetDateTime backendStart = row.getOffsetDateTime("backend_start"); + long startMillis = backendStart.toInstant().toEpochMilli(); + startTimes.put(pid, startMillis); + pollForClose(pool, pid, closeTime -> { + endTimes.put(pid, closeTime); + JsonObject res = new JsonObject() + .put("pid", pid) + .put("start", startMillis) + .put("end", closeTime); + promise.complete(res); + }); + conn.close().onComplete(x -> {}); + })); + })); + })); + return promise.future(); + } + + /** + * Polls pg_stat_activity periodically for the given PID. + * Once the PID is no longer found (i.e. the connection is closed), + * returns the current system time via the resultHandler. + */ + private void pollForClose(Pool pool, int pid, Handler resultHandler) { + long timerId = vertx.setPeriodic(50, id -> { + pool.query("SELECT 1 FROM pg_stat_activity WHERE pid = " + pid) + .execute() + .onComplete(ar -> { + if (ar.succeeded() && !ar.result().iterator().hasNext()) { + long closeTime = System.currentTimeMillis(); + vertx.cancelTimer(id); + resultHandler.handle(closeTime); + } + }); + }); + } } diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index ee783f734..7ee68ecec 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -44,6 +44,16 @@ static void fromJson(Iterable> json, PoolOpt obj.setMaxLifetime(((Number)member.getValue()).intValue()); } break; + case "jitter": + if (member.getValue() instanceof Number) { + obj.setJitter(((Number)member.getValue()).intValue()); + } + break; + case "jitterUnit": + if (member.getValue() instanceof String) { + obj.setJitterUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; case "poolCleanerPeriod": if (member.getValue() instanceof Number) { obj.setPoolCleanerPeriod(((Number)member.getValue()).intValue()); @@ -93,6 +103,10 @@ static void toJson(PoolOptions obj, java.util.Map json) { json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name()); } json.put("maxLifetime", obj.getMaxLifetime()); + json.put("jitter", obj.getJitter()); + if (obj.getJitterUnit() != null) { + json.put("jitterUnit", obj.getJitterUnit().name()); + } json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod()); if (obj.getConnectionTimeoutUnit() != null) { json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name()); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index dff64bb35..d5531889a 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -93,6 +93,16 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + /** + * Default jitter value = 0 + */ + public static final int DEFAULT_JITTER = 0; + + /** + * Default jitter unit = milliseconds + */ + public static final TimeUnit DEFAULT_JITTER_UNIT = TimeUnit.SECONDS; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -105,7 +115,9 @@ public class PoolOptions { private boolean shared = DEFAULT_SHARED_POOL; private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; - + private int jitter = DEFAULT_JITTER; + private TimeUnit jitterUnit = DEFAULT_JITTER_UNIT; + public PoolOptions() { } @@ -241,6 +253,49 @@ public PoolOptions setMaxLifetime(int maxLifetime) { return this; } + /** + * Get the jitter value that will be applied to maxLifetime. + * + * @return the jitter value in milliseconds + */ + public int getJitter() { + return this.jitter; + } + + /** + * Set the jitter value, to be applied to maxLifetime. + * + * @param jitter the jitter value + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setJitter(int jitter) { + if (jitter < 0) { + throw new IllegalArgumentException("jitter must be >= 0"); + } + this.jitter = jitter; + return this; + } + + /** + * Get the time unit for the jitter value. + * + * @return the jitter time unit + */ + public TimeUnit getJitterUnit() { + return this.jitterUnit; + } + + /** + * Set the time unit for the jitter value. + * + * @param jitterUnit the jitter time unit + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setJitterUnit(TimeUnit jitterUnit) { + this.jitterUnit = jitterUnit; + return this; + } + /** * @return the connection pool cleaner period in ms. */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index cc66f6daf..fba95043d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -32,6 +32,7 @@ import io.vertx.sqlclient.spi.DatabaseMetadata; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,6 +57,7 @@ public class SqlConnectionPool { private final boolean pipelined; private final long idleTimeout; private final long maxLifetime; + private final long jitter; private final int maxSize; public SqlConnectionPool(Function> connectionProvider, @@ -66,6 +68,7 @@ public SqlConnectionPool(Function> connectionProv VertxInternal vertx, long idleTimeout, long maxLifetime, + long jitter, int maxSize, boolean pipelined, int maxWaitQueueSize, @@ -82,6 +85,7 @@ public SqlConnectionPool(Function> connectionProv this.pipelined = pipelined; this.idleTimeout = idleTimeout; this.maxLifetime = maxLifetime; + this.jitter = jitter; this.maxSize = maxSize; this.hook = hook; this.connectionProvider = connectionProvider; @@ -309,7 +313,7 @@ public class PooledConnection implements Connection, Connection.Holder { this.factory = factory; this.conn = conn; this.listener = listener; - this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE; + this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + Math.max(0, ThreadLocalRandom.current().nextLong(0, jitter + 1)) : Long.MAX_VALUE; refresh(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index bf269a080..604fe8380 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private final long idleTimeout; private final long connectionTimeout; private final long maxLifetime; + private final long jitter; private final long cleanerPeriod; private final boolean pipelined; private final Handler connectionInitializer; @@ -80,11 +81,12 @@ public PoolImpl(VertxInternal vertx, this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit()); this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); + this.jitter = MILLISECONDS.convert(poolOptions.getJitter(), poolOptions.getJitterUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; - this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, jitter, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; this.connectionInitializer = connectionInitializer; }