From 96f03f0914352de3eee87593c65e08e1b358d876 Mon Sep 17 00:00:00 2001 From: psolomin Date: Wed, 19 Apr 2023 13:44:29 +0100 Subject: [PATCH 01/12] Add skeleton for COPY feature API --- .../main/java/examples/PgClientExamples.java | 10 +++++ .../java/io/vertx/pgclient/PgConnection.java | 41 +++++++++++++++++++ .../vertx/pgclient/impl/PgConnectionImpl.java | 19 +++++++++ .../vertx/pgclient/PgConnectionCopyTest.java | 28 +++++++++++++ 4 files changed, 98 insertions(+) create mode 100644 vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index a16fe2cc1..23575b2c3 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -733,6 +733,16 @@ public void batchReturning(SqlClient client) { }); } + public void copyFromStdinReturning(Vertx vertx, PgConnection client) { + client.copyFrom( + "COPY my_table FROM STDIN (FORMAT csv, HEADER)", + vertx.fileSystem().readFile("path/to/file") + ).execute().onSuccess(res -> { + Long rowsWritten = res.iterator().next().getLong("rowsWritten"); + System.out.println("rows affected: " + rowsWritten); + }); + } + public void pgBouncer(PgConnectOptions connectOptions) { connectOptions.setUseLayer7Proxy(true); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index 8b9b8212d..95a4d1a71 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -17,6 +17,7 @@ package io.vertx.pgclient; +import io.vertx.core.buffer.Buffer; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; @@ -24,8 +25,13 @@ import io.vertx.core.Vertx; import io.vertx.core.impl.ContextInternal; import io.vertx.pgclient.impl.PgConnectionImpl; +import io.vertx.sqlclient.Query; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; +import java.util.List; + /** * A connection to Postgres. *

@@ -34,6 +40,7 @@ *

*

* @@ -92,6 +99,40 @@ static Future connect(Vertx vertx, String connectionUri) { @Fluent PgConnection noticeHandler(Handler handler); + /** + * Imports data into a database. + * + *

Use this method when importing opaque bytes, e.g. from a CSV file. + * + *

If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead. + * + * @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)}) + * @param from byte stream data will be fetched from + * @return result set with single field {@code rowsWritten} + */ + Query> copyFrom(String sql, Future from); + + /** + * Exports data from a database with decoding. + * + * {@code FORMAT} can only be {@code binary}. + * + * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)}) + * @return decoded records + */ + Query> copyTo(String sql); + + /** + * Exports data from a database as-is, without decoding. + * + *

Use this method when exporting opaque bytes, e.g. to a CSV file. + * + * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) + * @param to bytes container data will be written to + * @return async result + */ + Future copyTo(String sql, Buffer to); + /** * Send a request cancellation message to tell the server to cancel processing request in this connection. *
Note: Use this with caution because the cancellation signal may or may not have any effect. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index d14b208eb..c317bbb19 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -19,6 +19,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.ContextInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; @@ -27,6 +28,9 @@ import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; +import io.vertx.sqlclient.Query; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; import io.vertx.sqlclient.impl.SocketConnectionBase; @@ -107,6 +111,21 @@ public PgConnection noticeHandler(Handler handler) { return this; } + @Override + public Query> copyFrom(String sql, Future from) { + return null; + } + + @Override + public Query> copyTo(String sql) { + return null; + } + + @Override + public Future copyTo(String sql, Buffer to) { + return Future.succeededFuture(); + } + @Override public int processId() { return conn.getProcessId(); diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java new file mode 100644 index 000000000..ddde4a142 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -0,0 +1,28 @@ +package io.vertx.pgclient; + +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import org.junit.Test; + +public class PgConnectionCopyTest extends PgConnectionTestBase { + @Test + public void testCopyToRows(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + deleteFromTestTable(ctx, conn, () -> { + insertIntoTestTable(ctx, conn, 10, () -> { + PgConnection pgConn = (PgConnection) conn; + pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)") + .execute() + .onComplete(ctx.asyncAssertSuccess(result -> { + for (int i = 0; i < 2; i++) { + ctx.assertEquals(1, result.rowCount()); + result = result.next(); + } + async.complete(); + })); + }); + }); + })); + } +} From 5cbd34ab50722a9a0b22dfa0bba5f2608dcb874f Mon Sep 17 00:00:00 2001 From: psolomin Date: Wed, 19 Apr 2023 14:52:19 +0100 Subject: [PATCH 02/12] Add example with export; rename methods --- .../main/java/examples/PgClientExamples.java | 30 ++++++++++++++----- .../java/io/vertx/pgclient/PgConnection.java | 2 +- .../vertx/pgclient/impl/PgConnectionImpl.java | 2 +- .../vertx/pgclient/PgConnectionCopyTest.java | 6 ++++ 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index 23575b2c3..5bc4fcc3c 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -19,6 +19,8 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.json.JsonObject; import io.vertx.core.net.PemTrustOptions; import io.vertx.docgen.Source; @@ -733,14 +735,26 @@ public void batchReturning(SqlClient client) { }); } - public void copyFromStdinReturning(Vertx vertx, PgConnection client) { - client.copyFrom( - "COPY my_table FROM STDIN (FORMAT csv, HEADER)", - vertx.fileSystem().readFile("path/to/file") - ).execute().onSuccess(res -> { - Long rowsWritten = res.iterator().next().getLong("rowsWritten"); - System.out.println("rows affected: " + rowsWritten); - }); + public void importDataToDb(Vertx vertx, PgConnection client) { + vertx.fileSystem().readFile("path/to/file") + .flatMap(bufferAsyncResult -> { + return client.copyFrom( + "COPY my_table FROM STDIN (FORMAT csv, HEADER)", + bufferAsyncResult + ).execute(); + }).onSuccess(result -> { + Long rowsWritten = result.iterator().next().getLong("rowsWritten"); + System.out.println("rows written: " + rowsWritten); + }); + } + + public void exportDataFromDb(Vertx vertx, PgConnection client) { + Buffer buffer = new BufferImpl(); + String path = "path/to/file"; + client.copyTo("COPY my_table TO STDOUT (FORMAT csv, HEADER)", buffer) + .andThen(res -> { + vertx.fileSystem().writeFile("path/to/file.csv", buffer); + }).onSuccess(res -> System.out.println("Data exported to " + path)); } public void pgBouncer(PgConnectOptions connectOptions) { diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index 95a4d1a71..c049c1005 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -110,7 +110,7 @@ static Future connect(Vertx vertx, String connectionUri) { * @param from byte stream data will be fetched from * @return result set with single field {@code rowsWritten} */ - Query> copyFrom(String sql, Future from); + Query> copyFrom(String sql, Buffer from); /** * Exports data from a database with decoding. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index c317bbb19..a4bb26fc2 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -112,7 +112,7 @@ public PgConnection noticeHandler(Handler handler) { } @Override - public Query> copyFrom(String sql, Future from) { + public Query> copyFrom(String sql, Buffer from) { return null; } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index ddde4a142..aa98c04fe 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -5,6 +5,12 @@ import org.junit.Test; public class PgConnectionCopyTest extends PgConnectionTestBase { + public PgConnectionCopyTest() { + connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> { + handler.handle(ar.map(p -> p)); + }); + } + @Test public void testCopyToRows(TestContext ctx) { Async async = ctx.async(); From 38d58448537afd96ba418ef6dde9c800f195efac Mon Sep 17 00:00:00 2001 From: psolomin Date: Wed, 19 Apr 2023 15:00:06 +0100 Subject: [PATCH 03/12] Change methods' signatures --- .../src/main/java/examples/PgClientExamples.java | 12 ++++-------- .../main/java/io/vertx/pgclient/PgConnection.java | 9 ++++----- .../io/vertx/pgclient/impl/PgConnectionImpl.java | 8 ++++---- .../java/io/vertx/pgclient/PgConnectionCopyTest.java | 2 +- 4 files changed, 13 insertions(+), 18 deletions(-) diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index 5bc4fcc3c..beffbdd74 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -19,8 +19,6 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.json.JsonObject; import io.vertx.core.net.PemTrustOptions; import io.vertx.docgen.Source; @@ -738,7 +736,7 @@ public void batchReturning(SqlClient client) { public void importDataToDb(Vertx vertx, PgConnection client) { vertx.fileSystem().readFile("path/to/file") .flatMap(bufferAsyncResult -> { - return client.copyFrom( + return client.copyFromBytes( "COPY my_table FROM STDIN (FORMAT csv, HEADER)", bufferAsyncResult ).execute(); @@ -749,12 +747,10 @@ public void importDataToDb(Vertx vertx, PgConnection client) { } public void exportDataFromDb(Vertx vertx, PgConnection client) { - Buffer buffer = new BufferImpl(); String path = "path/to/file"; - client.copyTo("COPY my_table TO STDOUT (FORMAT csv, HEADER)", buffer) - .andThen(res -> { - vertx.fileSystem().writeFile("path/to/file.csv", buffer); - }).onSuccess(res -> System.out.println("Data exported to " + path)); + client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute() + .flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer)) + .onSuccess(res -> System.out.println("Data exported to " + path)); } public void pgBouncer(PgConnectOptions connectOptions) { diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index c049c1005..a52c84849 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -110,7 +110,7 @@ static Future connect(Vertx vertx, String connectionUri) { * @param from byte stream data will be fetched from * @return result set with single field {@code rowsWritten} */ - Query> copyFrom(String sql, Buffer from); + Query> copyFromBytes(String sql, Buffer from); /** * Exports data from a database with decoding. @@ -120,7 +120,7 @@ static Future connect(Vertx vertx, String connectionUri) { * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)}) * @return decoded records */ - Query> copyTo(String sql); + Query> copyToRows(String sql); /** * Exports data from a database as-is, without decoding. @@ -128,10 +128,9 @@ static Future connect(Vertx vertx, String connectionUri) { *

Use this method when exporting opaque bytes, e.g. to a CSV file. * * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) - * @param to bytes container data will be written to - * @return async result + * @return async result of bytes container data will be written to */ - Future copyTo(String sql, Buffer to); + Query copyToBytes(String sql); /** * Send a request cancellation message to tell the server to cancel processing request in this connection. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index a4bb26fc2..4db7c6140 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -112,18 +112,18 @@ public PgConnection noticeHandler(Handler handler) { } @Override - public Query> copyFrom(String sql, Buffer from) { + public Query> copyFromBytes(String sql, Buffer from) { return null; } @Override - public Query> copyTo(String sql) { + public Query> copyToRows(String sql) { return null; } @Override - public Future copyTo(String sql, Buffer to) { - return Future.succeededFuture(); + public Query copyToBytes(String sql) { + return null; } @Override diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index aa98c04fe..91b0129a7 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -18,7 +18,7 @@ public void testCopyToRows(TestContext ctx) { deleteFromTestTable(ctx, conn, () -> { insertIntoTestTable(ctx, conn, 10, () -> { PgConnection pgConn = (PgConnection) conn; - pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)") + pgConn.copyToRows("COPY my_table TO STDOUT (FORMAT binary)") .execute() .onComplete(ctx.asyncAssertSuccess(result -> { for (int i = 0; i < 2; i++) { From e596664e8b833b6627922f449bfdff0e911bc833 Mon Sep 17 00:00:00 2001 From: psolomin Date: Fri, 21 Apr 2023 17:53:16 +0100 Subject: [PATCH 04/12] Add auxiliary test --- .../vertx/pgclient/PgConnectionCopyTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 91b0129a7..b03fdae8b 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -31,4 +31,35 @@ public void testCopyToRows(TestContext ctx) { }); })); } + + /** + * Just a thingy to eavesdrop protocol interactions. + * + * tips: + * - frontend / backend protocol -> message flow -> binary + * - start with CommandBase, SimpleQueryCommandCodecBase, builder.executeSimpleQuery, QueryExecutor, QueryResultBuilder + * - PgDecoder + * - startup message + * - auth + * - Simple Query + * - use wireshark - `tcp port 5432` + * - add VM option - port - such that it's fixed + * + * TODO: drop this. + * + * @param ctx + */ + @Test + public void testSimpleQuery(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + conn + .query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(result1 -> { + ctx.assertEquals(1, result1.size()); + async.complete(); + })); + })); + } } From cd04992450abff248c39e5dcd9da1045e85c8e11 Mon Sep 17 00:00:00 2001 From: psolomin Date: Fri, 21 Apr 2023 19:07:51 +0100 Subject: [PATCH 05/12] COPY TO STDOUT doesn't throw UnsupportedOperationException TODO: put into separate loop? --- .../vertx/pgclient/impl/codec/PgDecoder.java | 20 +++++++++++++++++++ .../impl/codec/PgProtocolConstants.java | 5 +++++ .../vertx/pgclient/PgConnectionCopyTest.java | 4 ++-- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index 058c0d629..76991bb00 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) { decodeNotificationResponse(ctx, in); break; } + // TODO: check if these handlers need to be at this level of loop + // TODO: check if COPY needs a separate loop + case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: { + decodeCopyOutResponse(ctx, in); + break; + } + case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: { + decodeCopyData(ctx, in); + break; + } + case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: { + decodeCopyCompletion(ctx, in); + break; + } default: { throw new UnsupportedOperationException(); } @@ -455,4 +469,10 @@ private void decodeBackendKeyData(ByteBuf in) { private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) { ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in))); } + + private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {} + + private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {} + + private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {} } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java index bc8fa9a0d..de61c2136 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java @@ -71,4 +71,9 @@ public class PgProtocolConstants { public static final byte MESSAGE_TYPE_FUNCTION_RESULT = 'V'; public static final byte MESSAGE_TYPE_SSL_YES = 'S'; public static final byte MESSAGE_TYPE_SSL_NO = 'N'; + + // COPY-related + public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H'; + public static final byte MESSAGE_TYPE_COPY_DATA = 'd'; + public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c'; } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index b03fdae8b..4a5abb6fa 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -54,10 +54,10 @@ public void testSimpleQuery(TestContext ctx) { Async async = ctx.async(); connector.accept(ctx.asyncAssertSuccess(conn -> { conn - .query("SELECT 1") + .query("COPY world TO STDOUT (FORMAT csv)") .execute() .onComplete(ctx.asyncAssertSuccess(result1 -> { - ctx.assertEquals(1, result1.size()); + ctx.assertEquals(0, result1.size()); async.complete(); })); })); From bdeeae24f6975b3ff4568cc720a00c9df6fc9b7b Mon Sep 17 00:00:00 2001 From: psolomin Date: Sat, 22 Apr 2023 14:43:38 +0100 Subject: [PATCH 06/12] WIP - 0 Hacking around the API to make Future coming out --- .../main/java/examples/PgClientExamples.java | 2 +- .../java/io/vertx/pgclient/PgConnection.java | 2 +- .../vertx/pgclient/impl/PgConnectionImpl.java | 27 ++++++++++++-- .../impl/codec/PgProtocolConstants.java | 8 ++++- .../vertx/pgclient/PgConnectionCopyTest.java | 36 ++++++++++++++++++- .../sqlclient/impl/QueryResultBuilder.java | 3 +- 6 files changed, 71 insertions(+), 7 deletions(-) diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index beffbdd74..a9ebbd935 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -748,7 +748,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) { public void exportDataFromDb(Vertx vertx, PgConnection client) { String path = "path/to/file"; - client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute() + client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)") .flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer)) .onSuccess(res -> System.out.println("Data exported to " + path)); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index a52c84849..c4f034719 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -130,7 +130,7 @@ static Future connect(Vertx vertx, String connectionUri) { * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) * @return async result of bytes container data will be written to */ - Query copyToBytes(String sql); + Future copyToBytes(String sql); /** * Send a request cancellation message to tell the server to cancel processing request in this connection. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 4db7c6140..533b1acec 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -21,6 +21,7 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgNotice; @@ -31,10 +32,16 @@ import io.vertx.sqlclient.Query; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; +import io.vertx.sqlclient.impl.QueryResultBuilder; import io.vertx.sqlclient.impl.SocketConnectionBase; import io.vertx.sqlclient.impl.SqlConnectionBase; +import io.vertx.sqlclient.impl.SqlResultImpl; +import io.vertx.sqlclient.impl.command.CommandBase; + +import java.util.function.Function; public class PgConnectionImpl extends SqlConnectionBase implements PgConnection { @@ -122,8 +129,14 @@ public Query> copyToRows(String sql) { } @Override - public Query copyToBytes(String sql) { - return null; + public Future copyToBytes(String sql) { + Function> factory = null; + PromiseInternal> promise = null; + + QueryResultBuilder, SqlResult> resultHandler = + new QueryResultBuilder<>(factory, promise); + CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler); + return this.schedule(context, cmd); } @Override @@ -145,4 +158,14 @@ public Future cancelRequest() { }); return promise.future(); } + + private class CopyOutCommand extends CommandBase { + private final String sql; + private final QueryResultBuilder, SqlResult> resultHandler; + + CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) { + this.sql = sql; + this.resultHandler = resultHandler; + } + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java index de61c2136..dd487ce7e 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java @@ -72,7 +72,13 @@ public class PgProtocolConstants { public static final byte MESSAGE_TYPE_SSL_YES = 'S'; public static final byte MESSAGE_TYPE_SSL_NO = 'N'; - // COPY-related + /** + * COPY-out messages. + * + *

Other messages which might appear in between CopyData: + *

  • NoticeResponse + *
  • ParameterStatus + */ public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H'; public static final byte MESSAGE_TYPE_COPY_DATA = 'd'; public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c'; diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 4a5abb6fa..8acdf3ba9 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -2,6 +2,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; +import org.junit.Ignore; import org.junit.Test; public class PgConnectionCopyTest extends PgConnectionTestBase { @@ -32,6 +33,24 @@ public void testCopyToRows(TestContext ctx) { })); } + @Test + @Ignore("For now it just hangs forever") + public void testCopyToCsvBytes(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + deleteFromTestTable(ctx, conn, () -> { + insertIntoTestTable(ctx, conn, 10, () -> { + PgConnection pgConn = (PgConnection) conn; + pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)") + .onComplete(ctx.asyncAssertSuccess(buffer -> { + buffer.getBytes(); + async.complete(); + })); + }); + }); + })); + } + /** * Just a thingy to eavesdrop protocol interactions. * @@ -54,9 +73,24 @@ public void testSimpleQuery(TestContext ctx) { Async async = ctx.async(); connector.accept(ctx.asyncAssertSuccess(conn -> { conn - .query("COPY world TO STDOUT (FORMAT csv)") + .query("select 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(result1 -> { + ctx.assertEquals(1, result1.size()); + async.complete(); + })); + })); + } + + @Test + public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(conn -> { + conn + .query("copy fortune to stdout (format csv)") .execute() .onComplete(ctx.asyncAssertSuccess(result1 -> { + // nothing comes out ctx.assertEquals(0, result1.size()); async.complete(); })); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java index 091cf9e31..9e8287df1 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java @@ -38,7 +38,8 @@ public class QueryResultBuilder, L extends SqlResu private Throwable failure; private boolean suspended; - QueryResultBuilder(Function factory, PromiseInternal handler) { + // TODO: public-ing this shouldn't be needed + public QueryResultBuilder(Function factory, PromiseInternal handler) { this.factory = factory; this.handler = handler; } From c335f8a58d86c04d5472f7485c0f424afb59252d Mon Sep 17 00:00:00 2001 From: psolomin Date: Thu, 27 Apr 2023 10:41:55 +0100 Subject: [PATCH 07/12] WIP: Stuck with copyToBytes --- .../main/java/examples/PgClientExamples.java | 2 +- .../java/io/vertx/pgclient/PgConnection.java | 3 +- .../vertx/pgclient/impl/PgConnectionImpl.java | 42 ++++++++++++------- .../pgclient/impl/codec/CopyOutCommand.java | 17 ++++++++ .../impl/codec/CopyOutCommandCodec.java | 4 ++ .../vertx/pgclient/PgConnectionCopyTest.java | 9 ++-- .../vertx/sqlclient/impl/QueryExecutor.java | 4 +- 7 files changed, 57 insertions(+), 24 deletions(-) create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java index a9ebbd935..6c465fab9 100644 --- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java +++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java @@ -749,7 +749,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) { public void exportDataFromDb(Vertx vertx, PgConnection client) { String path = "path/to/file"; client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)") - .flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer)) + .flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value())) .onSuccess(res -> System.out.println("Data exported to " + path)); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index c4f034719..5e765550b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -29,6 +29,7 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.SqlResult; import java.util.List; @@ -130,7 +131,7 @@ static Future connect(Vertx vertx, String connectionUri) { * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) * @return async result of bytes container data will be written to */ - Future copyToBytes(String sql); + Future> copyToBytes(String sql); /** * Send a request cancellation message to tell the server to cancel processing request in this connection. diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 533b1acec..99c5a4d23 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -20,12 +20,14 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.future.PromiseInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgNotice; import io.vertx.pgclient.PgNotification; +import io.vertx.pgclient.impl.codec.CopyOutCommand; import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; @@ -35,13 +37,17 @@ import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; +import io.vertx.sqlclient.impl.QueryExecutor; import io.vertx.sqlclient.impl.QueryResultBuilder; +import io.vertx.sqlclient.impl.QueryResultHandler; import io.vertx.sqlclient.impl.SocketConnectionBase; import io.vertx.sqlclient.impl.SqlConnectionBase; import io.vertx.sqlclient.impl.SqlResultImpl; -import io.vertx.sqlclient.impl.command.CommandBase; +import io.vertx.sqlclient.impl.command.QueryCommandBase; +import io.vertx.sqlclient.impl.command.SimpleQueryCommand; import java.util.function.Function; +import java.util.stream.Collector; public class PgConnectionImpl extends SqlConnectionBase implements PgConnection { @@ -129,14 +135,29 @@ public Query> copyToRows(String sql) { } @Override - public Future copyToBytes(String sql) { - Function> factory = null; - PromiseInternal> promise = null; + public Future> copyToBytes(String sql) { + Function> factory = SqlResultImpl::new; + PromiseInternal> promise = context.promise(); QueryResultBuilder, SqlResult> resultHandler = new QueryResultBuilder<>(factory, promise); - CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler); - return this.schedule(context, cmd); + + Collector collector = Collector.of( + BufferImpl::new, + (v, row) -> { + System.out.println(row); + }, + (v1, v2) -> null, + Function.identity() + ); + + SimpleQueryCommand cmd = new SimpleQueryCommand<>( + sql, true, false, collector, resultHandler); + // this.schedule(promise.context(), cmd); + + QueryExecutor executor = new QueryExecutor(factory, collector); + executor.executeSimpleQuery(this, sql, false, false, promise); + return promise.future(); } @Override @@ -159,13 +180,4 @@ public Future cancelRequest() { return promise.future(); } - private class CopyOutCommand extends CommandBase { - private final String sql; - private final QueryResultBuilder, SqlResult> resultHandler; - - CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) { - this.sql = sql; - this.resultHandler = resultHandler; - } - } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java new file mode 100644 index 000000000..13b0e8faf --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -0,0 +1,17 @@ +package io.vertx.pgclient.impl.codec; + +import io.vertx.core.buffer.Buffer; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.impl.QueryResultBuilder; +import io.vertx.sqlclient.impl.SqlResultImpl; +import io.vertx.sqlclient.impl.command.CommandBase; + +public class CopyOutCommand extends CommandBase { + private final String sql; + private final QueryResultBuilder, SqlResult> resultHandler; + + public CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) { + this.sql = sql; + this.resultHandler = resultHandler; + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java new file mode 100644 index 000000000..70b6fbfb1 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java @@ -0,0 +1,4 @@ +package io.vertx.pgclient.impl.codec; + +public class CopyOutCommandCodec { +} diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 8acdf3ba9..e2b4da7f3 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -34,16 +34,15 @@ public void testCopyToRows(TestContext ctx) { } @Test - @Ignore("For now it just hangs forever") public void testCopyToCsvBytes(TestContext ctx) { Async async = ctx.async(); connector.accept(ctx.asyncAssertSuccess(conn -> { deleteFromTestTable(ctx, conn, () -> { - insertIntoTestTable(ctx, conn, 10, () -> { + insertIntoTestTable(ctx, conn, 1, () -> { PgConnection pgConn = (PgConnection) conn; - pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)") - .onComplete(ctx.asyncAssertSuccess(buffer -> { - buffer.getBytes(); + pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)") + .onComplete(ctx.asyncAssertSuccess(result -> { + result.value().getBytes(); async.complete(); })); }); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java index 182472c86..67019b057 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java @@ -34,7 +34,7 @@ /** * Executes query. */ -class QueryExecutor, L extends SqlResult> { +public class QueryExecutor, L extends SqlResult> { private final Function factory; private final Collector collector; @@ -49,7 +49,7 @@ private QueryResultBuilder createHandler(PromiseInternal promise) { return new QueryResultBuilder<>(factory, promise); } - void executeSimpleQuery(CommandScheduler scheduler, + public void executeSimpleQuery(CommandScheduler scheduler, String sql, boolean autoCommit, boolean singleton, From de9fabd5b8163d378021636cb5c9a5c75a13f123 Mon Sep 17 00:00:00 2001 From: psolomin Date: Thu, 27 Apr 2023 14:43:32 +0100 Subject: [PATCH 08/12] WIP - buffer accumulates csv text; Broken resultHandler --- .../vertx/pgclient/impl/PgConnectionImpl.java | 19 +---- .../pgclient/impl/codec/CopyOutCommand.java | 30 +++++++- .../impl/codec/CopyOutCommandCodec.java | 40 ++++++++++- .../impl/codec/CopyOutDataDecoder.java | 71 +++++++++++++++++++ .../vertx/pgclient/impl/codec/PgDecoder.java | 7 +- .../vertx/pgclient/impl/codec/PgEncoder.java | 2 + .../vertx/pgclient/PgConnectionCopyTest.java | 15 ---- 7 files changed, 149 insertions(+), 35 deletions(-) create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 99c5a4d23..6cab9c519 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -19,6 +19,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.netty.buffer.ByteBuf; import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.impl.ContextInternal; @@ -142,22 +143,8 @@ public Future> copyToBytes(String sql) { QueryResultBuilder, SqlResult> resultHandler = new QueryResultBuilder<>(factory, promise); - Collector collector = Collector.of( - BufferImpl::new, - (v, row) -> { - System.out.println(row); - }, - (v1, v2) -> null, - Function.identity() - ); - - SimpleQueryCommand cmd = new SimpleQueryCommand<>( - sql, true, false, collector, resultHandler); - // this.schedule(promise.context(), cmd); - - QueryExecutor executor = new QueryExecutor(factory, collector); - executor.executeSimpleQuery(this, sql, false, false, promise); - return promise.future(); + CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler); + return this.schedule(promise.context(), cmd); } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java index 13b0e8faf..f128c9a61 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -1,17 +1,43 @@ package io.vertx.pgclient.impl.codec; +import io.netty.buffer.ByteBuf; import io.vertx.core.buffer.Buffer; import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.impl.QueryResultBuilder; import io.vertx.sqlclient.impl.SqlResultImpl; import io.vertx.sqlclient.impl.command.CommandBase; -public class CopyOutCommand extends CommandBase { +import java.util.function.Function; +import java.util.stream.Collector; + +public class CopyOutCommand extends CommandBase> { private final String sql; + private final Collector collector; private final QueryResultBuilder, SqlResult> resultHandler; - public CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) { + public CopyOutCommand( + String sql, + QueryResultBuilder, SqlResult> resultHandler + ) { this.sql = sql; this.resultHandler = resultHandler; + this.collector = Collector.of( + Buffer::buffer, + (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)), + (v1, v2) -> null, + Function.identity() + ); + } + + QueryResultBuilder, SqlResult> resultHandler() { + return resultHandler; + } + + String sql() { + return sql; + } + + Collector collector() { + return collector; } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java index 70b6fbfb1..948dc8db9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java @@ -1,4 +1,42 @@ package io.vertx.pgclient.impl.codec; -public class CopyOutCommandCodec { +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; +import io.vertx.sqlclient.SqlResult; + +class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> { + CopyOutDataDecoder decoder; + + CopyOutCommandCodec(CopyOutCommand cmd) { + super(cmd); + decoder = new CopyOutDataDecoder(cmd.collector()); + } + + @Override + public void handleCommandComplete(int updated) { + this.result = null; + Buffer result; + Throwable failure; + int size; + if (decoder != null) { + failure = decoder.complete(); + result = decoder.result(); + size = decoder.size(); + decoder.reset(); + } else { + failure = null; + result = new BufferImpl(); + size = 0; + } + cmd.resultHandler().handleResult(updated, size, null, result, failure); + } + + @Override + public void handleErrorResponse(ErrorResponse errorResponse) { + failure = errorResponse.toException(); + } + + void encode(PgEncoder encoder) { + encoder.writeQuery(new Query(cmd.sql())); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java new file mode 100644 index 000000000..5ca742ec4 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java @@ -0,0 +1,71 @@ +package io.vertx.pgclient.impl.codec; + +import io.netty.buffer.ByteBuf; +import io.vertx.core.buffer.Buffer; +import java.util.function.BiConsumer; +import java.util.stream.Collector; + +public class CopyOutDataDecoder { + + private final Collector collector; + private BiConsumer accumulator; + + private int size; + private Buffer container; + private Throwable failure; + private Buffer result; + + protected CopyOutDataDecoder(Collector collector) { + this.collector = collector; + reset(); + } + + public int size() { + return size; + } + + public void handleChunk(int len, ByteBuf in) { + if (failure != null) { + return; + } + if (accumulator == null) { + try { + accumulator = collector.accumulator(); + } catch (Exception e) { + failure = e; + return; + } + } + try { + accumulator.accept(container, in); + } catch (Exception e) { + failure = e; + return; + } + size++; + } + + public Buffer result() { + return result; + } + + public Throwable complete() { + try { + result = collector.finisher().apply(container); + } catch (Exception e) { + failure = e; + } + return failure; + } + + public void reset() { + size = 0; + failure = null; + result = null; + try { + this.container = collector.supplier().get(); + } catch (Exception e) { + failure = e; + } + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index 76991bb00..f9c951e4c 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -472,7 +472,12 @@ private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) { private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {} - private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {} + private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) { + PgCommandCodec codec = inflight.peek(); + CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec; + int len = in.readUnsignedShort(); + cmdCodec.decoder.handleChunk(len, in); + } private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {} } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java index d35200cb8..0f10e1aed 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java @@ -119,6 +119,8 @@ void write(CommandBase cmd) { return new ClosePortalCommandCodec((CloseCursorCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { return new CloseStatementCommandCodec((CloseStatementCommand) cmd); + } else if (cmd instanceof CopyOutCommand) { + return new CopyOutCommandCodec((CopyOutCommand) cmd); } throw new AssertionError(); } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index e2b4da7f3..6c574ba9f 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -80,19 +80,4 @@ public void testSimpleQuery(TestContext ctx) { })); })); } - - @Test - public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) { - Async async = ctx.async(); - connector.accept(ctx.asyncAssertSuccess(conn -> { - conn - .query("copy fortune to stdout (format csv)") - .execute() - .onComplete(ctx.asyncAssertSuccess(result1 -> { - // nothing comes out - ctx.assertEquals(0, result1.size()); - async.complete(); - })); - })); - } } From 7977926d6359f586237db5fd0410a645261834ea Mon Sep 17 00:00:00 2001 From: psolomin Date: Thu, 27 Apr 2023 20:16:43 +0100 Subject: [PATCH 09/12] WIP - unable to get the correct result out --- .../main/java/io/vertx/pgclient/impl/PgConnectionImpl.java | 5 ++++- .../java/io/vertx/pgclient/impl/codec/CopyOutCommand.java | 3 +-- .../io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java | 3 ++- .../test/java/io/vertx/pgclient/PgConnectionCopyTest.java | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 6cab9c519..491c2e35b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -137,9 +137,12 @@ public Query> copyToRows(String sql) { @Override public Future> copyToBytes(String sql) { - Function> factory = SqlResultImpl::new; + Function> factory = (buffer) -> new SqlResultImpl<>(buffer); PromiseInternal> promise = context.promise(); + // currently, this loads entire content into Buffer + // it should stream bytes out instead + // TODO: signal completion as soon as the database replied CopyOutResponse 'H' ? QueryResultBuilder, SqlResult> resultHandler = new QueryResultBuilder<>(factory, promise); diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java index f128c9a61..69a8fb7cb 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -7,7 +7,6 @@ import io.vertx.sqlclient.impl.SqlResultImpl; import io.vertx.sqlclient.impl.command.CommandBase; -import java.util.function.Function; import java.util.stream.Collector; public class CopyOutCommand extends CommandBase> { @@ -25,7 +24,7 @@ public CopyOutCommand( Buffer::buffer, (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)), (v1, v2) -> null, - Function.identity() + (finalResult) -> finalResult ); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java index 948dc8db9..13993532b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java @@ -3,6 +3,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.impl.SqlResultImpl; class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> { CopyOutDataDecoder decoder; @@ -14,7 +15,7 @@ class CopyOutCommandCodec extends PgCommandCodec, CopyOutComma @Override public void handleCommandComplete(int updated) { - this.result = null; + this.result = new SqlResultImpl(Buffer.buffer("abc")); Buffer result; Throwable failure; int size; diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 6c574ba9f..04e50585c 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -2,7 +2,6 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; -import org.junit.Ignore; import org.junit.Test; public class PgConnectionCopyTest extends PgConnectionTestBase { @@ -74,6 +73,7 @@ public void testSimpleQuery(TestContext ctx) { conn .query("select 1") .execute() + // when does the result is transformed from bool to rows? .onComplete(ctx.asyncAssertSuccess(result1 -> { ctx.assertEquals(1, result1.size()); async.complete(); From c499da900da626424060d30c2b8ec68a1a16b909 Mon Sep 17 00:00:00 2001 From: psolomin Date: Fri, 28 Apr 2023 12:15:33 +0100 Subject: [PATCH 10/12] WIP - managed to get bytes out from query result --- .../vertx/pgclient/impl/PgConnectionImpl.java | 3 ++- .../pgclient/impl/codec/CopyOutCommand.java | 2 +- .../impl/codec/CopyOutCommandCodec.java | 4 ++-- .../vertx/pgclient/PgConnectionCopyTest.java | 23 ++++++++++++++++++- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 491c2e35b..5161e8d51 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -147,7 +147,8 @@ public Future> copyToBytes(String sql) { new QueryResultBuilder<>(factory, promise); CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler); - return this.schedule(promise.context(), cmd); + this.schedule(promise.context(), cmd).onComplete(resultHandler); + return promise.future(); } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java index 69a8fb7cb..855020687 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -9,7 +9,7 @@ import java.util.stream.Collector; -public class CopyOutCommand extends CommandBase> { +public class CopyOutCommand extends CommandBase { private final String sql; private final Collector collector; private final QueryResultBuilder, SqlResult> resultHandler; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java index 13993532b..5f23a7651 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java @@ -5,7 +5,7 @@ import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.impl.SqlResultImpl; -class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> { +class CopyOutCommandCodec extends PgCommandCodec { CopyOutDataDecoder decoder; CopyOutCommandCodec(CopyOutCommand cmd) { @@ -15,7 +15,7 @@ class CopyOutCommandCodec extends PgCommandCodec, CopyOutComma @Override public void handleCommandComplete(int updated) { - this.result = new SqlResultImpl(Buffer.buffer("abc")); + this.result = false; Buffer result; Throwable failure; int size; diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 04e50585c..8b2ce3e85 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -1,9 +1,13 @@ package io.vertx.pgclient; +import io.vertx.core.buffer.Buffer; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import org.junit.Test; +import java.util.Collections; +import java.util.List; + public class PgConnectionCopyTest extends PgConnectionTestBase { public PgConnectionCopyTest() { connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> { @@ -41,7 +45,24 @@ public void testCopyToCsvBytes(TestContext ctx) { PgConnection pgConn = (PgConnection) conn; pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)") .onComplete(ctx.asyncAssertSuccess(result -> { - result.value().getBytes(); + ctx.assertNull(result.columnDescriptors()); + ctx.assertEquals(10, result.rowCount()); + ctx.assertEquals(10, result.size()); + ctx.assertEquals( + Buffer.buffer( + "Whatever-0\n" + + "Whatever-1\n" + + "Whatever-2\n" + + "Whatever-3\n" + + "Whatever-4\n" + + "Whatever-5\n" + + "Whatever-6\n" + + "Whatever-7\n" + + "Whatever-8\n" + + "Whatever-9\n" + ), + result.value() + ); async.complete(); })); }); From 3797477077933b61f66ac9874dbfa6670e8fc298 Mon Sep 17 00:00:00 2001 From: psolomin Date: Fri, 28 Apr 2023 12:42:58 +0100 Subject: [PATCH 11/12] Fix bug: unnecessary read of data content - corrupt output --- .../pgclient/impl/codec/CopyOutCommand.java | 1 + .../impl/codec/CopyOutDataDecoder.java | 2 +- .../vertx/pgclient/impl/codec/PgDecoder.java | 3 +-- .../vertx/pgclient/PgConnectionCopyTest.java | 20 +++++++++---------- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java index 855020687..13170fb2d 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java @@ -22,6 +22,7 @@ public CopyOutCommand( this.resultHandler = resultHandler; this.collector = Collector.of( Buffer::buffer, + // TODO: this might be unnecessary slow - think of alternatives (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)), (v1, v2) -> null, (finalResult) -> finalResult diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java index 5ca742ec4..320fc13a8 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java @@ -24,7 +24,7 @@ public int size() { return size; } - public void handleChunk(int len, ByteBuf in) { + public void handleChunk(ByteBuf in) { if (failure != null) { return; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index f9c951e4c..eb58ff342 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -475,8 +475,7 @@ private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {} private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) { PgCommandCodec codec = inflight.peek(); CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec; - int len = in.readUnsignedShort(); - cmdCodec.decoder.handleChunk(len, in); + cmdCodec.decoder.handleChunk(in); } private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {} diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java index 8b2ce3e85..67579a901 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java @@ -50,16 +50,16 @@ public void testCopyToCsvBytes(TestContext ctx) { ctx.assertEquals(10, result.size()); ctx.assertEquals( Buffer.buffer( - "Whatever-0\n" + - "Whatever-1\n" + - "Whatever-2\n" + - "Whatever-3\n" + - "Whatever-4\n" + - "Whatever-5\n" + - "Whatever-6\n" + - "Whatever-7\n" + - "Whatever-8\n" + - "Whatever-9\n" + "0,Whatever-0\n" + + "1,Whatever-1\n" + + "2,Whatever-2\n" + + "3,Whatever-3\n" + + "4,Whatever-4\n" + + "5,Whatever-5\n" + + "6,Whatever-6\n" + + "7,Whatever-7\n" + + "8,Whatever-8\n" + + "9,Whatever-9\n" ), result.value() ); From f200835b78c0c4778da9faf0a1841ef33681d18d Mon Sep 17 00:00:00 2001 From: psolomin Date: Fri, 28 Apr 2023 14:50:52 +0100 Subject: [PATCH 12/12] Add notes --- .../src/main/java/io/vertx/pgclient/PgConnection.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java index 5e765550b..e00ed44c2 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java @@ -130,6 +130,15 @@ static Future connect(Vertx vertx, String connectionUri) { * * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)}) * @return async result of bytes container data will be written to + * + * - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html + * - future of read stream. + * - when we do query operation + * - we should not use query result builder + * - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ? + * - we need a new interface Future> + * - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html + * - PgSocketConnection - we will apply backpressure in SocketInternal */ Future> copyToBytes(String sql);