Skip to content

Commit 27f8a74

Browse files
committed
A pool can declare a number of event loop to use instead of using borrower's event loop - fixes #1095
1 parent a439953 commit 27f8a74

File tree

10 files changed

+113
-38
lines changed

10 files changed

+113
-38
lines changed

vertx-db2-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ public void poolConfig02(DB2Pool pool, String sql) {
361361
}
362362

363363
public void poolSharing1(Vertx vertx, DB2ConnectOptions database, int maxSize) {
364-
Pool pool = DB2Pool.pool(database, new PoolOptions().setMaxSize(maxSize));
364+
DB2Pool pool = DB2Pool.pool(database, new PoolOptions().setMaxSize(maxSize));
365365
vertx.deployVerticle(() -> new AbstractVerticle() {
366366
@Override
367367
public void start() throws Exception {
@@ -372,7 +372,7 @@ public void start() throws Exception {
372372

373373
public void poolSharing2(Vertx vertx, DB2ConnectOptions database, int maxSize) {
374374
vertx.deployVerticle(() -> new AbstractVerticle() {
375-
Pool pool;
375+
DB2Pool pool;
376376
@Override
377377
public void start() {
378378
// Get or create a shared pool
@@ -385,4 +385,12 @@ public void start() {
385385
}
386386
}, new DeploymentOptions().setInstances(4));
387387
}
388+
389+
public static void poolSharing3(Vertx vertx, DB2ConnectOptions database, int maxSize) {
390+
DB2Pool pool = DB2Pool.pool(database, new PoolOptions()
391+
.setMaxSize(maxSize)
392+
.setShared(true)
393+
.setName("my-pool")
394+
.setEventLoopSize(4));
395+
}
388396
}

vertx-mssql-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ public void poolConfig02(MSSQLPool pool, String sql) {
360360
}
361361

362362
public void poolSharing1(Vertx vertx, MSSQLConnectOptions database, int maxSize) {
363-
Pool pool = MSSQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
363+
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
364364
vertx.deployVerticle(() -> new AbstractVerticle() {
365365
@Override
366366
public void start() throws Exception {
@@ -371,7 +371,7 @@ public void start() throws Exception {
371371

372372
public void poolSharing2(Vertx vertx, MSSQLConnectOptions database, int maxSize) {
373373
vertx.deployVerticle(() -> new AbstractVerticle() {
374-
Pool pool;
374+
MSSQLPool pool;
375375
@Override
376376
public void start() {
377377
// Get or create a shared pool
@@ -384,4 +384,12 @@ public void start() {
384384
}
385385
}, new DeploymentOptions().setInstances(4));
386386
}
387+
388+
public static void poolSharing3(Vertx vertx, MSSQLConnectOptions database, int maxSize) {
389+
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions()
390+
.setMaxSize(maxSize)
391+
.setShared(true)
392+
.setName("my-pool")
393+
.setEventLoopSize(4));
394+
}
387395
}

vertx-mysql-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ public void poolConfig02(MySQLPool pool, String sql) {
345345
}
346346

347347
public void poolSharing1(Vertx vertx, MySQLConnectOptions database, int maxSize) {
348-
Pool pool = MySQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
348+
MySQLPool pool = MySQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
349349
vertx.deployVerticle(() -> new AbstractVerticle() {
350350
@Override
351351
public void start() throws Exception {
@@ -356,7 +356,7 @@ public void start() throws Exception {
356356

357357
public void poolSharing2(Vertx vertx, MySQLConnectOptions database, int maxSize) {
358358
vertx.deployVerticle(() -> new AbstractVerticle() {
359-
Pool pool;
359+
MySQLPool pool;
360360
@Override
361361
public void start() {
362362
// Get or create a shared pool
@@ -369,4 +369,12 @@ public void start() {
369369
}
370370
}, new DeploymentOptions().setInstances(4));
371371
}
372+
373+
public static void poolSharing3(Vertx vertx, MySQLConnectOptions database, int maxSize) {
374+
MySQLPool pool = MySQLPool.pool(database, new PoolOptions()
375+
.setMaxSize(maxSize)
376+
.setShared(true)
377+
.setName("my-pool")
378+
.setEventLoopSize(4));
379+
}
372380
}

vertx-pg-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ public void poolConfig02(PgPool pool, String sql) {
364364
}
365365

366366
public void poolSharing1(Vertx vertx, PgConnectOptions database, int maxSize) {
367-
Pool pool = PgPool.pool(database, new PoolOptions().setMaxSize(maxSize));
367+
PgPool pool = PgPool.pool(database, new PoolOptions().setMaxSize(maxSize));
368368
vertx.deployVerticle(() -> new AbstractVerticle() {
369369
@Override
370370
public void start() throws Exception {
@@ -375,7 +375,7 @@ public void start() throws Exception {
375375

376376
public void poolSharing2(Vertx vertx, PgConnectOptions database, int maxSize) {
377377
vertx.deployVerticle(() -> new AbstractVerticle() {
378-
Pool pool;
378+
PgPool pool;
379379
@Override
380380
public void start() {
381381
// Get or create a shared pool
@@ -388,4 +388,12 @@ public void start() {
388388
}
389389
}, new DeploymentOptions().setInstances(4));
390390
}
391+
392+
public static void poolSharing3(Vertx vertx, PgConnectOptions database, int maxSize) {
393+
PgPool pool = PgPool.pool(database, new PoolOptions()
394+
.setMaxSize(maxSize)
395+
.setShared(true)
396+
.setName("my-pool")
397+
.setEventLoopSize(4));
398+
}
391399
}

vertx-sql-client/src/main/asciidoc/pool_sharing.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,10 @@ a lease to this pool. The resources are disposed after all leases have been clos
1818

1919
By default, a pool reuses the current event-loop when it needs to create a TCP connection. The shared pool will
2020
therefore randomly use event-loops of verticles using it.
21+
22+
You can assign a number of event loop a pool will use independently of the context using it
23+
24+
[source,$lang]
25+
----
26+
{@link examples.SqlClientExamples#poolSharing3}
27+
----

vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
3030
obj.setConnectionTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
3131
}
3232
break;
33+
case "eventLoopSize":
34+
if (member.getValue() instanceof Number) {
35+
obj.setEventLoopSize(((Number)member.getValue()).intValue());
36+
}
37+
break;
3338
case "idleTimeout":
3439
if (member.getValue() instanceof Number) {
3540
obj.setIdleTimeout(((Number)member.getValue()).intValue());
@@ -78,6 +83,7 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
7883
if (obj.getConnectionTimeoutUnit() != null) {
7984
json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name());
8085
}
86+
json.put("eventLoopSize", obj.getEventLoopSize());
8187
json.put("idleTimeout", obj.getIdleTimeout());
8288
if (obj.getIdleTimeoutUnit() != null) {
8389
json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name());

vertx-sql-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.Future;
2222
import io.vertx.core.Vertx;
2323
import io.vertx.core.http.HttpClient;
24+
import io.vertx.core.http.HttpClientOptions;
2425
import io.vertx.core.tracing.TracingPolicy;
2526
import io.vertx.sqlclient.Cursor;
2627
import io.vertx.sqlclient.Pool;
@@ -424,4 +425,12 @@ public void start() {
424425
}
425426
}, new DeploymentOptions().setInstances(4));
426427
}
428+
429+
public static void poolSharing3(Vertx vertx, SqlConnectOptions database, int maxSize) {
430+
Pool pool = Pool.pool(database, new PoolOptions()
431+
.setMaxSize(maxSize)
432+
.setShared(true)
433+
.setName("my-pool")
434+
.setEventLoopSize(4));
435+
}
427436
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package io.vertx.sqlclient;
1919

2020
import io.vertx.codegen.annotations.DataObject;
21+
import io.vertx.core.http.HttpClientOptions;
2122
import io.vertx.core.impl.Arguments;
2223
import io.vertx.core.json.JsonObject;
2324

@@ -77,6 +78,11 @@ public class PoolOptions {
7778
*/
7879
public static final String DEFAULT_NAME = "__vertx.DEFAULT";
7980

81+
/**
82+
* Default pool event loop size = 0 (reuse current event-loop)
83+
*/
84+
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;
85+
8086
private int maxSize = DEFAULT_MAX_SIZE;
8187
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
8288
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
@@ -86,6 +92,7 @@ public class PoolOptions {
8692
private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT;
8793
private boolean shared = DEFAULT_SHARED_POOL;
8894
private String name = DEFAULT_NAME;
95+
private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE;
8996

9097
public PoolOptions() {
9198
}
@@ -101,6 +108,7 @@ public PoolOptions(PoolOptions other) {
101108
idleTimeoutUnit = other.idleTimeoutUnit;
102109
shared= other.shared;
103110
name = other.name;
111+
eventLoopSize = other.eventLoopSize;
104112
}
105113

106114
/**
@@ -274,6 +282,33 @@ public PoolOptions setName(String name) {
274282
return this;
275283
}
276284

285+
/**
286+
* @return the max number of event-loop a pool will use, the default value is {@code 0} which implies
287+
* to reuse the current event-loop
288+
*/
289+
public int getEventLoopSize() {
290+
return eventLoopSize;
291+
}
292+
293+
/**
294+
* Set the number of event-loop the pool use.
295+
*
296+
* <ul>
297+
* <li>when the size is {@code 0}, the client pool will use the current event-loop</li>
298+
* <li>otherwise the client will create and use its own event loop</li>
299+
* </ul>
300+
*
301+
* The default size is {@code 0}.
302+
*
303+
* @param eventLoopSize the new size
304+
* @return a reference to this, so the API can be used fluently
305+
*/
306+
public PoolOptions setEventLoopSize(int eventLoopSize) {
307+
Arguments.require(eventLoopSize >= 0, "poolEventLoopSize must be >= 0");
308+
this.eventLoopSize = eventLoopSize;
309+
return this;
310+
}
311+
277312
public JsonObject toJson() {
278313
JsonObject json = new JsonObject();
279314
PoolOptionsConverter.toJson(this, json);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public PoolImpl(VertxInternal vertx,
7070
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
7171
this.timerID = -1L;
7272
this.vertx = vertx;
73-
this.pool = new SqlConnectionPool(baseConnectOptions, connectOptionsProvider, ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, vertx, idleTimeout, poolOptions.getMaxSize(), pipeliningLimit, poolOptions.getMaxWaitQueueSize());
73+
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, vertx, idleTimeout, poolOptions.getMaxSize(), pipeliningLimit, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
7474
this.closeFuture = closeFuture;
7575
}
7676

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717

1818
package io.vertx.sqlclient.impl.pool;
1919

20+
import io.netty.channel.EventLoop;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.core.impl.EventLoopContext;
2223
import io.vertx.core.impl.VertxInternal;
23-
import io.vertx.core.impl.logging.Logger;
24-
import io.vertx.core.impl.logging.LoggerFactory;
2524
import io.vertx.core.net.SocketAddress;
2625
import io.vertx.core.net.impl.ConnectionBase;
2726
import io.vertx.core.net.impl.pool.ConnectResult;
2827
import io.vertx.core.net.impl.pool.Lease;
2928
import io.vertx.core.net.impl.pool.ConnectionPool;
30-
import io.vertx.core.net.impl.pool.PoolConnection;
3129
import io.vertx.core.net.impl.pool.PoolConnector;
3230
import io.vertx.core.net.impl.pool.PoolWaiter;
33-
import io.vertx.sqlclient.SqlConnectOptions;
3431
import io.vertx.sqlclient.SqlConnection;
3532
import io.vertx.sqlclient.impl.Connection;
3633
import io.vertx.sqlclient.impl.SqlConnectionBase;
@@ -53,10 +50,6 @@
5350
*/
5451
public class SqlConnectionPool {
5552

56-
private static final Logger log = LoggerFactory.getLogger(SqlConnectionPool.class);
57-
58-
private final SqlConnectOptions baseConnectOptions;
59-
private final Supplier<Future<SqlConnectOptions>> connectOptionsProvider;
6053
private final Function<Context, Future<SqlConnection>> connectionProvider;
6154
private final VertxInternal vertx;
6255
private final ConnectionPool<PooledConnection> pool;
@@ -65,23 +58,20 @@ public class SqlConnectionPool {
6558
private final long idleTimeout;
6659
private final int maxSize;
6760

68-
public SqlConnectionPool(SqlConnectOptions baseConnectOptions,
69-
Supplier<Future<SqlConnectOptions>> connectOptionsProvider,
70-
Function<Context, Future<SqlConnection>> connectionProvider,
61+
public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
7162
Supplier<Handler<PooledConnection>> hook,
7263
VertxInternal vertx,
7364
long idleTimeout,
7465
int maxSize,
7566
int pipeliningLimit,
76-
int maxWaitQueueSize) {
67+
int maxWaitQueueSize,
68+
int eventLoopSize) {
7769
if (maxSize < 1) {
7870
throw new IllegalArgumentException("Pool max size must be > 0");
7971
}
8072
if (pipeliningLimit < 1) {
8173
throw new IllegalArgumentException("Pipelining limit must be > 0");
8274
}
83-
this.baseConnectOptions = baseConnectOptions;
84-
this.connectOptionsProvider = connectOptionsProvider;
8575
this.pool = ConnectionPool.pool(connector, new int[] { maxSize }, maxWaitQueueSize);
8676
this.vertx = vertx;
8777
this.pipeliningLimit = pipeliningLimit;
@@ -90,24 +80,20 @@ public SqlConnectionPool(SqlConnectOptions baseConnectOptions,
9080
this.hook = hook;
9181
this.connectionProvider = connectionProvider;
9282

93-
if (pipeliningLimit > 1) {
94-
pool.connectionSelector((waiter, list) -> {
95-
PoolConnection<PooledConnection> selected = null;
96-
int size = list.size();
97-
for (int i = 0;i < size;i++) {
98-
PoolConnection<PooledConnection> conn = list.get(i);
99-
if (conn.concurrency() > 0) {
100-
if (selected == null) {
101-
selected = conn;
102-
} else if (conn.get().inflight < selected.get().inflight) {
103-
selected = conn;
104-
}
105-
}
83+
if (eventLoopSize > 0) {
84+
EventLoop[] loops = new EventLoop[maxSize];
85+
for (int i = 0;i < maxSize;i++) {
86+
loops[i] = vertx.nettyEventLoopGroup().next();
87+
}
88+
pool.contextProvider(new Function<ContextInternal, EventLoopContext>() {
89+
int idx = 0;
90+
@Override
91+
public EventLoopContext apply(ContextInternal contextInternal) {
92+
EventLoop loop = loops[idx++];
93+
return vertx.createEventLoopContext(loop, null, Thread.currentThread().getContextClassLoader());
10694
}
107-
return selected;
10895
});
10996
}
110-
11197
}
11298

11399
private final PoolConnector<PooledConnection> connector = new PoolConnector<PooledConnection>() {

0 commit comments

Comments
 (0)