Skip to content

Commit c47516a

Browse files
Define modes for withTransaction, rename withPropagatedTransaction, adhere to code style guidelines
1 parent 777604a commit c47516a

File tree

5 files changed

+62
-38
lines changed

5 files changed

+62
-38
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
140140
}
141141

142142
/**
143-
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result
143+
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result.
144144
*/
145145
default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
146146
return getConnection()
@@ -165,10 +165,10 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
165165
}
166166

167167
/**
168-
* Like {@link #withTransaction(Function, Handler)} but keeps the connection accessible via the context
169-
* for the duration of the given {@code function}
168+
* Like {@link #withTransaction(Function, Handler)} but allows for setting the mode, defining how the acquired
169+
* connection is managed during the execution of the function.
170170
*/
171-
<T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function);
171+
<T> Future<@Nullable T> withTransaction(TransactionMode mode, Function<SqlConnection, Future<@Nullable T>> function);
172172

173173
/**
174174
* Get a connection from the pool and execute the given {@code function}.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.vertx.sqlclient;
2+
3+
import java.util.function.Function;
4+
5+
/**
6+
* Defines how the acquired connection will be managed during the execution of the function provided in
7+
* {@link Pool#withTransaction(TransactionMode, Function)}.
8+
*/
9+
public enum TransactionMode {
10+
11+
/**
12+
* The acquired connection is not stored anywhere, making it local to the provided function execution and to
13+
* whereever it is passed.
14+
*/
15+
DEFAULT,
16+
17+
/**
18+
* Keeps the acquired connection stored in the local context for as long as the given function executes.
19+
* Any subsequent calls to {@link Pool#withTransaction} with this mode during the function execution
20+
* will retrieve this connection from the context instead of creating another.
21+
* The connection is removed from the local context when the function block has completed.
22+
*/
23+
PROPAGATABLE
24+
25+
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,7 @@
2424
import io.vertx.core.impl.ContextInternal;
2525
import io.vertx.core.impl.VertxInternal;
2626
import io.vertx.core.impl.future.PromiseInternal;
27-
import io.vertx.sqlclient.Pool;
28-
import io.vertx.sqlclient.PrepareOptions;
29-
import io.vertx.sqlclient.PreparedQuery;
30-
import io.vertx.sqlclient.Query;
31-
import io.vertx.sqlclient.Row;
32-
import io.vertx.sqlclient.RowSet;
33-
import io.vertx.sqlclient.SqlClient;
34-
import io.vertx.sqlclient.SqlConnection;
27+
import io.vertx.sqlclient.*;
3528
import io.vertx.sqlclient.spi.Driver;
3629

3730
import java.util.function.Function;
@@ -79,8 +72,9 @@ public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
7972
}
8073

8174
@Override
82-
public <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
83-
return delegate.withPropagatedTransaction(function);
75+
public <T> Future<@Nullable T> withTransaction(TransactionMode mode,
76+
Function<SqlConnection, Future<@Nullable T>> function) {
77+
return delegate.withTransaction(mode, function);
8478
}
8579

8680
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import io.vertx.core.impl.VertxInternal;
2525
import io.vertx.core.impl.future.PromiseInternal;
2626
import io.vertx.core.spi.metrics.ClientMetrics;
27-
import io.vertx.sqlclient.Pool;
28-
import io.vertx.sqlclient.PoolOptions;
29-
import io.vertx.sqlclient.SqlConnection;
30-
import io.vertx.sqlclient.TransactionRollbackException;
27+
import io.vertx.sqlclient.*;
3128
import io.vertx.sqlclient.impl.command.CommandBase;
3229
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3330
import io.vertx.sqlclient.impl.tracing.QueryTracer;
@@ -158,22 +155,26 @@ public Future<SqlConnection> getConnection() {
158155
});
159156
}
160157

161-
public <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
162-
ContextInternal context = (ContextInternal) Vertx.currentContext();
163-
SqlConnection sqlConnection = context.getLocal(PROPAGATABLE_CONNECTION);
164-
if (sqlConnection == null) {
165-
return initializePropagatedConnectionAndTransaction(function);
158+
public <T> Future<@Nullable T> withTransaction(TransactionMode mode,
159+
Function<SqlConnection, Future<@Nullable T>> function) {
160+
if (mode == TransactionMode.PROPAGATABLE) {
161+
ContextInternal context = (ContextInternal) Vertx.currentContext();
162+
SqlConnection sqlConnection = context.getLocal(PROPAGATABLE_CONNECTION);
163+
if (sqlConnection == null) {
164+
return startPropagatableConnection(function);
165+
}
166+
return context.succeededFuture(sqlConnection)
167+
.flatMap(conn -> function.apply(conn)
168+
.onFailure(err -> {
169+
if (!(err instanceof TransactionRollbackException)) {
170+
conn.getTransaction().rollback();
171+
}
172+
}));
166173
}
167-
return context.succeededFuture(sqlConnection)
168-
.flatMap(conn -> function.apply(conn)
169-
.onFailure(err -> {
170-
if (!(err instanceof TransactionRollbackException)) {
171-
conn.getTransaction().rollback();
172-
}
173-
}));
174+
return withTransaction(function);
174175
}
175176

176-
private <T> Future<@Nullable T> initializePropagatedConnectionAndTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
177+
private <T> Future<@Nullable T> startPropagatableConnection(Function<SqlConnection, Future<@Nullable T>> function) {
177178
ContextInternal context = (ContextInternal) Vertx.currentContext();
178179
return getConnection().onComplete(handler -> context.putLocal(PROPAGATABLE_CONNECTION, handler.result()))
179180
.flatMap(conn -> conn

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/TransactionTestBase.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,15 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
310310
}
311311

312312
@Test
313-
public void testWithPropagatedConnectionTransactionCommit(TestContext ctx) {
313+
public void testWithPropagatableConnectionTransactionCommit(TestContext ctx) {
314314
Async async = ctx.async();
315315
Pool pool = createPool();
316316
vertx.runOnContext(handler -> {
317-
pool.withPropagatedTransaction(c ->
318-
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
319-
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
317+
pool.withTransaction(TransactionMode.PROPAGATABLE, c ->
318+
pool.withTransaction(TransactionMode.PROPAGATABLE, conn ->
319+
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
320+
pool.withTransaction(TransactionMode.PROPAGATABLE, conn ->
321+
conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
320322
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
321323
).onComplete(ctx.asyncAssertSuccess(v -> pool
322324
.query("SELECT id, val FROM mutable")
@@ -329,13 +331,15 @@ public void testWithPropagatedConnectionTransactionCommit(TestContext ctx) {
329331
}
330332

331333
@Test
332-
public void testWithPropagatedConnectionTransactionRollback(TestContext ctx) {
334+
public void testWithPropagatableConnectionTransactionRollback(TestContext ctx) {
333335
Async async = ctx.async();
334336
Pool pool = createPool();
335337
Throwable failure = new Throwable();
336338
vertx.runOnContext(handler -> {
337-
pool.withPropagatedTransaction(c ->
338-
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(v -> Future.failedFuture(failure)))
339+
pool.withTransaction(TransactionMode.PROPAGATABLE, c ->
340+
pool.withTransaction(TransactionMode.PROPAGATABLE, conn ->
341+
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(
342+
v -> Future.failedFuture(failure)))
339343
).onComplete(ctx.asyncAssertFailure(v -> pool
340344
.query("SELECT id, val FROM mutable")
341345
.execute(ctx.asyncAssertSuccess(rows -> {

0 commit comments

Comments
 (0)