From 61c91585b823e31b64fde42f267fade247e3ae2d Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Tue, 11 Jul 2023 17:27:44 -0300 Subject: [PATCH 1/8] Honor pool connection timeout when executing queries directly in the pool. Should fix #1232, as it now uses the timeout when acquiring the connection --- .../src/main/java/io/vertx/sqlclient/impl/PoolImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 724913297..1553790ab 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -169,7 +169,9 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { - return pool.execute(context, cmd); + PromiseInternal promise = context.promise(); + acquire(context, connectionTimeout, promise); + return promise.future().compose((pooled -> pooled.schedule(context, cmd))); } private void acquire(ContextInternal context, long timeout, Handler> completionHandler) { From 9ed5c577ec5b26898bb4aa02395e5a5e43e4e8a5 Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Tue, 11 Jul 2023 18:04:10 -0300 Subject: [PATCH 2/8] Removed unnecessary parenthesis --- .../src/main/java/io/vertx/sqlclient/impl/PoolImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 1553790ab..5a8e42943 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -171,7 +171,7 @@ public Future getConnection() { public Future schedule(ContextInternal context, CommandBase cmd) { PromiseInternal promise = context.promise(); acquire(context, connectionTimeout, promise); - return promise.future().compose((pooled -> pooled.schedule(context, cmd))); + return promise.future().compose(pooled -> pooled.schedule(context, cmd)); } private void acquire(ContextInternal context, long timeout, Handler> completionHandler) { From 3527436f02aeeaea9954c764cb782c82129af8ca Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Wed, 12 Jul 2023 09:16:47 -0300 Subject: [PATCH 3/8] Some improvements: - Found a pool test I could modify (but couldn't validate locally yet) - Changed the code to return the release the connection, which wasn't happening --- .../java/io/vertx/pgclient/PgPoolTest.java | 42 +++++++++++++++++++ .../io/vertx/sqlclient/impl/PoolImpl.java | 14 ++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java index 3b03a5293..ea508d13a 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java @@ -22,6 +22,7 @@ import io.vertx.core.Handler; import io.vertx.core.VertxOptions; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.Repeat; @@ -592,4 +593,45 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately })); })); } + + @Test + public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) { + PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2)); + final Async latch = ctx.async(2); + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertSuccess(rows -> { + ctx.assertEquals(12, rows.size()); + latch.countDown(); + })); + })); + + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertSuccess(rows -> { + ctx.assertEquals(12, rows.size()); + latch.countDown(); + })); + })); + + latch.awaitSuccess(); + final long timerId = vertx.setTimer(10000L, id -> { + ctx.fail("Timeout exceeded without completing"); + }); + //Used both connections + Async async = ctx.async(10); + for (int i = 0; i < 10; i++) { + pool + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertFailure(t -> { + ctx.assertTrue(t instanceof NoStackTraceThrowable); + ctx.assertEquals("Timeout", t.getMessage()); + async.countDown(); + })); + } + + async.handler(v -> vertx.cancelTimer(timerId)); + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 5a8e42943..a1525b5d1 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -170,8 +170,20 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { PromiseInternal promise = context.promise(); + //Acquires the connection honoring the pool's connection timeout acquire(context, connectionTimeout, promise); - return promise.future().compose(pooled -> pooled.schedule(context, cmd)); + return promise.future().compose(pooled -> { + //We need to 'init' the connection of close will fail. + pooled.init(pooled); + return pooled.schedule(context, cmd) + .eventually(v -> { + Promise p = Promise.promise(); + pooled.close(pooled, p); + return p.future(); + } + ); + } + ); } private void acquire(ContextInternal context, long timeout, Handler> completionHandler) { From 635cc01f4ab752c9d9cc26c2b3b4f301b7e7735c Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Wed, 12 Jul 2023 09:19:12 -0300 Subject: [PATCH 4/8] Fixed a typo --- .../src/main/java/io/vertx/sqlclient/impl/PoolImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index a1525b5d1..a848e617d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -173,7 +173,7 @@ public Future schedule(ContextInternal context, CommandBase cmd) { //Acquires the connection honoring the pool's connection timeout acquire(context, connectionTimeout, promise); return promise.future().compose(pooled -> { - //We need to 'init' the connection of close will fail. + //We need to 'init' the connection or close will fail. pooled.init(pooled); return pooled.schedule(context, cmd) .eventually(v -> { From 58dc3cd7fa31bc627cd84f9524cbcfb24a53ff74 Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Mon, 6 May 2024 09:16:55 -0300 Subject: [PATCH 5/8] Addressed PR comments: Added an option to enable the new behavior, while still ignoring timeout for legacy clients --- PgTimeoutTester.java | 84 +++++++++++++++++++ .../java/io/vertx/pgclient/PgPoolTest.java | 2 +- .../java/io/vertx/sqlclient/PoolOptions.java | 28 +++++++ .../io/vertx/sqlclient/impl/PoolImpl.java | 7 +- 4 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 PgTimeoutTester.java diff --git a/PgTimeoutTester.java b/PgTimeoutTester.java new file mode 100644 index 000000000..8340d96e6 --- /dev/null +++ b/PgTimeoutTester.java @@ -0,0 +1,84 @@ +package io.vertx.sqlclient.templates.impl; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.PoolOptions; + +import java.util.ArrayList; +import java.util.List; + +public class PgTimeoutTester { + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + + PgConnectOptions dbConfig = new PgConnectOptions() + .setPort(5432) + .setConnectTimeout(2000) + .setHost("localhost") + .setDatabase("postgres") + .setUser("postgres") + .setPassword("postgres"); + + PoolOptions poolConfig = new PoolOptions() + .setMaxSize(1) // One connection in Pool + .setConnectionTimeout(2); // 2 seconds + + PgPool pool = PgPool.pool(vertx, dbConfig, poolConfig); + + //connectionTimeOut(pool, vertx); + poolTimeOut(pool, vertx); + } + + private static void connectionTimeOut(PgPool pool, Vertx vertx) { + //First query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 1").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + /*.eventually(ign -> conn0.close())*/); // Don't close connection to trigger timeout while getting one below + + //Second query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 2").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .eventually(ign -> conn0.close())); + } + + private static void poolTimeOut(PgPool pool, Vertx vertx) { + //First query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 1").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .eventually(ign -> conn0.close()));// Don't close connection to trigger timeout while getting one below + + List> futures = new ArrayList<>(); + //N queries + for (int i = 2; i < 10; i++) { + Future f = pool.query("SELECT " + i).execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }); + futures.add(f); + } + + Future.all(futures).onComplete(c -> vertx.close()); + } +} diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java index ea508d13a..1a05cbbd0 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java @@ -596,7 +596,7 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately @Test public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) { - PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2)); + PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true)); final Async latch = ctx.async(2); pool.getConnection(ctx.asyncAssertSuccess(conn -> { conn diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index d309cb67a..ecfd47677 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -94,6 +94,11 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + /** + * Default honor timeout when scheduling commands is false + */ + public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -106,6 +111,7 @@ public class PoolOptions { private boolean shared = DEFAULT_SHARED_POOL; private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; + private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT; public PoolOptions() { } @@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) { shared= other.shared; name = other.name; eventLoopSize = other.eventLoopSize; + alwaysUseTimeout = other.alwaysUseTimeout; } /** @@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) { return this; } + /** + * @return Whether the pool will always use timeout, even when sending commands directly to execute. + */ + public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; } + + /** + * Sets whether always honor the pool's timeout. + *

+ * This basically affects the pool's schedule method, which will submit the command regardless of whether there's + * an available connection or not. This settings allows the caller to have a consistent max wait time across every + * method. + *

+ * The default is {@code false}. + * @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) { + this.alwaysUseTimeout = alwaysUseTimeout; + return this; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); PoolOptionsConverter.toJson(this, json); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index a848e617d..ee02baf01 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private volatile Handler connectionInitializer; private long timerID; private volatile Function> connectionProvider; + private final boolean alwaysUseTimeout; public static final String PROPAGATABLE_CONNECTION = "propagatable_connection"; @@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx, this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); + this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; @@ -169,6 +171,9 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { + if (alwaysUseTimeout) { + return pool.execute(context, cmd); + } PromiseInternal promise = context.promise(); //Acquires the connection honoring the pool's connection timeout acquire(context, connectionTimeout, promise); @@ -176,7 +181,7 @@ public Future schedule(ContextInternal context, CommandBase cmd) { //We need to 'init' the connection or close will fail. pooled.init(pooled); return pooled.schedule(context, cmd) - .eventually(v -> { + .eventually(() -> { Promise p = Promise.promise(); pooled.close(pooled, p); return p.future(); From 1e2f4621f070f4057c6395c00cf7b4ca1be15bd0 Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Mon, 6 May 2024 09:29:00 -0300 Subject: [PATCH 6/8] This wasn't supposed to be checked in --- PgTimeoutTester.java | 84 -------------------------------------------- 1 file changed, 84 deletions(-) delete mode 100644 PgTimeoutTester.java diff --git a/PgTimeoutTester.java b/PgTimeoutTester.java deleted file mode 100644 index 8340d96e6..000000000 --- a/PgTimeoutTester.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.vertx.sqlclient.templates.impl; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgPool; -import io.vertx.sqlclient.PoolOptions; - -import java.util.ArrayList; -import java.util.List; - -public class PgTimeoutTester { - public static void main(String[] args) { - Vertx vertx = Vertx.vertx(); - - PgConnectOptions dbConfig = new PgConnectOptions() - .setPort(5432) - .setConnectTimeout(2000) - .setHost("localhost") - .setDatabase("postgres") - .setUser("postgres") - .setPassword("postgres"); - - PoolOptions poolConfig = new PoolOptions() - .setMaxSize(1) // One connection in Pool - .setConnectionTimeout(2); // 2 seconds - - PgPool pool = PgPool.pool(vertx, dbConfig, poolConfig); - - //connectionTimeOut(pool, vertx); - poolTimeOut(pool, vertx); - } - - private static void connectionTimeOut(PgPool pool, Vertx vertx) { - //First query - pool.getConnection() - .onFailure(err -> { - err.printStackTrace(); - vertx.close(); - }) - .compose(conn0 -> - conn0.query("SELECT 1").execute() - .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) - /*.eventually(ign -> conn0.close())*/); // Don't close connection to trigger timeout while getting one below - - //Second query - pool.getConnection() - .onFailure(err -> { - err.printStackTrace(); - vertx.close(); - }) - .compose(conn0 -> - conn0.query("SELECT 2").execute() - .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) - .eventually(ign -> conn0.close())); - } - - private static void poolTimeOut(PgPool pool, Vertx vertx) { - //First query - pool.getConnection() - .onFailure(err -> { - err.printStackTrace(); - vertx.close(); - }) - .compose(conn0 -> - conn0.query("SELECT 1").execute() - .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) - .eventually(ign -> conn0.close()));// Don't close connection to trigger timeout while getting one below - - List> futures = new ArrayList<>(); - //N queries - for (int i = 2; i < 10; i++) { - Future f = pool.query("SELECT " + i).execute() - .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) - .onFailure(err -> { - err.printStackTrace(); - vertx.close(); - }); - futures.add(f); - } - - Future.all(futures).onComplete(c -> vertx.close()); - } -} From 1a8eca6b2e01f5b270490161c0fbfb8373ecbe0d Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Mon, 6 May 2024 09:32:29 -0300 Subject: [PATCH 7/8] Fixed logic --- .../src/main/java/io/vertx/sqlclient/impl/PoolImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index ee02baf01..5c534143d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -171,7 +171,7 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { - if (alwaysUseTimeout) { + if (!alwaysUseTimeout) { return pool.execute(context, cmd); } PromiseInternal promise = context.promise(); From dbb33f83d68f9fdccde3bfd328df44d0e1f3d898 Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Mon, 6 May 2024 16:29:39 -0300 Subject: [PATCH 8/8] Checking in generated code changes --- .../generated/io/vertx/sqlclient/PoolOptionsConverter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index 73a68fe91..1cd446d25 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -20,6 +20,11 @@ public class PoolOptionsConverter { static void fromJson(Iterable> json, PoolOptions obj) { for (java.util.Map.Entry member : json) { switch (member.getKey()) { + case "alwaysUseTimeout": + if (member.getValue() instanceof Boolean) { + obj.setAlwaysUseTimeout((Boolean)member.getValue()); + } + break; case "connectionTimeout": if (member.getValue() instanceof Number) { obj.setConnectionTimeout(((Number)member.getValue()).intValue()); @@ -89,6 +94,7 @@ static void toJson(PoolOptions obj, JsonObject json) { } static void toJson(PoolOptions obj, java.util.Map json) { + json.put("alwaysUseTimeout", obj.isAlwaysUseTimeout()); json.put("connectionTimeout", obj.getConnectionTimeout()); if (obj.getConnectionTimeoutUnit() != null) { json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name());