Skip to content

Commit 2ea0ee8

Browse files
Simplify handling of propagated connections, place connection in context
1 parent bf4690d commit 2ea0ee8

File tree

4 files changed

+45
-69
lines changed

4 files changed

+45
-69
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,6 @@ static Pool pool(Vertx vertx, SqlConnectOptions database, PoolOptions options) {
9898
*/
9999
Future<SqlConnection> getConnection();
100100

101-
Future<SqlConnection> getPropagatableConnection();
102-
103-
boolean propagatableConnectionIsActive();
104-
105-
Future<Void> setPropagatableConnection(SqlConnection propagatableConnection);
106-
107101
/**
108102
* {@inheritDoc}
109103
*
@@ -170,38 +164,11 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
170164
.onComplete(ar -> conn.close()));
171165
}
172166

173-
174-
default <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
175-
if (propagatableConnectionIsActive()) {
176-
return getPropagatableConnection()
177-
.flatMap(conn -> function.apply(conn)
178-
.onFailure(err -> {
179-
if (!(err instanceof TransactionRollbackException)) {
180-
conn.getTransaction().rollback();
181-
}
182-
}));
183-
} else {
184-
return getPropagatableConnection()
185-
.flatMap(conn -> conn
186-
.begin()
187-
.flatMap(tx -> function
188-
.apply(conn)
189-
.compose(
190-
res -> tx
191-
.commit()
192-
.flatMap(v -> Future.succeededFuture(res)),
193-
err -> {
194-
if (err instanceof TransactionRollbackException) {
195-
return Future.failedFuture(err);
196-
} else {
197-
return tx
198-
.rollback()
199-
.compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
200-
}
201-
}))
202-
.onComplete(ar -> conn.close(v -> setPropagatableConnection(null))));
203-
}
204-
}
167+
/**
168+
* Like {@link #withTransaction(Function, Handler)} but keeps the connection accessible via the context
169+
* for the duration of the given {@code function}
170+
*/
171+
<T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function);
205172

206173
/**
207174
* Get a connection from the pool and execute the given {@code function}.

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.vertx.sqlclient.impl;
1717

18+
import io.vertx.codegen.annotations.Nullable;
1819
import io.vertx.core.AsyncResult;
1920
import io.vertx.core.Context;
2021
import io.vertx.core.Future;
@@ -67,21 +68,6 @@ public Future<SqlConnection> getConnection() {
6768
return delegate.getConnection();
6869
}
6970

70-
@Override
71-
public Future<SqlConnection> getPropagatableConnection() {
72-
return delegate.getPropagatableConnection();
73-
}
74-
75-
@Override
76-
public boolean propagatableConnectionIsActive() {
77-
return delegate.propagatableConnectionIsActive();
78-
}
79-
80-
@Override
81-
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
82-
return delegate.setPropagatableConnection(propagatableConnection);
83-
}
84-
8571
@Override
8672
public Query<RowSet<Row>> query(String sql) {
8773
return delegate.query(sql);
@@ -92,6 +78,11 @@ public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
9278
return delegate.preparedQuery(sql);
9379
}
9480

81+
@Override
82+
public <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
83+
return delegate.withPropagatedTransaction(function);
84+
}
85+
9586
@Override
9687
public P connectHandler(Handler<SqlConnection> handler) {
9788
delegate.connectHandler(handler);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.sqlclient.impl;
1919

20+
import io.vertx.codegen.annotations.Nullable;
2021
import io.vertx.core.*;
2122
import io.vertx.core.impl.CloseFuture;
2223
import io.vertx.core.impl.ContextInternal;
@@ -26,12 +27,12 @@
2627
import io.vertx.sqlclient.Pool;
2728
import io.vertx.sqlclient.PoolOptions;
2829
import io.vertx.sqlclient.SqlConnection;
30+
import io.vertx.sqlclient.TransactionRollbackException;
2931
import io.vertx.sqlclient.impl.command.CommandBase;
3032
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3133
import io.vertx.sqlclient.impl.tracing.QueryTracer;
3234
import io.vertx.sqlclient.spi.Driver;
3335

34-
import java.util.Objects;
3536
import java.util.function.Function;
3637

3738
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -53,7 +54,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
5354
private long timerID;
5455
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
5556

56-
private volatile SqlConnection propagatableConnection;
57+
private static final String PROPAGATABLE_CONNECTION = "propagatable_connection";
5758

5859
public PoolImpl(VertxInternal vertx,
5960
Driver driver,
@@ -157,22 +158,40 @@ public Future<SqlConnection> getConnection() {
157158
});
158159
}
159160

160-
@Override
161-
public Future<SqlConnection> getPropagatableConnection() {
162-
if (propagatableConnection == null) {
163-
return getConnection().onComplete(c -> setPropagatableConnection(c.result()));
161+
public <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
162+
SqlConnection sqlConnection = context().get(PROPAGATABLE_CONNECTION);
163+
if (sqlConnection == null) {
164+
return initializePropagatedConnectionAndTransaction(function);
164165
}
165-
return Future.succeededFuture(propagatableConnection);
166+
return Future.succeededFuture(sqlConnection)
167+
.flatMap(conn -> function.apply(conn)
168+
.onFailure(err -> {
169+
if (!(err instanceof TransactionRollbackException)) {
170+
conn.getTransaction().rollback();
171+
}
172+
}));
166173
}
167174

168-
@Override
169-
public boolean propagatableConnectionIsActive() {
170-
return propagatableConnection != null;
171-
}
172-
173-
@Override
174-
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
175-
return Future.future(handler -> this.propagatableConnection = propagatableConnection);
175+
private <T> Future<@Nullable T> initializePropagatedConnectionAndTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
176+
return getConnection().onComplete(handler -> context().put(PROPAGATABLE_CONNECTION, handler.result()))
177+
.flatMap(conn -> conn
178+
.begin()
179+
.flatMap(tx -> function
180+
.apply(conn)
181+
.compose(
182+
res -> tx
183+
.commit()
184+
.flatMap(v -> Future.succeededFuture(res)),
185+
err -> {
186+
if (err instanceof TransactionRollbackException) {
187+
return Future.failedFuture(err);
188+
} else {
189+
return tx
190+
.rollback()
191+
.compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
192+
}
193+
}))
194+
.onComplete(ar -> conn.close(v -> context().remove(PROPAGATABLE_CONNECTION))));
176195
}
177196

178197
@Override

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/TransactionTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,6 @@ public void testWithPropagatedConnectionTransactionCommit(TestContext ctx) {
321321
.query("SELECT id, val FROM mutable")
322322
.execute(ctx.asyncAssertSuccess(rows -> {
323323
ctx.assertEquals(3, rows.size());
324-
ctx.assertFalse(pool.propagatableConnectionIsActive());
325324
async.complete();
326325
}))));
327326
}

0 commit comments

Comments
 (0)