handler);
+ /**
+ * Imports data into a database.
+ *
+ * Use this method when importing opaque bytes, e.g. from a CSV file.
+ *
+ *
If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
+ *
+ * @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
+ * @param from byte stream data will be fetched from
+ * @return result set with single field {@code rowsWritten}
+ */
+ Query> copyFromBytes(String sql, Buffer from);
+
+ /**
+ * Exports data from a database with decoding.
+ *
+ * {@code FORMAT} can only be {@code binary}.
+ *
+ * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
+ * @return decoded records
+ */
+ Query> copyToRows(String sql);
+
+ /**
+ * Exports data from a database as-is, without decoding.
+ *
+ * Use this method when exporting opaque bytes, e.g. to a CSV file.
+ *
+ * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
+ * @return async result of bytes container data will be written to
+ *
+ * - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
+ * - future of read stream.
+ * - when we do query operation
+ * - we should not use query result builder
+ * - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ?
+ * - we need a new interface Future>
+ * - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
+ * - PgSocketConnection - we will apply backpressure in SocketInternal
+ */
+ Future> copyToBytes(String sql);
+
/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
*
Note: Use this with caution because the cancellation signal may or may not have any effect.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index d14b208eb..5161e8d51 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -19,18 +19,36 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.netty.buffer.ByteBuf;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
+import io.vertx.pgclient.impl.codec.CopyOutCommand;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
+import io.vertx.sqlclient.Query;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
+import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
+import io.vertx.sqlclient.impl.QueryExecutor;
+import io.vertx.sqlclient.impl.QueryResultBuilder;
+import io.vertx.sqlclient.impl.QueryResultHandler;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
+import io.vertx.sqlclient.impl.SqlResultImpl;
+import io.vertx.sqlclient.impl.command.QueryCommandBase;
+import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
+
+import java.util.function.Function;
+import java.util.stream.Collector;
public class PgConnectionImpl extends SqlConnectionBase implements PgConnection {
@@ -107,6 +125,32 @@ public PgConnection noticeHandler(Handler handler) {
return this;
}
+ @Override
+ public Query> copyFromBytes(String sql, Buffer from) {
+ return null;
+ }
+
+ @Override
+ public Query> copyToRows(String sql) {
+ return null;
+ }
+
+ @Override
+ public Future> copyToBytes(String sql) {
+ Function> factory = (buffer) -> new SqlResultImpl<>(buffer);
+ PromiseInternal> promise = context.promise();
+
+ // currently, this loads entire content into Buffer
+ // it should stream bytes out instead
+ // TODO: signal completion as soon as the database replied CopyOutResponse 'H' ?
+ QueryResultBuilder, SqlResult> resultHandler =
+ new QueryResultBuilder<>(factory, promise);
+
+ CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
+ this.schedule(promise.context(), cmd).onComplete(resultHandler);
+ return promise.future();
+ }
+
@Override
public int processId() {
return conn.getProcessId();
@@ -126,4 +170,5 @@ public Future cancelRequest() {
});
return promise.future();
}
+
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
new file mode 100644
index 000000000..13170fb2d
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -0,0 +1,43 @@
+package io.vertx.pgclient.impl.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.sqlclient.SqlResult;
+import io.vertx.sqlclient.impl.QueryResultBuilder;
+import io.vertx.sqlclient.impl.SqlResultImpl;
+import io.vertx.sqlclient.impl.command.CommandBase;
+
+import java.util.stream.Collector;
+
+public class CopyOutCommand extends CommandBase {
+ private final String sql;
+ private final Collector collector;
+ private final QueryResultBuilder, SqlResult> resultHandler;
+
+ public CopyOutCommand(
+ String sql,
+ QueryResultBuilder, SqlResult> resultHandler
+ ) {
+ this.sql = sql;
+ this.resultHandler = resultHandler;
+ this.collector = Collector.of(
+ Buffer::buffer,
+ // TODO: this might be unnecessary slow - think of alternatives
+ (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
+ (v1, v2) -> null,
+ (finalResult) -> finalResult
+ );
+ }
+
+ QueryResultBuilder, SqlResult> resultHandler() {
+ return resultHandler;
+ }
+
+ String sql() {
+ return sql;
+ }
+
+ Collector collector() {
+ return collector;
+ }
+}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
new file mode 100644
index 000000000..5f23a7651
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
@@ -0,0 +1,43 @@
+package io.vertx.pgclient.impl.codec;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.BufferImpl;
+import io.vertx.sqlclient.SqlResult;
+import io.vertx.sqlclient.impl.SqlResultImpl;
+
+class CopyOutCommandCodec extends PgCommandCodec {
+ CopyOutDataDecoder decoder;
+
+ CopyOutCommandCodec(CopyOutCommand cmd) {
+ super(cmd);
+ decoder = new CopyOutDataDecoder(cmd.collector());
+ }
+
+ @Override
+ public void handleCommandComplete(int updated) {
+ this.result = false;
+ Buffer result;
+ Throwable failure;
+ int size;
+ if (decoder != null) {
+ failure = decoder.complete();
+ result = decoder.result();
+ size = decoder.size();
+ decoder.reset();
+ } else {
+ failure = null;
+ result = new BufferImpl();
+ size = 0;
+ }
+ cmd.resultHandler().handleResult(updated, size, null, result, failure);
+ }
+
+ @Override
+ public void handleErrorResponse(ErrorResponse errorResponse) {
+ failure = errorResponse.toException();
+ }
+
+ void encode(PgEncoder encoder) {
+ encoder.writeQuery(new Query(cmd.sql()));
+ }
+}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
new file mode 100644
index 000000000..320fc13a8
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
@@ -0,0 +1,71 @@
+package io.vertx.pgclient.impl.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.vertx.core.buffer.Buffer;
+import java.util.function.BiConsumer;
+import java.util.stream.Collector;
+
+public class CopyOutDataDecoder {
+
+ private final Collector collector;
+ private BiConsumer accumulator;
+
+ private int size;
+ private Buffer container;
+ private Throwable failure;
+ private Buffer result;
+
+ protected CopyOutDataDecoder(Collector collector) {
+ this.collector = collector;
+ reset();
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public void handleChunk(ByteBuf in) {
+ if (failure != null) {
+ return;
+ }
+ if (accumulator == null) {
+ try {
+ accumulator = collector.accumulator();
+ } catch (Exception e) {
+ failure = e;
+ return;
+ }
+ }
+ try {
+ accumulator.accept(container, in);
+ } catch (Exception e) {
+ failure = e;
+ return;
+ }
+ size++;
+ }
+
+ public Buffer result() {
+ return result;
+ }
+
+ public Throwable complete() {
+ try {
+ result = collector.finisher().apply(container);
+ } catch (Exception e) {
+ failure = e;
+ }
+ return failure;
+ }
+
+ public void reset() {
+ size = 0;
+ failure = null;
+ result = null;
+ try {
+ this.container = collector.supplier().get();
+ } catch (Exception e) {
+ failure = e;
+ }
+ }
+}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
index 058c0d629..eb58ff342 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
@@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) {
decodeNotificationResponse(ctx, in);
break;
}
+ // TODO: check if these handlers need to be at this level of loop
+ // TODO: check if COPY needs a separate loop
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: {
+ decodeCopyOutResponse(ctx, in);
+ break;
+ }
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: {
+ decodeCopyData(ctx, in);
+ break;
+ }
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: {
+ decodeCopyCompletion(ctx, in);
+ break;
+ }
default: {
throw new UnsupportedOperationException();
}
@@ -455,4 +469,14 @@ private void decodeBackendKeyData(ByteBuf in) {
private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in)));
}
+
+ private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}
+
+ private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
+ PgCommandCodec, ?> codec = inflight.peek();
+ CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
+ cmdCodec.decoder.handleChunk(in);
+ }
+
+ private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
index d35200cb8..0f10e1aed 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
@@ -119,6 +119,8 @@ void write(CommandBase> cmd) {
return new ClosePortalCommandCodec((CloseCursorCommand) cmd);
} else if (cmd instanceof CloseStatementCommand) {
return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
+ } else if (cmd instanceof CopyOutCommand) {
+ return new CopyOutCommandCodec((CopyOutCommand) cmd);
}
throw new AssertionError();
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
index bc8fa9a0d..dd487ce7e 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
@@ -71,4 +71,15 @@ public class PgProtocolConstants {
public static final byte MESSAGE_TYPE_FUNCTION_RESULT = 'V';
public static final byte MESSAGE_TYPE_SSL_YES = 'S';
public static final byte MESSAGE_TYPE_SSL_NO = 'N';
+
+ /**
+ * COPY-out messages.
+ *
+ * Other messages which might appear in between CopyData:
+ *
NoticeResponse
+ * ParameterStatus
+ */
+ public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H';
+ public static final byte MESSAGE_TYPE_COPY_DATA = 'd';
+ public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c';
}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
new file mode 100644
index 000000000..67579a901
--- /dev/null
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -0,0 +1,104 @@
+package io.vertx.pgclient;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class PgConnectionCopyTest extends PgConnectionTestBase {
+ public PgConnectionCopyTest() {
+ connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> {
+ handler.handle(ar.map(p -> p));
+ });
+ }
+
+ @Test
+ public void testCopyToRows(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ deleteFromTestTable(ctx, conn, () -> {
+ insertIntoTestTable(ctx, conn, 10, () -> {
+ PgConnection pgConn = (PgConnection) conn;
+ pgConn.copyToRows("COPY my_table TO STDOUT (FORMAT binary)")
+ .execute()
+ .onComplete(ctx.asyncAssertSuccess(result -> {
+ for (int i = 0; i < 2; i++) {
+ ctx.assertEquals(1, result.rowCount());
+ result = result.next();
+ }
+ async.complete();
+ }));
+ });
+ });
+ }));
+ }
+
+ @Test
+ public void testCopyToCsvBytes(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ deleteFromTestTable(ctx, conn, () -> {
+ insertIntoTestTable(ctx, conn, 1, () -> {
+ PgConnection pgConn = (PgConnection) conn;
+ pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)")
+ .onComplete(ctx.asyncAssertSuccess(result -> {
+ ctx.assertNull(result.columnDescriptors());
+ ctx.assertEquals(10, result.rowCount());
+ ctx.assertEquals(10, result.size());
+ ctx.assertEquals(
+ Buffer.buffer(
+ "0,Whatever-0\n" +
+ "1,Whatever-1\n" +
+ "2,Whatever-2\n" +
+ "3,Whatever-3\n" +
+ "4,Whatever-4\n" +
+ "5,Whatever-5\n" +
+ "6,Whatever-6\n" +
+ "7,Whatever-7\n" +
+ "8,Whatever-8\n" +
+ "9,Whatever-9\n"
+ ),
+ result.value()
+ );
+ async.complete();
+ }));
+ });
+ });
+ }));
+ }
+
+ /**
+ * Just a thingy to eavesdrop protocol interactions.
+ *
+ * tips:
+ * - frontend / backend protocol -> message flow -> binary
+ * - start with CommandBase, SimpleQueryCommandCodecBase, builder.executeSimpleQuery, QueryExecutor, QueryResultBuilder
+ * - PgDecoder
+ * - startup message
+ * - auth
+ * - Simple Query
+ * - use wireshark - `tcp port 5432`
+ * - add VM option - port - such that it's fixed
+ *
+ * TODO: drop this.
+ *
+ * @param ctx
+ */
+ @Test
+ public void testSimpleQuery(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ conn
+ .query("select 1")
+ .execute()
+ // when does the result is transformed from bool to rows?
+ .onComplete(ctx.asyncAssertSuccess(result1 -> {
+ ctx.assertEquals(1, result1.size());
+ async.complete();
+ }));
+ }));
+ }
+}
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
index 182472c86..67019b057 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
@@ -34,7 +34,7 @@
/**
* Executes query.
*/
-class QueryExecutor, L extends SqlResult> {
+public class QueryExecutor, L extends SqlResult> {
private final Function factory;
private final Collector collector;
@@ -49,7 +49,7 @@ private QueryResultBuilder createHandler(PromiseInternal promise) {
return new QueryResultBuilder<>(factory, promise);
}
- void executeSimpleQuery(CommandScheduler scheduler,
+ public void executeSimpleQuery(CommandScheduler scheduler,
String sql,
boolean autoCommit,
boolean singleton,
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
index 091cf9e31..9e8287df1 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
@@ -38,7 +38,8 @@ public class QueryResultBuilder, L extends SqlResu
private Throwable failure;
private boolean suspended;
- QueryResultBuilder(Function factory, PromiseInternal handler) {
+ // TODO: public-ing this shouldn't be needed
+ public QueryResultBuilder(Function factory, PromiseInternal handler) {
this.factory = factory;
this.handler = handler;
}