Skip to content

Commit bf4690d

Browse files
Create propagatable connection, add Transaction accessor
1 parent 45983da commit bf4690d

File tree

6 files changed

+113
-0
lines changed

6 files changed

+113
-0
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ static Pool pool(Vertx vertx, SqlConnectOptions database, PoolOptions options) {
9898
*/
9999
Future<SqlConnection> getConnection();
100100

101+
Future<SqlConnection> getPropagatableConnection();
102+
103+
boolean propagatableConnectionIsActive();
104+
105+
Future<Void> setPropagatableConnection(SqlConnection propagatableConnection);
106+
101107
/**
102108
* {@inheritDoc}
103109
*
@@ -164,6 +170,39 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
164170
.onComplete(ar -> conn.close()));
165171
}
166172

173+
174+
default <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
175+
if (propagatableConnectionIsActive()) {
176+
return getPropagatableConnection()
177+
.flatMap(conn -> function.apply(conn)
178+
.onFailure(err -> {
179+
if (!(err instanceof TransactionRollbackException)) {
180+
conn.getTransaction().rollback();
181+
}
182+
}));
183+
} else {
184+
return getPropagatableConnection()
185+
.flatMap(conn -> conn
186+
.begin()
187+
.flatMap(tx -> function
188+
.apply(conn)
189+
.compose(
190+
res -> tx
191+
.commit()
192+
.flatMap(v -> Future.succeededFuture(res)),
193+
err -> {
194+
if (err instanceof TransactionRollbackException) {
195+
return Future.failedFuture(err);
196+
} else {
197+
return tx
198+
.rollback()
199+
.compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
200+
}
201+
}))
202+
.onComplete(ar -> conn.close(v -> setPropagatableConnection(null))));
203+
}
204+
}
205+
167206
/**
168207
* Get a connection from the pool and execute the given {@code function}.
169208
*

vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public interface SqlConnection extends SqlClient {
9292
*/
9393
Future<Transaction> begin();
9494

95+
Transaction getTransaction();
96+
9597
/**
9698
* @return whether the connection uses SSL
9799
*/

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,21 @@ public Future<SqlConnection> getConnection() {
6767
return delegate.getConnection();
6868
}
6969

70+
@Override
71+
public Future<SqlConnection> getPropagatableConnection() {
72+
return delegate.getPropagatableConnection();
73+
}
74+
75+
@Override
76+
public boolean propagatableConnectionIsActive() {
77+
return delegate.propagatableConnectionIsActive();
78+
}
79+
80+
@Override
81+
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
82+
return delegate.setPropagatableConnection(propagatableConnection);
83+
}
84+
7085
@Override
7186
public Query<RowSet<Row>> query(String sql) {
7287
return delegate.query(sql);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
5353
private long timerID;
5454
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
5555

56+
private volatile SqlConnection propagatableConnection;
57+
5658
public PoolImpl(VertxInternal vertx,
5759
Driver driver,
5860
QueryTracer tracer,
@@ -155,6 +157,24 @@ public Future<SqlConnection> getConnection() {
155157
});
156158
}
157159

160+
@Override
161+
public Future<SqlConnection> getPropagatableConnection() {
162+
if (propagatableConnection == null) {
163+
return getConnection().onComplete(c -> setPropagatableConnection(c.result()));
164+
}
165+
return Future.succeededFuture(propagatableConnection);
166+
}
167+
168+
@Override
169+
public boolean propagatableConnectionIsActive() {
170+
return propagatableConnection != null;
171+
}
172+
173+
@Override
174+
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
175+
return Future.future(handler -> this.propagatableConnection = propagatableConnection);
176+
}
177+
158178
@Override
159179
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
160180
Object metric;

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ public Future<Transaction> begin() {
165165
return tx.begin();
166166
}
167167

168+
@Override
169+
public Transaction getTransaction() {
170+
return tx;
171+
}
172+
168173
@Override
169174
boolean autoCommit() {
170175
return tx == null;

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/TransactionTestBase.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,36 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
308308
}));
309309
}));
310310
}
311+
312+
@Test
313+
public void testWithPropagatedConnectionTransactionCommit(TestContext ctx) {
314+
Async async = ctx.async();
315+
Pool pool = createPool();
316+
pool.withPropagatedTransaction(c ->
317+
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
318+
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
319+
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
320+
).onComplete(ctx.asyncAssertSuccess(v -> pool
321+
.query("SELECT id, val FROM mutable")
322+
.execute(ctx.asyncAssertSuccess(rows -> {
323+
ctx.assertEquals(3, rows.size());
324+
ctx.assertFalse(pool.propagatableConnectionIsActive());
325+
async.complete();
326+
}))));
327+
}
328+
329+
@Test
330+
public void testWithPropagatedConnectionTransactionRollback(TestContext ctx) {
331+
Async async = ctx.async();
332+
Pool pool = createPool();
333+
Throwable failure = new Throwable();
334+
pool.withPropagatedTransaction(c ->
335+
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(v -> Future.failedFuture(failure)))
336+
).onComplete(ctx.asyncAssertFailure(v -> pool
337+
.query("SELECT id, val FROM mutable")
338+
.execute(ctx.asyncAssertSuccess(rows -> {
339+
ctx.assertEquals(0, rows.size());
340+
async.complete();
341+
}))));
342+
}
311343
}

0 commit comments

Comments
 (0)