Skip to content

Commit b170bdc

Browse files
authored
Simplified transaction API improvements (#574)
Fixes - #421 - #583 - #584 - #585 - #586
1 parent c3e9a3b commit b170bdc

File tree

36 files changed

+910
-746
lines changed

36 files changed

+910
-746
lines changed

vertx-db2-client/src/main/java/examples/SqlClientExamples.java

Lines changed: 89 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -210,32 +210,38 @@ public void transaction01(Pool pool) {
210210
SqlConnection conn = res.result();
211211

212212
// Begin the transaction
213-
Transaction tx = conn.begin();
214-
215-
// Various statements
216-
conn
217-
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
218-
.execute(ar1 -> {
219-
if (ar1.succeeded()) {
213+
conn.begin(ar0 -> {
214+
if (ar0.succeeded()) {
215+
Transaction tx = ar0.result();
216+
// Various statements
220217
conn
221-
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
222-
.execute(ar2 -> {
223-
if (ar2.succeeded()) {
224-
// Commit the transaction
225-
tx.commit(ar3 -> {
226-
if (ar3.succeeded()) {
227-
System.out.println("Transaction succeeded");
228-
} else {
229-
System.out.println("Transaction failed " + ar3.cause().getMessage());
230-
}
218+
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
219+
.execute(ar1 -> {
220+
if (ar1.succeeded()) {
221+
conn
222+
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
223+
.execute(ar2 -> {
224+
if (ar2.succeeded()) {
225+
// Commit the transaction
226+
tx.commit(ar3 -> {
227+
if (ar3.succeeded()) {
228+
System.out.println("Transaction succeeded");
229+
} else {
230+
System.out.println("Transaction failed " + ar3.cause().getMessage());
231+
}
232+
// Return the connection to the pool
233+
conn.close();
234+
});
235+
} else {
236+
// Return the connection to the pool
237+
conn.close();
238+
}
239+
});
240+
} else {
231241
// Return the connection to the pool
232242
conn.close();
233-
});
234-
} else {
235-
// Return the connection to the pool
236-
conn.close();
237-
}
238-
});
243+
}
244+
});
239245
} else {
240246
// Return the connection to the pool
241247
conn.close();
@@ -246,69 +252,62 @@ public void transaction01(Pool pool) {
246252
}
247253

248254
public void transaction02(Transaction tx) {
249-
tx.abortHandler(v -> {
255+
tx.completion().onFailure(err -> {
250256
System.out.println("Transaction failed => rollbacked");
251257
});
252258
}
253259

254260
public void transaction03(Pool pool) {
255261

256262
// Acquire a transaction and begin the transaction
257-
pool.begin(res -> {
258-
if (res.succeeded()) {
259-
260-
// Get the transaction
261-
Transaction tx = res.result();
262-
263-
// Various statements
264-
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
265-
.execute(ar1 -> {
266-
if (ar1.succeeded()) {
267-
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
268-
.execute(ar2 -> {
269-
if (ar2.succeeded()) {
270-
// Commit the transaction
271-
// the connection will automatically return to the pool
272-
tx.commit(ar3 -> {
273-
if (ar3.succeeded()) {
274-
System.out.println("Transaction succeeded");
275-
} else {
276-
System.out.println("Transaction failed " + ar3.cause().getMessage());
277-
}
278-
});
279-
}
280-
});
281-
} else {
282-
// No need to close connection as transaction will abort and be returned to the pool
283-
}
284-
});
263+
pool.withTransaction(client -> client
264+
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
265+
.execute()
266+
.flatMap(res -> client
267+
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
268+
.execute()
269+
// Map to a message result
270+
.map("Users inserted"))
271+
).onComplete(ar -> {
272+
// The connection was automatically return to the pool
273+
if (ar.succeeded()) {
274+
// Transaction was committed
275+
String message = ar.result();
276+
System.out.println("Transaction succeeded: " + message);
277+
} else {
278+
// Transaction was rolled back
279+
System.out.println("Transaction failed " + ar.cause().getMessage());
285280
}
286281
});
287282
}
288283

289284
public void usingCursors01(SqlConnection connection) {
290-
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar1 -> {
291-
if (ar1.succeeded()) {
292-
PreparedStatement pq = ar1.result();
285+
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
286+
if (ar0.succeeded()) {
287+
PreparedStatement pq = ar0.result();
293288

294289
// Cursors require to run within a transaction
295-
Transaction tx = connection.begin();
290+
connection.begin(ar1 -> {
291+
if (ar1.succeeded()) {
292+
Transaction tx = ar1.result();
296293

297-
// Create a cursor
298-
Cursor cursor = pq.cursor(Tuple.of("julien"));
294+
// Create a cursor
295+
Cursor cursor = pq.cursor(Tuple.of("julien"));
299296

300-
// Read 50 rows
301-
cursor.read(50, ar2 -> {
302-
if (ar2.succeeded()) {
303-
RowSet<Row> rows = ar2.result();
304-
305-
// Check for more ?
306-
if (cursor.hasMore()) {
307-
// Repeat the process...
308-
} else {
309-
// No more rows - commit the transaction
310-
tx.commit();
311-
}
297+
// Read 50 rows
298+
cursor.read(50, ar2 -> {
299+
if (ar2.succeeded()) {
300+
RowSet<Row> rows = ar2.result();
301+
302+
// Check for more ?
303+
if (cursor.hasMore()) {
304+
// Repeat the process...
305+
} else {
306+
// No more rows - commit the transaction
307+
tx.commit();
308+
}
309+
}
310+
});
312311
}
313312
});
314313
}
@@ -325,26 +324,30 @@ public void usingCursors02(Cursor cursor) {
325324
}
326325

327326
public void usingCursors03(SqlConnection connection) {
328-
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar1 -> {
329-
if (ar1.succeeded()) {
330-
PreparedStatement pq = ar1.result();
327+
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
328+
if (ar0.succeeded()) {
329+
PreparedStatement pq = ar0.result();
331330

332331
// Streams require to run within a transaction
333-
Transaction tx = connection.begin();
332+
connection.begin(ar1 -> {
333+
if (ar1.succeeded()) {
334+
Transaction tx = ar1.result();
334335

335-
// Fetch 50 rows at a time
336-
RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
336+
// Fetch 50 rows at a time
337+
RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
337338

338-
// Use the stream
339-
stream.exceptionHandler(err -> {
340-
System.out.println("Error: " + err.getMessage());
341-
});
342-
stream.endHandler(v -> {
343-
tx.commit();
344-
System.out.println("End of stream");
345-
});
346-
stream.handler(row -> {
347-
System.out.println("User: " + row.getString("last_name"));
339+
// Use the stream
340+
stream.exceptionHandler(err -> {
341+
System.out.println("Error: " + err.getMessage());
342+
});
343+
stream.endHandler(v -> {
344+
tx.commit();
345+
System.out.println("End of stream");
346+
});
347+
stream.handler(row -> {
348+
System.out.println("User: " + row.getString("last_name"));
349+
});
350+
}
348351
});
349352
}
350353
});

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public class DB2SocketConnection extends SocketConnectionBase {
4040
private DB2Codec codec;
4141
private Handler<Void> closeHandler;
4242

43-
public DB2SocketConnection(NetSocketInternal socket,
43+
public DB2SocketConnection(NetSocketInternal socket,
4444
boolean cachePreparedStatements,
45-
int preparedStatementCacheSize,
45+
int preparedStatementCacheSize,
4646
int preparedStatementCacheSqlLimit,
4747
int pipeliningLimit,
4848
ContextInternal context) {
@@ -65,37 +65,37 @@ public void init() {
6565
pipeline.addBefore("handler", "codec", codec);
6666
super.init();
6767
}
68-
68+
6969
@Override
7070
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
7171
if (cmd instanceof TxCommand) {
72-
TxCommand txCmd = (TxCommand) cmd;
73-
if (TxCommand.BEGIN.sql.equals(txCmd.sql)) {
72+
TxCommand<R> txCmd = (TxCommand<R>) cmd;
73+
if (txCmd.kind == TxCommand.Kind.BEGIN) {
7474
// DB2 always implicitly starts a transaction with each query, and does
7575
// not support the 'BEGIN' keyword. Instead we can no-op BEGIN commands
7676
cmd.handler = handler;
77-
cmd.complete(CommandResponse.success((R) null).toAsyncResult());
77+
cmd.complete(CommandResponse.success(txCmd.result).toAsyncResult());
7878
} else {
7979
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(
80-
txCmd.sql,
80+
txCmd.kind.sql,
8181
false,
8282
false,
8383
QueryCommandBase.NULL_COLLECTOR,
8484
QueryResultHandler.NOOP_HANDLER);
85-
super.doSchedule(cmd2, ar -> handler.handle(ar.mapEmpty()));
86-
85+
super.doSchedule(cmd2, ar -> handler.handle(ar.map(txCmd.result)));
86+
8787
}
8888
} else {
8989
super.doSchedule(cmd, handler);
9090
}
9191
}
92-
92+
9393
@Override
9494
public void handleClose(Throwable t) {
9595
super.handleClose(t);
9696
context().runOnContext(closeHandler);
9797
}
98-
98+
9999
public DB2SocketConnection closeHandler(Handler<Void> handler) {
100100
closeHandler = handler;
101101
return this;

vertx-db2-client/src/test/java/io/vertx/db2client/tck/DB2TransactionTest.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,8 @@ public class DB2TransactionTest extends TransactionTestBase {
3737
public static DB2Resource rule = DB2Resource.SHARED_INSTANCE;
3838

3939
@Override
40-
protected void initConnector() {
41-
connector = handler -> {
42-
if (pool == null) {
43-
pool = DB2Pool.pool(vertx, new DB2ConnectOptions(rule.options()), new PoolOptions().setMaxSize(1));
44-
}
45-
pool.begin(handler);
46-
};
40+
protected Pool createPool() {
41+
return DB2Pool.pool(vertx, new DB2ConnectOptions(rule.options()), new PoolOptions().setMaxSize(1));
4742
}
4843

4944
@Override
@@ -54,18 +49,14 @@ protected Pool nonTxPool() {
5449
@Override
5550
protected void cleanTestTable(TestContext ctx) {
5651
// use DELETE FROM because DB2 does not support TRUNCATE TABLE
57-
connector.accept(ctx.asyncAssertSuccess(conn -> {
58-
conn.query("DELETE FROM mutable").execute(ctx.asyncAssertSuccess(result -> {
59-
conn.close();
60-
}));
61-
}));
52+
getPool().query("DELETE FROM mutable").execute(ctx.asyncAssertSuccess());
6253
}
6354

6455
@Override
6556
protected String statement(String... parts) {
6657
return String.join("?", parts);
6758
}
68-
59+
6960
@Test
7061
public void testDelayedCommit(TestContext ctx) {
7162
assumeFalse("DB2 on Z holds write locks on inserted columns with isolation level = 2", rule.isZOS());

0 commit comments

Comments
 (0)