Skip to content

Commit 8b28a02

Browse files
authored
Merge pull request #1231 from technical-debt-collector/master
Create propagatable connection, add Transaction accessor
2 parents cad7a73 + 54d1bdd commit 8b28a02

File tree

7 files changed

+148
-13
lines changed

7 files changed

+148
-13
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
140140
}
141141

142142
/**
143-
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result
143+
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result.
144144
*/
145145
default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
146146
return getConnection()
@@ -164,6 +164,12 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
164164
.onComplete(ar -> conn.close()));
165165
}
166166

167+
/**
168+
* Like {@link #withTransaction(Function, Handler)} but allows for setting the mode, defining how the acquired
169+
* connection is managed during the execution of the function.
170+
*/
171+
<T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation, Function<SqlConnection, Future<@Nullable T>> function);
172+
167173
/**
168174
* Get a connection from the pool and execute the given {@code function}.
169175
*

vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public interface SqlConnection extends SqlClient {
9292
*/
9393
Future<Transaction> begin();
9494

95+
/**
96+
* @return the current transaction if it exists, otherwise null
97+
*/
98+
Transaction transaction();
99+
95100
/**
96101
* @return whether the connection uses SSL
97102
*/
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
12+
package io.vertx.sqlclient;
13+
14+
import java.util.function.Function;
15+
16+
/**
17+
* Defines how the acquired connection will be managed during the execution of the function provided in
18+
* {@link Pool#withTransaction(TransactionPropagation, Function)}.
19+
*/
20+
public enum TransactionPropagation {
21+
22+
/**
23+
* The acquired connection is not stored anywhere, making it local to the provided function execution and to
24+
* wherever it is passed.
25+
*/
26+
NONE,
27+
28+
/**
29+
* Keeps the acquired connection stored in the local context for as long as the given function executes.
30+
* Any subsequent calls to {@link Pool#withTransaction} with this mode during the function execution
31+
* will retrieve this connection from the context instead of creating another.
32+
* The connection is removed from the local context when the function block has completed.
33+
*/
34+
CONTEXT
35+
36+
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.vertx.sqlclient.impl;
1717

18+
import io.vertx.codegen.annotations.Nullable;
1819
import io.vertx.core.AsyncResult;
1920
import io.vertx.core.Context;
2021
import io.vertx.core.Future;
@@ -23,14 +24,7 @@
2324
import io.vertx.core.impl.ContextInternal;
2425
import io.vertx.core.impl.VertxInternal;
2526
import io.vertx.core.impl.future.PromiseInternal;
26-
import io.vertx.sqlclient.Pool;
27-
import io.vertx.sqlclient.PrepareOptions;
28-
import io.vertx.sqlclient.PreparedQuery;
29-
import io.vertx.sqlclient.Query;
30-
import io.vertx.sqlclient.Row;
31-
import io.vertx.sqlclient.RowSet;
32-
import io.vertx.sqlclient.SqlClient;
33-
import io.vertx.sqlclient.SqlConnection;
27+
import io.vertx.sqlclient.*;
3428
import io.vertx.sqlclient.spi.Driver;
3529

3630
import java.util.function.Function;
@@ -77,6 +71,12 @@ public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
7771
return delegate.preparedQuery(sql);
7872
}
7973

74+
@Override
75+
public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
76+
Function<SqlConnection, Future<@Nullable T>> function) {
77+
return delegate.withTransaction(txPropagation, function);
78+
}
79+
8080
@Override
8181
public P connectHandler(Handler<SqlConnection> handler) {
8282
delegate.connectHandler(handler);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,19 @@
1717

1818
package io.vertx.sqlclient.impl;
1919

20+
import io.vertx.codegen.annotations.Nullable;
2021
import io.vertx.core.*;
2122
import io.vertx.core.impl.CloseFuture;
2223
import io.vertx.core.impl.ContextInternal;
2324
import io.vertx.core.impl.VertxInternal;
2425
import io.vertx.core.impl.future.PromiseInternal;
2526
import io.vertx.core.spi.metrics.ClientMetrics;
26-
import io.vertx.sqlclient.Pool;
27-
import io.vertx.sqlclient.PoolOptions;
28-
import io.vertx.sqlclient.SqlConnection;
27+
import io.vertx.sqlclient.*;
2928
import io.vertx.sqlclient.impl.command.CommandBase;
3029
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3130
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3231
import io.vertx.sqlclient.spi.Driver;
3332

34-
import java.util.Objects;
3533
import java.util.function.Function;
3634

3735
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -53,6 +51,8 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
5351
private long timerID;
5452
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
5553

54+
private static final String PROPAGATABLE_CONNECTION = "propagatable_connection";
55+
5656
public PoolImpl(VertxInternal vertx,
5757
Driver driver,
5858
QueryTracer tracer,
@@ -155,6 +155,48 @@ public Future<SqlConnection> getConnection() {
155155
});
156156
}
157157

158+
public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
159+
Function<SqlConnection, Future<@Nullable T>> function) {
160+
if (txPropagation == TransactionPropagation.CONTEXT) {
161+
ContextInternal context = (ContextInternal) Vertx.currentContext();
162+
SqlConnection sqlConnection = context.getLocal(PROPAGATABLE_CONNECTION);
163+
if (sqlConnection == null) {
164+
return startPropagatableConnection(function);
165+
}
166+
return context.succeededFuture(sqlConnection)
167+
.flatMap(conn -> function.apply(conn)
168+
.onFailure(err -> {
169+
if (!(err instanceof TransactionRollbackException)) {
170+
conn.transaction().rollback();
171+
}
172+
}));
173+
}
174+
return withTransaction(function);
175+
}
176+
177+
private <T> Future<@Nullable T> startPropagatableConnection(Function<SqlConnection, Future<@Nullable T>> function) {
178+
ContextInternal context = (ContextInternal) Vertx.currentContext();
179+
return getConnection().onComplete(handler -> context.putLocal(PROPAGATABLE_CONNECTION, handler.result()))
180+
.flatMap(conn -> conn
181+
.begin()
182+
.flatMap(tx -> function
183+
.apply(conn)
184+
.compose(
185+
res -> tx
186+
.commit()
187+
.flatMap(v -> context.succeededFuture(res)),
188+
err -> {
189+
if (err instanceof TransactionRollbackException) {
190+
return context.failedFuture(err);
191+
} else {
192+
return tx
193+
.rollback()
194+
.compose(v -> context.failedFuture(err), failure -> context.failedFuture(err));
195+
}
196+
}))
197+
.onComplete(ar -> conn.close(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
198+
}
199+
158200
@Override
159201
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
160202
Object metric;

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ public Future<Transaction> begin() {
165165
return tx.begin();
166166
}
167167

168+
@Override
169+
public Transaction transaction() {
170+
return tx;
171+
}
172+
168173
@Override
169174
boolean autoCommit() {
170175
return tx == null;

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/TransactionTestBase.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,45 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
308308
}));
309309
}));
310310
}
311+
312+
@Test
313+
public void testWithPropagatableConnectionTransactionCommit(TestContext ctx) {
314+
Async async = ctx.async();
315+
Pool pool = createPool();
316+
vertx.runOnContext(handler -> {
317+
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
318+
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
319+
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
320+
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
321+
conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
322+
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
323+
).onComplete(ctx.asyncAssertSuccess(v -> pool
324+
.query("SELECT id, val FROM mutable")
325+
.execute(ctx.asyncAssertSuccess(rows -> {
326+
ctx.assertEquals(3, rows.size());
327+
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
328+
async.complete();
329+
}))));
330+
});
331+
}
332+
333+
@Test
334+
public void testWithPropagatableConnectionTransactionRollback(TestContext ctx) {
335+
Async async = ctx.async();
336+
Pool pool = createPool();
337+
Throwable failure = new Throwable();
338+
vertx.runOnContext(handler -> {
339+
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
340+
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
341+
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(
342+
v -> Future.failedFuture(failure)))
343+
).onComplete(ctx.asyncAssertFailure(v -> pool
344+
.query("SELECT id, val FROM mutable")
345+
.execute(ctx.asyncAssertSuccess(rows -> {
346+
ctx.assertEquals(0, rows.size());
347+
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
348+
async.complete();
349+
}))));
350+
});
351+
}
311352
}

0 commit comments

Comments
 (0)