Skip to content

Commit 6236b0c

Browse files
authored
Implement cursors / streaming (#1293)
See #1185 Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 2abe496 commit 6236b0c

File tree

16 files changed

+395
-333
lines changed

16 files changed

+395
-333
lines changed

vertx-oracle-client/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ scalability and low overhead.
1111
* Java 8 Date and Time
1212
* SSL/TLS
1313
* RxJava API
14+
* Cursor
15+
* Row streaming
1416
1517
*Not supported yet*
1618

1719
* Prepared queries caching
18-
* Cursor
19-
* Row streaming
2020
* Stored Procedures
2121
2222
WARNING: this module is in tech preview

vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleException.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public OracleException(String message, int errorCode, String sqlState) {
2525
}
2626

2727
public OracleException(SQLException e) {
28-
super(e.getMessage(), e.getErrorCode(), e.getSQLState());
29-
initCause(e);
28+
super(e.getMessage(), e.getErrorCode(), e.getSQLState(), e);
3029
}
3130
}

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/CommandHandler.java

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717
import io.vertx.core.net.SocketAddress;
1818
import io.vertx.oracleclient.OracleConnectOptions;
1919
import io.vertx.oracleclient.OraclePrepareOptions;
20+
import io.vertx.oracleclient.impl.commands.PrepareStatementCommand;
21+
import io.vertx.oracleclient.impl.commands.SimpleQueryCommand;
2022
import io.vertx.oracleclient.impl.commands.*;
2123
import io.vertx.sqlclient.impl.Connection;
2224
import io.vertx.sqlclient.impl.PreparedStatement;
23-
import io.vertx.sqlclient.impl.QueryResultHandler;
24-
import io.vertx.sqlclient.impl.command.CommandBase;
25-
import io.vertx.sqlclient.impl.command.ExtendedQueryCommand;
26-
import io.vertx.sqlclient.impl.command.TxCommand;
25+
import io.vertx.sqlclient.impl.command.*;
2726
import io.vertx.sqlclient.spi.DatabaseMetadata;
2827
import oracle.jdbc.OracleConnection;
2928

3029
import java.sql.SQLException;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ConcurrentMap;
3132

3233
import static io.vertx.oracleclient.impl.Helper.*;
3334

@@ -36,6 +37,8 @@ public class CommandHandler implements Connection {
3637
private final ContextInternal context;
3738
private final OracleConnectOptions options;
3839
private Holder holder;
40+
@SuppressWarnings("rawtypes")
41+
private ConcurrentMap<String, RowReader> cursors = new ConcurrentHashMap<>();
3942

4043
public CommandHandler(ContextInternal ctx, OracleConnectOptions options, OracleConnection oc) {
4144
this.context = ctx;
@@ -123,7 +126,7 @@ public Future<Void> beforeRecycle() {
123126
public <R> Future<R> schedule(ContextInternal contextInternal, CommandBase<R> commandBase) {
124127
Future<R> result;
125128
if (commandBase instanceof io.vertx.sqlclient.impl.command.SimpleQueryCommand) {
126-
result = (Future<R>) handle((io.vertx.sqlclient.impl.command.SimpleQueryCommand) commandBase);
129+
result = (Future<R>) handle((io.vertx.sqlclient.impl.command.SimpleQueryCommand<?>) commandBase);
127130
} else if (commandBase instanceof io.vertx.sqlclient.impl.command.PrepareStatementCommand) {
128131
result = (Future<R>) handle((io.vertx.sqlclient.impl.command.PrepareStatementCommand) commandBase);
129132
} else if (commandBase instanceof ExtendedQueryCommand) {
@@ -132,6 +135,10 @@ public <R> Future<R> schedule(ContextInternal contextInternal, CommandBase<R> co
132135
result = handle((TxCommand<R>) commandBase);
133136
} else if (commandBase instanceof PingCommand) {
134137
result = (Future<R>) handle((PingCommand) commandBase);
138+
} else if (commandBase instanceof CloseStatementCommand) {
139+
result = context.succeededFuture();
140+
} else if (commandBase instanceof CloseCursorCommand) {
141+
result = (Future<R>) handle((CloseCursorCommand) commandBase);
135142
} else {
136143
result = context.failedFuture("Not yet implemented " + commandBase);
137144
}
@@ -158,42 +165,46 @@ public <R> Future<R> schedule(ContextInternal contextInternal, CommandBase<R> co
158165
});
159166
}
160167

168+
private Future<Void> handle(CloseCursorCommand cmd) {
169+
RowReader<?, ?> reader = cursors.remove(cmd.id());
170+
if (reader == null) {
171+
return context.succeededFuture();
172+
}
173+
return reader.close();
174+
}
175+
161176
private Future<Integer> handle(PingCommand ping) {
162177
return ping.execute(connection, context);
163178
}
164179

165-
@SuppressWarnings({ "unchecked", "rawtypes" })
166-
private <R> Future<Boolean> handle(io.vertx.sqlclient.impl.command.SimpleQueryCommand command) {
167-
QueryCommand<?, R> action = new SimpleQueryCommand<>(command.sql(), command.collector());
168-
return handle(action, command.resultHandler());
180+
@SuppressWarnings({"unchecked", "rawtypes"})
181+
private <R> Future<Boolean> handle(io.vertx.sqlclient.impl.command.SimpleQueryCommand cmd) {
182+
QueryCommand<?, R> action = new SimpleQueryCommand<>(cmd, cmd.collector());
183+
return action.execute(connection, context);
169184
}
170185

171186
private Future<PreparedStatement> handle(io.vertx.sqlclient.impl.command.PrepareStatementCommand command) {
172187
PrepareStatementCommand action = new PrepareStatementCommand(OraclePrepareOptions.createFrom(command.options()), command.sql());
173188
return action.execute(connection, context);
174189
}
175190

176-
private <R> Future<Boolean> handle(QueryCommand<?, R> action, QueryResultHandler<R> handler) {
177-
Future<OracleResponse<R>> fut = action.execute(connection, context);
178-
return fut
179-
.onSuccess(ar -> ar.handle(handler)).map(false)
180-
.onFailure(t -> holder.handleException(t));
181-
182-
}
183-
184-
private <R> Future<Boolean> handle(ExtendedQueryCommand<R> command) {
185-
if (command.cursorId() != null) {
186-
QueryCommand<?, R> cmd = new OracleCursorQueryCommand<>(command, command.params());
187-
return cmd.execute(connection, context)
188-
.map(false);
191+
@SuppressWarnings("unchecked")
192+
private <R> Future<Boolean> handle(ExtendedQueryCommand<R> cmd) {
193+
AbstractCommand<Boolean> action;
194+
String cursorId = cmd.cursorId();
195+
if (cursorId != null) {
196+
RowReader<?, R> rowReader = cursors.get(cursorId);
197+
if (rowReader != null) {
198+
action = new OracleCursorFetchCommand<>(cmd, rowReader);
199+
} else {
200+
action = new OracleCursorQueryCommand<>(cmd, cmd.collector(), rr -> cursors.put(cursorId, rr));
201+
}
202+
} else if (cmd.isBatch()) {
203+
action = new OraclePreparedBatch<>(cmd, cmd.collector());
204+
} else {
205+
action = new OraclePreparedQuery<>(cmd, cmd.collector());
189206
}
190-
191-
QueryCommand<?, R> action =
192-
command.isBatch() ?
193-
new OraclePreparedBatch<>(command, command.collector(), command.paramsList())
194-
: new OraclePreparedQuery<>(command, command.collector(), command.params());
195-
196-
return handle(action, command.resultHandler());
207+
return action.execute(connection, context);
197208
}
198209

199210
private <R> Future<R> handle(TxCommand<R> command) {

0 commit comments

Comments
 (0)