Skip to content

Commit 9830530

Browse files
authored
Implement command scheduling for Oracle client (#1294)
See #1087 Instead of executing commands in any order, or worse, concurrently on the same JDBC connection. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 78db24d commit 9830530

21 files changed

+579
-463
lines changed

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/CommandHandler.java

Lines changed: 0 additions & 214 deletions
This file was deleted.

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/Helper.java

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License 2.0 which is available at
@@ -12,20 +12,14 @@
1212

1313
import io.vertx.core.*;
1414
import io.vertx.core.buffer.Buffer;
15-
import io.vertx.core.impl.ContextInternal;
1615
import io.vertx.oracleclient.OracleException;
1716
import io.vertx.sqlclient.Tuple;
1817
import oracle.sql.TIMESTAMPTZ;
1918

2019
import java.sql.*;
21-
import java.util.ArrayList;
22-
import java.util.List;
23-
import java.util.concurrent.Flow;
2420
import java.util.function.Function;
2521
import java.util.function.Supplier;
2622

27-
import static io.vertx.oracleclient.impl.FailureUtil.sanitize;
28-
2923
public class Helper {
3024

3125
public static void closeQuietly(AutoCloseable autoCloseable) {
@@ -78,66 +72,6 @@ public static void runOrHandleSQLException(ThrowingRunnable runnable)
7872
}
7973
}
8074

81-
public static <T> Future<T> first(Flow.Publisher<T> publisher, ContextInternal context) {
82-
Promise<T> promise = context.promise();
83-
publisher.subscribe(new Flow.Subscriber<>() {
84-
volatile Flow.Subscription subscription;
85-
86-
@Override
87-
public void onSubscribe(Flow.Subscription subscription) {
88-
this.subscription = subscription;
89-
subscription.request(1);
90-
}
91-
92-
@Override
93-
public void onNext(T item) {
94-
context.runOnContext(x -> promise.tryComplete(item));
95-
subscription.cancel();
96-
}
97-
98-
@Override
99-
public void onError(Throwable throwable) {
100-
promise.fail(sanitize(throwable));
101-
}
102-
103-
@Override
104-
public void onComplete() {
105-
// Use tryComplete as the completion signal can be sent even if we cancelled.
106-
// Also for Publisher<Void> we would get in this case.
107-
promise.tryComplete(null);
108-
}
109-
});
110-
return promise.future();
111-
}
112-
113-
public static <T> Future<List<T>> collect(Flow.Publisher<T> publisher, ContextInternal context) {
114-
Promise<List<T>> promise = context.promise();
115-
publisher.subscribe(new Flow.Subscriber<>() {
116-
final List<T> list = new ArrayList<>();
117-
118-
@Override
119-
public void onSubscribe(Flow.Subscription subscription) {
120-
subscription.request(Long.MAX_VALUE);
121-
}
122-
123-
@Override
124-
public void onNext(T item) {
125-
list.add(item);
126-
}
127-
128-
@Override
129-
public void onError(Throwable throwable) {
130-
promise.fail(sanitize(throwable));
131-
}
132-
133-
@Override
134-
public void onComplete() {
135-
promise.complete(list);
136-
}
137-
});
138-
return promise.future();
139-
}
140-
14175
public static Object convertSqlValue(Object value) throws SQLException {
14276
if (value == null) {
14377
return null;

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
import oracle.jdbc.OracleConnection;
2323
import oracle.jdbc.datasource.OracleDataSource;
2424

25-
import java.sql.SQLException;
26-
25+
import static io.vertx.oracleclient.impl.Helper.executeBlocking;
2726
import static io.vertx.oracleclient.impl.OracleDatabaseHelper.createDataSource;
2827

2928
public class OracleConnectionFactory implements ConnectionFactory {
@@ -45,14 +44,10 @@ public void close(Promise<Void> promise) {
4544
public Future<SqlConnection> connect(Context context) {
4645
ContextInternal ctx = (ContextInternal) context;
4746
QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options);
48-
return context.<OracleConnection>executeBlocking(prom -> {
49-
try {
50-
prom.complete(datasource.createConnectionBuilder().build());
51-
} catch (SQLException e) {
52-
prom.fail(e);
53-
}
54-
}).map(ora -> {
55-
CommandHandler conn = new CommandHandler((ContextInternal) context, options, ora);
47+
return executeBlocking(context, () -> {
48+
OracleConnection orac = datasource.createConnectionBuilder().build();
49+
OracleMetadata metadata = new OracleMetadata(orac.getMetaData());
50+
OracleJdbcConnection conn = new OracleJdbcConnection(ctx, options, orac, metadata);
5651
OracleConnectionImpl msConn = new OracleConnectionImpl(ctx, this, conn, tracer, null);
5752
conn.init(msConn);
5853
return msConn;

0 commit comments

Comments
 (0)