diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index a16fe2cc1..6c465fab9 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -733,6 +733,26 @@ public void batchReturning(SqlClient client) { }); } + public void importDataToDb(Vertx vertx, PgConnection client) { + vertx.fileSystem().readFile("path/to/file") + .flatMap(bufferAsyncResult -> { + return client.copyFromBytes( + "COPY my_table FROM STDIN (FORMAT csv, HEADER)", + bufferAsyncResult + ).execute(); + }).onSuccess(result -> { + Long rowsWritten = result.iterator().next().getLong("rowsWritten"); + System.out.println("rows written: " + rowsWritten); + }); + } + + public void exportDataFromDb(Vertx vertx, PgConnection client) { + String path = "path/to/file"; + client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)") + .flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value())) + .onSuccess(res -> System.out.println("Data exported to " + path)); + } + public void pgBouncer(PgConnectOptions connectOptions) { connectOptions.setUseLayer7Proxy(true); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index 8b9b8212d..e00ed44c2 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -17,6 +17,7 @@ package io.vertx.pgclient; +import io.vertx.core.buffer.Buffer; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; @@ -24,7 +25,13 @@ import io.vertx.core.Vertx; import io.vertx.core.impl.ContextInternal; import io.vertx.pgclient.impl.PgConnectionImpl; +import io.vertx.sqlclient.Query; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.SqlResult; + +import java.util.List; /** * A connection to Postgres. @@ -34,6 +41,7 @@ * *

* @@ -92,6 +100,48 @@ static Future connect(Vertx vertx, String connectionUri) { @Fluent PgConnection noticeHandler(Handler handler); + /** + * Imports data into a database. + * + *

Use this method when importing opaque bytes, e.g. from a CSV file. + * + *

If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead. + * + * @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)}) + * @param from byte stream data will be fetched from + * @return result set with single field {@code rowsWritten} + */ + Query> copyFromBytes(String sql, Buffer from); + + /** + * Exports data from a database with decoding. + * + * {@code FORMAT} can only be {@code binary}. + * + * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)}) + * @return decoded records + */ + Query> copyToRows(String sql); + + /** + * Exports data from a database as-is, without decoding. + * + *

Use this method when exporting opaque bytes, e.g. to a CSV file. + * + * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) + * @return async result of bytes container data will be written to + * + * - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html + * - future of read stream. + * - when we do query operation + * - we should not use query result builder + * - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ? + * - we need a new interface Future> + * - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html + * - PgSocketConnection - we will apply backpressure in SocketInternal + */ + Future> copyToBytes(String sql); + /** * Send a request cancellation message to tell the server to cancel processing request in this connection. *
Note: Use this with caution because the cancellation signal may or may not have any effect. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index d14b208eb..5161e8d51 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -19,18 +19,36 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.netty.buffer.ByteBuf; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgNotice; import io.vertx.pgclient.PgNotification; +import io.vertx.pgclient.impl.codec.CopyOutCommand; import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; +import io.vertx.sqlclient.Query; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; +import io.vertx.sqlclient.impl.QueryExecutor; +import io.vertx.sqlclient.impl.QueryResultBuilder; +import io.vertx.sqlclient.impl.QueryResultHandler; import io.vertx.sqlclient.impl.SocketConnectionBase; import io.vertx.sqlclient.impl.SqlConnectionBase; +import io.vertx.sqlclient.impl.SqlResultImpl; +import io.vertx.sqlclient.impl.command.QueryCommandBase; +import io.vertx.sqlclient.impl.command.SimpleQueryCommand; + +import java.util.function.Function; +import java.util.stream.Collector; public class PgConnectionImpl extends SqlConnectionBase implements PgConnection { @@ -107,6 +125,32 @@ public PgConnection noticeHandler(Handler handler) { return this; } + @Override + public Query> copyFromBytes(String sql, Buffer from) { + return null; + } + + @Override + public Query> copyToRows(String sql) { + return null; + } + + @Override + public Future> copyToBytes(String sql) { + Function> factory = (buffer) -> new SqlResultImpl<>(buffer); + PromiseInternal> promise = context.promise(); + + // currently, this loads entire content into Buffer + // it should stream bytes out instead + // TODO: signal completion as soon as the database replied CopyOutResponse 'H' ? + QueryResultBuilder, SqlResult> resultHandler = + new QueryResultBuilder<>(factory, promise); + + CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler); + this.schedule(promise.context(), cmd).onComplete(resultHandler); + return promise.future(); + } + @Override public int processId() { return conn.getProcessId(); @@ -126,4 +170,5 @@ public Future cancelRequest() { }); return promise.future(); } + } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java new file mode 100644 index 000000000..13170fb2d --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -0,0 +1,43 @@ +package io.vertx.pgclient.impl.codec; + +import io.netty.buffer.ByteBuf; +import io.vertx.core.buffer.Buffer; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.impl.QueryResultBuilder; +import io.vertx.sqlclient.impl.SqlResultImpl; +import io.vertx.sqlclient.impl.command.CommandBase; + +import java.util.stream.Collector; + +public class CopyOutCommand extends CommandBase { + private final String sql; + private final Collector collector; + private final QueryResultBuilder, SqlResult> resultHandler; + + public CopyOutCommand( + String sql, + QueryResultBuilder, SqlResult> resultHandler + ) { + this.sql = sql; + this.resultHandler = resultHandler; + this.collector = Collector.of( + Buffer::buffer, + // TODO: this might be unnecessary slow - think of alternatives + (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)), + (v1, v2) -> null, + (finalResult) -> finalResult + ); + } + + QueryResultBuilder, SqlResult> resultHandler() { + return resultHandler; + } + + String sql() { + return sql; + } + + Collector collector() { + return collector; + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java new file mode 100644 index 000000000..5f23a7651 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java @@ -0,0 +1,43 @@ +package io.vertx.pgclient.impl.codec; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.impl.SqlResultImpl; + +class CopyOutCommandCodec extends PgCommandCodec { + CopyOutDataDecoder decoder; + + CopyOutCommandCodec(CopyOutCommand cmd) { + super(cmd); + decoder = new CopyOutDataDecoder(cmd.collector()); + } + + @Override + public void handleCommandComplete(int updated) { + this.result = false; + Buffer result; + Throwable failure; + int size; + if (decoder != null) { + failure = decoder.complete(); + result = decoder.result(); + size = decoder.size(); + decoder.reset(); + } else { + failure = null; + result = new BufferImpl(); + size = 0; + } + cmd.resultHandler().handleResult(updated, size, null, result, failure); + } + + @Override + public void handleErrorResponse(ErrorResponse errorResponse) { + failure = errorResponse.toException(); + } + + void encode(PgEncoder encoder) { + encoder.writeQuery(new Query(cmd.sql())); + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java new file mode 100644 index 000000000..320fc13a8 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java @@ -0,0 +1,71 @@ +package io.vertx.pgclient.impl.codec; + +import io.netty.buffer.ByteBuf; +import io.vertx.core.buffer.Buffer; +import java.util.function.BiConsumer; +import java.util.stream.Collector; + +public class CopyOutDataDecoder { + + private final Collector collector; + private BiConsumer accumulator; + + private int size; + private Buffer container; + private Throwable failure; + private Buffer result; + + protected CopyOutDataDecoder(Collector collector) { + this.collector = collector; + reset(); + } + + public int size() { + return size; + } + + public void handleChunk(ByteBuf in) { + if (failure != null) { + return; + } + if (accumulator == null) { + try { + accumulator = collector.accumulator(); + } catch (Exception e) { + failure = e; + return; + } + } + try { + accumulator.accept(container, in); + } catch (Exception e) { + failure = e; + return; + } + size++; + } + + public Buffer result() { + return result; + } + + public Throwable complete() { + try { + result = collector.finisher().apply(container); + } catch (Exception e) { + failure = e; + } + return failure; + } + + public void reset() { + size = 0; + failure = null; + result = null; + try { + this.container = collector.supplier().get(); + } catch (Exception e) { + failure = e; + } + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index 058c0d629..eb58ff342 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) { decodeNotificationResponse(ctx, in); break; } + // TODO: check if these handlers need to be at this level of loop + // TODO: check if COPY needs a separate loop + case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: { + decodeCopyOutResponse(ctx, in); + break; + } + case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: { + decodeCopyData(ctx, in); + break; + } + case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: { + decodeCopyCompletion(ctx, in); + break; + } default: { throw new UnsupportedOperationException(); } @@ -455,4 +469,14 @@ private void decodeBackendKeyData(ByteBuf in) { private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) { ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in))); } + + private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {} + + private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) { + PgCommandCodec codec = inflight.peek(); + CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec; + cmdCodec.decoder.handleChunk(in); + } + + private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {} } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java index d35200cb8..0f10e1aed 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java @@ -119,6 +119,8 @@ void write(CommandBase cmd) { return new ClosePortalCommandCodec((CloseCursorCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { return new CloseStatementCommandCodec((CloseStatementCommand) cmd); + } else if (cmd instanceof CopyOutCommand) { + return new CopyOutCommandCodec((CopyOutCommand) cmd); } throw new AssertionError(); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java index bc8fa9a0d..dd487ce7e 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java @@ -71,4 +71,15 @@ public class PgProtocolConstants { public static final byte MESSAGE_TYPE_FUNCTION_RESULT = 'V'; public static final byte MESSAGE_TYPE_SSL_YES = 'S'; public static final byte MESSAGE_TYPE_SSL_NO = 'N'; + + /** + * COPY-out messages. + * + *

Other messages which might appear in between CopyData: + *

  • NoticeResponse + *
  • ParameterStatus + */ + public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H'; + public static final byte MESSAGE_TYPE_COPY_DATA = 'd'; + public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c'; } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java new file mode 100644 index 000000000..67579a901 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -0,0 +1,104 @@ +package io.vertx.pgclient; + +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class PgConnectionCopyTest extends PgConnectionTestBase { + public PgConnectionCopyTest() { + connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> { + handler.handle(ar.map(p -> p)); + }); + } + + @Test + public void testCopyToRows(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + deleteFromTestTable(ctx, conn, () -> { + insertIntoTestTable(ctx, conn, 10, () -> { + PgConnection pgConn = (PgConnection) conn; + pgConn.copyToRows("COPY my_table TO STDOUT (FORMAT binary)") + .execute() + .onComplete(ctx.asyncAssertSuccess(result -> { + for (int i = 0; i < 2; i++) { + ctx.assertEquals(1, result.rowCount()); + result = result.next(); + } + async.complete(); + })); + }); + }); + })); + } + + @Test + public void testCopyToCsvBytes(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + deleteFromTestTable(ctx, conn, () -> { + insertIntoTestTable(ctx, conn, 1, () -> { + PgConnection pgConn = (PgConnection) conn; + pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)") + .onComplete(ctx.asyncAssertSuccess(result -> { + ctx.assertNull(result.columnDescriptors()); + ctx.assertEquals(10, result.rowCount()); + ctx.assertEquals(10, result.size()); + ctx.assertEquals( + Buffer.buffer( + "0,Whatever-0\n" + + "1,Whatever-1\n" + + "2,Whatever-2\n" + + "3,Whatever-3\n" + + "4,Whatever-4\n" + + "5,Whatever-5\n" + + "6,Whatever-6\n" + + "7,Whatever-7\n" + + "8,Whatever-8\n" + + "9,Whatever-9\n" + ), + result.value() + ); + async.complete(); + })); + }); + }); + })); + } + + /** + * Just a thingy to eavesdrop protocol interactions. + * + * tips: + * - frontend / backend protocol -> message flow -> binary + * - start with CommandBase, SimpleQueryCommandCodecBase, builder.executeSimpleQuery, QueryExecutor, QueryResultBuilder + * - PgDecoder + * - startup message + * - auth + * - Simple Query + * - use wireshark - `tcp port 5432` + * - add VM option - port - such that it's fixed + * + * TODO: drop this. + * + * @param ctx + */ + @Test + public void testSimpleQuery(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + conn + .query("select 1") + .execute() + // when does the result is transformed from bool to rows? + .onComplete(ctx.asyncAssertSuccess(result1 -> { + ctx.assertEquals(1, result1.size()); + async.complete(); + })); + })); + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java index 182472c86..67019b057 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java @@ -34,7 +34,7 @@ /** * Executes query. */ -class QueryExecutor, L extends SqlResult> { +public class QueryExecutor, L extends SqlResult> { private final Function factory; private final Collector collector; @@ -49,7 +49,7 @@ private QueryResultBuilder createHandler(PromiseInternal promise) { return new QueryResultBuilder<>(factory, promise); } - void executeSimpleQuery(CommandScheduler scheduler, + public void executeSimpleQuery(CommandScheduler scheduler, String sql, boolean autoCommit, boolean singleton, diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java index 091cf9e31..9e8287df1 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java @@ -38,7 +38,8 @@ public class QueryResultBuilder, L extends SqlResu private Throwable failure; private boolean suspended; - QueryResultBuilder(Function factory, PromiseInternal handler) { + // TODO: public-ing this shouldn't be needed + public QueryResultBuilder(Function factory, PromiseInternal handler) { this.factory = factory; this.handler = handler; }