diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java
index 3cb1c56f0..9f83a075f 100644
--- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java
@@ -18,8 +18,11 @@
package io.vertx.tests.pgclient;
import io.netty.channel.EventLoop;
+import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
import io.vertx.core.VertxOptions;
+import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.Repeat;
@@ -34,21 +37,31 @@
import io.vertx.tests.sqlclient.ProxyServer;
import org.junit.Rule;
import org.junit.Test;
+import org.testcontainers.shaded.com.trilead.ssh2.ConnectionInfo;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collector;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
+import java.time.OffsetDateTime;
+
/**
* @author Julien Viet
*/
@@ -619,4 +632,184 @@ public void testConnectionClosedInHook(TestContext ctx) {
}));
}));
}
+
+ @Test
+ public void testConnectionJitter(TestContext ctx) {
+ PoolOptions poolOptions = new PoolOptions()
+ .setMaxSize(1)
+ .setMaxLifetime(3000)
+ .setMaxLifetimeUnit(TimeUnit.MILLISECONDS)
+ .setJitter(1)
+ .setJitterUnit(TimeUnit.SECONDS)
+ .setPoolCleanerPeriod(50);
+
+ Pool pool = createPool(options, poolOptions);
+ Async latch = ctx.async();
+
+ List pids = Collections.synchronizedList(new ArrayList<>());
+ List times = Collections.synchronizedList(new ArrayList<>());
+ AtomicInteger lastPid = new AtomicInteger(-1);
+ AtomicLong timerId = new AtomicLong();
+
+ Consumer checkPid = testCtx -> {
+ pool.query("SELECT pg_backend_pid() as pid")
+ .execute()
+ .onComplete(testCtx.asyncAssertSuccess(rs -> {
+ int currentPid = rs.iterator().next().getInteger("pid");
+ if (lastPid.get() != currentPid) {
+ pids.add(currentPid);
+ times.add(System.currentTimeMillis());
+ lastPid.set(currentPid);
+
+ if (pids.size() == 3) {
+ vertx.cancelTimer(timerId.get());
+ long diff1to2 = times.get(1) - times.get(0);
+ long diff2to3 = times.get(2) - times.get(1);
+
+ // Verify time ranges
+ int maxLifetime = 3000;
+ int jitter = 1000;
+ int buffer = 100;
+ int lowerBound = maxLifetime - jitter + buffer;
+ int upperBound = maxLifetime + jitter + buffer;
+
+ ctx.assertTrue(diff1to2 >= lowerBound && diff1to2 <= upperBound,
+ String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms",
+ pids.get(0), pids.get(1), diff1to2, lowerBound, upperBound));
+
+ ctx.assertTrue(diff2to3 >= lowerBound && diff2to3 <= upperBound,
+ String.format("Time between PIDs %d->%d (%dms) should be between %dms and %dms",
+ pids.get(1), pids.get(2), diff2to3, lowerBound, upperBound));
+
+ pool.close().onComplete(ctx.asyncAssertSuccess(v -> latch.complete()));
+ }
+ }
+ }));
+ };
+
+ timerId.set(vertx.setPeriodic(30, id -> checkPid.accept(ctx)));
+ latch.awaitSuccess(20000);
+ }
+
+ @Test
+ public void testConnectionCloseTimingParallel(TestContext ctx) {
+ // Configure pool options.
+ PoolOptions poolOptions = new PoolOptions()
+ .setMaxSize(20) // Allow parallel tasks.
+ .setMaxLifetime(5000) // Maximum lifetime = 5000 ms.
+ .setMaxLifetimeUnit(TimeUnit.MILLISECONDS)
+ .setJitter(1) // Jitter = 1 second.
+ .setJitterUnit(TimeUnit.SECONDS)
+ .setPoolCleanerPeriod(50);
+
+ Pool pool = createPool(options, poolOptions);
+ Async latch = ctx.async();
+ int totalConnections = 50;
+ List> futures = new ArrayList<>();
+ List pids = Collections.synchronizedList(new ArrayList<>());
+ ConcurrentMap startTimes = new ConcurrentHashMap<>();
+ ConcurrentMap endTimes = new ConcurrentHashMap<>();
+
+ // Launch connection tasks in parallel.
+ for (int i = 0; i < totalConnections; i++) {
+ futures.add(processSingleConnection(pool, i, ctx, pids, startTimes, endTimes));
+ }
+
+ Future.all(futures).onComplete(ctx.asyncAssertSuccess(ar -> {
+ pool.close().onComplete(ctx.asyncAssertSuccess(v -> {
+ // Wait 3 seconds after pool closure.
+ vertx.setTimer(3000, timerId -> {
+ PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
+ // (Optional) Query to verify that none of our recorded PIDs are still active.
+ String pidList = pids.stream().map(String::valueOf).collect(Collectors.joining(","));
+ String sql = "SELECT pid FROM pg_stat_activity WHERE pid IN (" + pidList + ")";
+ conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> {
+ // Compute durations for each PID.
+ List durations = pids.stream()
+ .map(pid -> endTimes.get(pid) - startTimes.get(pid))
+ .collect(Collectors.toList());
+ long maxLifetime = 5000;
+ long jitterMs = 1000;
+ long bucketWidth = jitterMs / 5;
+ // Bucket the durations based on an offset of maxLifetime.
+ Map> bucketMap = new HashMap<>();
+ for (Integer pid : pids) {
+ long duration = endTimes.get(pid) - startTimes.get(pid);
+ int bucket = (int) ((duration - maxLifetime) / bucketWidth);
+ bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()).add(pid);
+ }
+
+ ctx.assertTrue(bucketMap.size() >= 5, "Bucket Size should be 5");
+ bucketMap.forEach((bucket, bucketPids) -> {
+ ctx.assertTrue(!bucketPids.isEmpty(), "Bucket " + bucket + " should not be empty");
+ });
+ // Print one line per PID with its duration.
+ for (Integer pid : pids) {
+ long duration = endTimes.get(pid) - startTimes.get(pid);
+ }
+ conn.close();
+ latch.complete();
+ }));
+ }));
+ });
+ }));
+ }));
+ latch.awaitSuccess(60000);
+ }
+
+ /**
+ * Acquires a connection, retrieves its PID and backend_start time,
+ * closes the connection, and polls for its closure.
+ * Returns a Future with a JsonObject containing the PID, start time, and end time.
+ */
+ private Future processSingleConnection(Pool pool, int index, TestContext ctx,
+ List pids,
+ ConcurrentMap startTimes,
+ ConcurrentMap endTimes) {
+ Promise promise = Promise.promise();
+ pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> {
+ conn.query("SELECT pg_backend_pid() AS pid")
+ .execute()
+ .onComplete(ctx.asyncAssertSuccess(rs -> {
+ int pid = rs.iterator().next().getInteger("pid");
+ pids.add(pid);
+ String sql = "SELECT backend_start FROM pg_stat_activity WHERE pid = " + pid;
+ conn.query(sql).execute().onComplete(ctx.asyncAssertSuccess(rs2 -> {
+ Row row = rs2.iterator().next();
+ OffsetDateTime backendStart = row.getOffsetDateTime("backend_start");
+ long startMillis = backendStart.toInstant().toEpochMilli();
+ startTimes.put(pid, startMillis);
+ pollForClose(pool, pid, closeTime -> {
+ endTimes.put(pid, closeTime);
+ JsonObject res = new JsonObject()
+ .put("pid", pid)
+ .put("start", startMillis)
+ .put("end", closeTime);
+ promise.complete(res);
+ });
+ conn.close().onComplete(x -> {});
+ }));
+ }));
+ }));
+ return promise.future();
+ }
+
+ /**
+ * Polls pg_stat_activity periodically for the given PID.
+ * Once the PID is no longer found (i.e. the connection is closed),
+ * returns the current system time via the resultHandler.
+ */
+ private void pollForClose(Pool pool, int pid, Handler resultHandler) {
+ long timerId = vertx.setPeriodic(50, id -> {
+ pool.query("SELECT 1 FROM pg_stat_activity WHERE pid = " + pid)
+ .execute()
+ .onComplete(ar -> {
+ if (ar.succeeded() && !ar.result().iterator().hasNext()) {
+ long closeTime = System.currentTimeMillis();
+ vertx.cancelTimer(id);
+ resultHandler.handle(closeTime);
+ }
+ });
+ });
+ }
}
diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java
index ee783f734..7ee68ecec 100644
--- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java
+++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java
@@ -44,6 +44,16 @@ static void fromJson(Iterable> json, PoolOpt
obj.setMaxLifetime(((Number)member.getValue()).intValue());
}
break;
+ case "jitter":
+ if (member.getValue() instanceof Number) {
+ obj.setJitter(((Number)member.getValue()).intValue());
+ }
+ break;
+ case "jitterUnit":
+ if (member.getValue() instanceof String) {
+ obj.setJitterUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
+ }
+ break;
case "poolCleanerPeriod":
if (member.getValue() instanceof Number) {
obj.setPoolCleanerPeriod(((Number)member.getValue()).intValue());
@@ -93,6 +103,10 @@ static void toJson(PoolOptions obj, java.util.Map json) {
json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name());
}
json.put("maxLifetime", obj.getMaxLifetime());
+ json.put("jitter", obj.getJitter());
+ if (obj.getJitterUnit() != null) {
+ json.put("jitterUnit", obj.getJitterUnit().name());
+ }
json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod());
if (obj.getConnectionTimeoutUnit() != null) {
json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name());
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
index dff64bb35..d5531889a 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
@@ -93,6 +93,16 @@ public class PoolOptions {
*/
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;
+ /**
+ * Default jitter value = 0
+ */
+ public static final int DEFAULT_JITTER = 0;
+
+ /**
+ * Default jitter unit = milliseconds
+ */
+ public static final TimeUnit DEFAULT_JITTER_UNIT = TimeUnit.SECONDS;
+
private int maxSize = DEFAULT_MAX_SIZE;
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
@@ -105,7 +115,9 @@ public class PoolOptions {
private boolean shared = DEFAULT_SHARED_POOL;
private String name = DEFAULT_NAME;
private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE;
-
+ private int jitter = DEFAULT_JITTER;
+ private TimeUnit jitterUnit = DEFAULT_JITTER_UNIT;
+
public PoolOptions() {
}
@@ -241,6 +253,49 @@ public PoolOptions setMaxLifetime(int maxLifetime) {
return this;
}
+ /**
+ * Get the jitter value that will be applied to maxLifetime.
+ *
+ * @return the jitter value in milliseconds
+ */
+ public int getJitter() {
+ return this.jitter;
+ }
+
+ /**
+ * Set the jitter value, to be applied to maxLifetime.
+ *
+ * @param jitter the jitter value
+ * @return a reference to this, so the API can be used fluently
+ */
+ public PoolOptions setJitter(int jitter) {
+ if (jitter < 0) {
+ throw new IllegalArgumentException("jitter must be >= 0");
+ }
+ this.jitter = jitter;
+ return this;
+ }
+
+ /**
+ * Get the time unit for the jitter value.
+ *
+ * @return the jitter time unit
+ */
+ public TimeUnit getJitterUnit() {
+ return this.jitterUnit;
+ }
+
+ /**
+ * Set the time unit for the jitter value.
+ *
+ * @param jitterUnit the jitter time unit
+ * @return a reference to this, so the API can be used fluently
+ */
+ public PoolOptions setJitterUnit(TimeUnit jitterUnit) {
+ this.jitterUnit = jitterUnit;
+ return this;
+ }
+
/**
* @return the connection pool cleaner period in ms.
*/
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
index cc66f6daf..fba95043d 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
@@ -32,6 +32,7 @@
import io.vertx.sqlclient.spi.DatabaseMetadata;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -56,6 +57,7 @@ public class SqlConnectionPool {
private final boolean pipelined;
private final long idleTimeout;
private final long maxLifetime;
+ private final long jitter;
private final int maxSize;
public SqlConnectionPool(Function> connectionProvider,
@@ -66,6 +68,7 @@ public SqlConnectionPool(Function> connectionProv
VertxInternal vertx,
long idleTimeout,
long maxLifetime,
+ long jitter,
int maxSize,
boolean pipelined,
int maxWaitQueueSize,
@@ -82,6 +85,7 @@ public SqlConnectionPool(Function> connectionProv
this.pipelined = pipelined;
this.idleTimeout = idleTimeout;
this.maxLifetime = maxLifetime;
+ this.jitter = jitter;
this.maxSize = maxSize;
this.hook = hook;
this.connectionProvider = connectionProvider;
@@ -309,7 +313,7 @@ public class PooledConnection implements Connection, Connection.Holder {
this.factory = factory;
this.conn = conn;
this.listener = listener;
- this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
+ this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime + Math.max(0, ThreadLocalRandom.current().nextLong(0, jitter + 1)) : Long.MAX_VALUE;
refresh();
}
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
index bf269a080..604fe8380 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
@@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private final long idleTimeout;
private final long connectionTimeout;
private final long maxLifetime;
+ private final long jitter;
private final long cleanerPeriod;
private final boolean pipelined;
private final Handler connectionInitializer;
@@ -80,11 +81,12 @@ public PoolImpl(VertxInternal vertx,
this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit());
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
+ this.jitter = MILLISECONDS.convert(poolOptions.getJitter(), poolOptions.getJitterUnit());
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
- this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
+ this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, jitter, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
this.connectionInitializer = connectionInitializer;
}