handler);
+ /**
+ * Imports data into a database.
+ *
+ * Use this method when importing opaque bytes, e.g. from a CSV file.
+ *
+ *
If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
+ *
+ * @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
+ * @param from byte stream data will be fetched from
+ * @return result set with single field {@code rowsWritten}
+ */
+ Query> copyFrom(String sql, Future from);
+
+ /**
+ * Exports data from a database with decoding.
+ *
+ * {@code FORMAT} can only be {@code binary}.
+ *
+ * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
+ * @return decoded records
+ */
+ Query> copyTo(String sql);
+
+ /**
+ * Exports data from a database as-is, without decoding.
+ *
+ * Use this method when exporting opaque bytes, e.g. to a CSV file.
+ *
+ * @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
+ * @param to bytes container data will be written to
+ * @return async result
+ */
+ Future copyTo(String sql, Buffer to);
+
/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
*
Note: Use this with caution because the cancellation signal may or may not have any effect.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index d14b208eb..c317bbb19 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -19,6 +19,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
@@ -27,6 +28,9 @@
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
+import io.vertx.sqlclient.Query;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -107,6 +111,21 @@ public PgConnection noticeHandler(Handler handler) {
return this;
}
+ @Override
+ public Query> copyFrom(String sql, Future from) {
+ return null;
+ }
+
+ @Override
+ public Query> copyTo(String sql) {
+ return null;
+ }
+
+ @Override
+ public Future copyTo(String sql, Buffer to) {
+ return Future.succeededFuture();
+ }
+
@Override
public int processId() {
return conn.getProcessId();
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
new file mode 100644
index 000000000..ddde4a142
--- /dev/null
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -0,0 +1,28 @@
+package io.vertx.pgclient;
+
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import org.junit.Test;
+
+public class PgConnectionCopyTest extends PgConnectionTestBase {
+ @Test
+ public void testCopyToRows(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ deleteFromTestTable(ctx, conn, () -> {
+ insertIntoTestTable(ctx, conn, 10, () -> {
+ PgConnection pgConn = (PgConnection) conn;
+ pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)")
+ .execute()
+ .onComplete(ctx.asyncAssertSuccess(result -> {
+ for (int i = 0; i < 2; i++) {
+ ctx.assertEquals(1, result.rowCount());
+ result = result.next();
+ }
+ async.complete();
+ }));
+ });
+ });
+ }));
+ }
+}
From 5cbd34ab50722a9a0b22dfa0bba5f2608dcb874f Mon Sep 17 00:00:00 2001
From: psolomin
Date: Wed, 19 Apr 2023 14:52:19 +0100
Subject: [PATCH 02/12] Add example with export; rename methods
---
.../main/java/examples/PgClientExamples.java | 30 ++++++++++++++-----
.../java/io/vertx/pgclient/PgConnection.java | 2 +-
.../vertx/pgclient/impl/PgConnectionImpl.java | 2 +-
.../vertx/pgclient/PgConnectionCopyTest.java | 6 ++++
4 files changed, 30 insertions(+), 10 deletions(-)
diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
index 23575b2c3..5bc4fcc3c 100644
--- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java
+++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
@@ -19,6 +19,8 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.docgen.Source;
@@ -733,14 +735,26 @@ public void batchReturning(SqlClient client) {
});
}
- public void copyFromStdinReturning(Vertx vertx, PgConnection client) {
- client.copyFrom(
- "COPY my_table FROM STDIN (FORMAT csv, HEADER)",
- vertx.fileSystem().readFile("path/to/file")
- ).execute().onSuccess(res -> {
- Long rowsWritten = res.iterator().next().getLong("rowsWritten");
- System.out.println("rows affected: " + rowsWritten);
- });
+ public void importDataToDb(Vertx vertx, PgConnection client) {
+ vertx.fileSystem().readFile("path/to/file")
+ .flatMap(bufferAsyncResult -> {
+ return client.copyFrom(
+ "COPY my_table FROM STDIN (FORMAT csv, HEADER)",
+ bufferAsyncResult
+ ).execute();
+ }).onSuccess(result -> {
+ Long rowsWritten = result.iterator().next().getLong("rowsWritten");
+ System.out.println("rows written: " + rowsWritten);
+ });
+ }
+
+ public void exportDataFromDb(Vertx vertx, PgConnection client) {
+ Buffer buffer = new BufferImpl();
+ String path = "path/to/file";
+ client.copyTo("COPY my_table TO STDOUT (FORMAT csv, HEADER)", buffer)
+ .andThen(res -> {
+ vertx.fileSystem().writeFile("path/to/file.csv", buffer);
+ }).onSuccess(res -> System.out.println("Data exported to " + path));
}
public void pgBouncer(PgConnectOptions connectOptions) {
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
index 95a4d1a71..c049c1005 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
@@ -110,7 +110,7 @@ static Future connect(Vertx vertx, String connectionUri) {
* @param from byte stream data will be fetched from
* @return result set with single field {@code rowsWritten}
*/
- Query> copyFrom(String sql, Future from);
+ Query> copyFrom(String sql, Buffer from);
/**
* Exports data from a database with decoding.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index c317bbb19..a4bb26fc2 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -112,7 +112,7 @@ public PgConnection noticeHandler(Handler handler) {
}
@Override
- public Query> copyFrom(String sql, Future from) {
+ public Query> copyFrom(String sql, Buffer from) {
return null;
}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index ddde4a142..aa98c04fe 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -5,6 +5,12 @@
import org.junit.Test;
public class PgConnectionCopyTest extends PgConnectionTestBase {
+ public PgConnectionCopyTest() {
+ connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> {
+ handler.handle(ar.map(p -> p));
+ });
+ }
+
@Test
public void testCopyToRows(TestContext ctx) {
Async async = ctx.async();
From 38d58448537afd96ba418ef6dde9c800f195efac Mon Sep 17 00:00:00 2001
From: psolomin
Date: Wed, 19 Apr 2023 15:00:06 +0100
Subject: [PATCH 03/12] Change methods' signatures
---
.../src/main/java/examples/PgClientExamples.java | 12 ++++--------
.../main/java/io/vertx/pgclient/PgConnection.java | 9 ++++-----
.../io/vertx/pgclient/impl/PgConnectionImpl.java | 8 ++++----
.../java/io/vertx/pgclient/PgConnectionCopyTest.java | 2 +-
4 files changed, 13 insertions(+), 18 deletions(-)
diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
index 5bc4fcc3c..beffbdd74 100644
--- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java
+++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
@@ -19,8 +19,6 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.docgen.Source;
@@ -738,7 +736,7 @@ public void batchReturning(SqlClient client) {
public void importDataToDb(Vertx vertx, PgConnection client) {
vertx.fileSystem().readFile("path/to/file")
.flatMap(bufferAsyncResult -> {
- return client.copyFrom(
+ return client.copyFromBytes(
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
bufferAsyncResult
).execute();
@@ -749,12 +747,10 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
}
public void exportDataFromDb(Vertx vertx, PgConnection client) {
- Buffer buffer = new BufferImpl();
String path = "path/to/file";
- client.copyTo("COPY my_table TO STDOUT (FORMAT csv, HEADER)", buffer)
- .andThen(res -> {
- vertx.fileSystem().writeFile("path/to/file.csv", buffer);
- }).onSuccess(res -> System.out.println("Data exported to " + path));
+ client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute()
+ .flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
+ .onSuccess(res -> System.out.println("Data exported to " + path));
}
public void pgBouncer(PgConnectOptions connectOptions) {
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
index c049c1005..a52c84849 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
@@ -110,7 +110,7 @@ static Future connect(Vertx vertx, String connectionUri) {
* @param from byte stream data will be fetched from
* @return result set with single field {@code rowsWritten}
*/
- Query> copyFrom(String sql, Buffer from);
+ Query> copyFromBytes(String sql, Buffer from);
/**
* Exports data from a database with decoding.
@@ -120,7 +120,7 @@ static Future connect(Vertx vertx, String connectionUri) {
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
* @return decoded records
*/
- Query> copyTo(String sql);
+ Query> copyToRows(String sql);
/**
* Exports data from a database as-is, without decoding.
@@ -128,10 +128,9 @@ static Future connect(Vertx vertx, String connectionUri) {
* Use this method when exporting opaque bytes, e.g. to a CSV file.
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
- * @param to bytes container data will be written to
- * @return async result
+ * @return async result of bytes container data will be written to
*/
- Future copyTo(String sql, Buffer to);
+ Query copyToBytes(String sql);
/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index a4bb26fc2..4db7c6140 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -112,18 +112,18 @@ public PgConnection noticeHandler(Handler handler) {
}
@Override
- public Query> copyFrom(String sql, Buffer from) {
+ public Query> copyFromBytes(String sql, Buffer from) {
return null;
}
@Override
- public Query> copyTo(String sql) {
+ public Query> copyToRows(String sql) {
return null;
}
@Override
- public Future copyTo(String sql, Buffer to) {
- return Future.succeededFuture();
+ public Query copyToBytes(String sql) {
+ return null;
}
@Override
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index aa98c04fe..91b0129a7 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -18,7 +18,7 @@ public void testCopyToRows(TestContext ctx) {
deleteFromTestTable(ctx, conn, () -> {
insertIntoTestTable(ctx, conn, 10, () -> {
PgConnection pgConn = (PgConnection) conn;
- pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)")
+ pgConn.copyToRows("COPY my_table TO STDOUT (FORMAT binary)")
.execute()
.onComplete(ctx.asyncAssertSuccess(result -> {
for (int i = 0; i < 2; i++) {
From e596664e8b833b6627922f449bfdff0e911bc833 Mon Sep 17 00:00:00 2001
From: psolomin
Date: Fri, 21 Apr 2023 17:53:16 +0100
Subject: [PATCH 04/12] Add auxiliary test
---
.../vertx/pgclient/PgConnectionCopyTest.java | 31 +++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 91b0129a7..b03fdae8b 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -31,4 +31,35 @@ public void testCopyToRows(TestContext ctx) {
});
}));
}
+
+ /**
+ * Just a thingy to eavesdrop protocol interactions.
+ *
+ * tips:
+ * - frontend / backend protocol -> message flow -> binary
+ * - start with CommandBase, SimpleQueryCommandCodecBase, builder.executeSimpleQuery, QueryExecutor, QueryResultBuilder
+ * - PgDecoder
+ * - startup message
+ * - auth
+ * - Simple Query
+ * - use wireshark - `tcp port 5432`
+ * - add VM option - port - such that it's fixed
+ *
+ * TODO: drop this.
+ *
+ * @param ctx
+ */
+ @Test
+ public void testSimpleQuery(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ conn
+ .query("SELECT 1")
+ .execute()
+ .onComplete(ctx.asyncAssertSuccess(result1 -> {
+ ctx.assertEquals(1, result1.size());
+ async.complete();
+ }));
+ }));
+ }
}
From cd04992450abff248c39e5dcd9da1045e85c8e11 Mon Sep 17 00:00:00 2001
From: psolomin
Date: Fri, 21 Apr 2023 19:07:51 +0100
Subject: [PATCH 05/12] COPY TO STDOUT doesn't throw
UnsupportedOperationException
TODO: put into separate loop?
---
.../vertx/pgclient/impl/codec/PgDecoder.java | 20 +++++++++++++++++++
.../impl/codec/PgProtocolConstants.java | 5 +++++
.../vertx/pgclient/PgConnectionCopyTest.java | 4 ++--
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
index 058c0d629..76991bb00 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
@@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) {
decodeNotificationResponse(ctx, in);
break;
}
+ // TODO: check if these handlers need to be at this level of loop
+ // TODO: check if COPY needs a separate loop
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: {
+ decodeCopyOutResponse(ctx, in);
+ break;
+ }
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: {
+ decodeCopyData(ctx, in);
+ break;
+ }
+ case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: {
+ decodeCopyCompletion(ctx, in);
+ break;
+ }
default: {
throw new UnsupportedOperationException();
}
@@ -455,4 +469,10 @@ private void decodeBackendKeyData(ByteBuf in) {
private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in)));
}
+
+ private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}
+
+ private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {}
+
+ private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
index bc8fa9a0d..de61c2136 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
@@ -71,4 +71,9 @@ public class PgProtocolConstants {
public static final byte MESSAGE_TYPE_FUNCTION_RESULT = 'V';
public static final byte MESSAGE_TYPE_SSL_YES = 'S';
public static final byte MESSAGE_TYPE_SSL_NO = 'N';
+
+ // COPY-related
+ public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H';
+ public static final byte MESSAGE_TYPE_COPY_DATA = 'd';
+ public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c';
}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index b03fdae8b..4a5abb6fa 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -54,10 +54,10 @@ public void testSimpleQuery(TestContext ctx) {
Async async = ctx.async();
connector.accept(ctx.asyncAssertSuccess(conn -> {
conn
- .query("SELECT 1")
+ .query("COPY world TO STDOUT (FORMAT csv)")
.execute()
.onComplete(ctx.asyncAssertSuccess(result1 -> {
- ctx.assertEquals(1, result1.size());
+ ctx.assertEquals(0, result1.size());
async.complete();
}));
}));
From bdeeae24f6975b3ff4568cc720a00c9df6fc9b7b Mon Sep 17 00:00:00 2001
From: psolomin
Date: Sat, 22 Apr 2023 14:43:38 +0100
Subject: [PATCH 06/12] WIP - 0
Hacking around the API to make Future coming out
---
.../main/java/examples/PgClientExamples.java | 2 +-
.../java/io/vertx/pgclient/PgConnection.java | 2 +-
.../vertx/pgclient/impl/PgConnectionImpl.java | 27 ++++++++++++--
.../impl/codec/PgProtocolConstants.java | 8 ++++-
.../vertx/pgclient/PgConnectionCopyTest.java | 36 ++++++++++++++++++-
.../sqlclient/impl/QueryResultBuilder.java | 3 +-
6 files changed, 71 insertions(+), 7 deletions(-)
diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
index beffbdd74..a9ebbd935 100644
--- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java
+++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
@@ -748,7 +748,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
public void exportDataFromDb(Vertx vertx, PgConnection client) {
String path = "path/to/file";
- client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute()
+ client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
.flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
.onSuccess(res -> System.out.println("Data exported to " + path));
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
index a52c84849..c4f034719 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
@@ -130,7 +130,7 @@ static Future connect(Vertx vertx, String connectionUri) {
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
* @return async result of bytes container data will be written to
*/
- Query copyToBytes(String sql);
+ Future copyToBytes(String sql);
/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index 4db7c6140..533b1acec 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -21,6 +21,7 @@
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
@@ -31,10 +32,16 @@
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
+import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
+import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
+import io.vertx.sqlclient.impl.SqlResultImpl;
+import io.vertx.sqlclient.impl.command.CommandBase;
+
+import java.util.function.Function;
public class PgConnectionImpl extends SqlConnectionBase implements PgConnection {
@@ -122,8 +129,14 @@ public Query> copyToRows(String sql) {
}
@Override
- public Query copyToBytes(String sql) {
- return null;
+ public Future copyToBytes(String sql) {
+ Function> factory = null;
+ PromiseInternal> promise = null;
+
+ QueryResultBuilder, SqlResult> resultHandler =
+ new QueryResultBuilder<>(factory, promise);
+ CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
+ return this.schedule(context, cmd);
}
@Override
@@ -145,4 +158,14 @@ public Future cancelRequest() {
});
return promise.future();
}
+
+ private class CopyOutCommand extends CommandBase {
+ private final String sql;
+ private final QueryResultBuilder, SqlResult> resultHandler;
+
+ CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) {
+ this.sql = sql;
+ this.resultHandler = resultHandler;
+ }
+ }
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
index de61c2136..dd487ce7e 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java
@@ -72,7 +72,13 @@ public class PgProtocolConstants {
public static final byte MESSAGE_TYPE_SSL_YES = 'S';
public static final byte MESSAGE_TYPE_SSL_NO = 'N';
- // COPY-related
+ /**
+ * COPY-out messages.
+ *
+ * Other messages which might appear in between CopyData:
+ *
NoticeResponse
+ * ParameterStatus
+ */
public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H';
public static final byte MESSAGE_TYPE_COPY_DATA = 'd';
public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c';
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 4a5abb6fa..8acdf3ba9 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -2,6 +2,7 @@
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
+import org.junit.Ignore;
import org.junit.Test;
public class PgConnectionCopyTest extends PgConnectionTestBase {
@@ -32,6 +33,24 @@ public void testCopyToRows(TestContext ctx) {
}));
}
+ @Test
+ @Ignore("For now it just hangs forever")
+ public void testCopyToCsvBytes(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ deleteFromTestTable(ctx, conn, () -> {
+ insertIntoTestTable(ctx, conn, 10, () -> {
+ PgConnection pgConn = (PgConnection) conn;
+ pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
+ .onComplete(ctx.asyncAssertSuccess(buffer -> {
+ buffer.getBytes();
+ async.complete();
+ }));
+ });
+ });
+ }));
+ }
+
/**
* Just a thingy to eavesdrop protocol interactions.
*
@@ -54,9 +73,24 @@ public void testSimpleQuery(TestContext ctx) {
Async async = ctx.async();
connector.accept(ctx.asyncAssertSuccess(conn -> {
conn
- .query("COPY world TO STDOUT (FORMAT csv)")
+ .query("select 1")
+ .execute()
+ .onComplete(ctx.asyncAssertSuccess(result1 -> {
+ ctx.assertEquals(1, result1.size());
+ async.complete();
+ }));
+ }));
+ }
+
+ @Test
+ public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) {
+ Async async = ctx.async();
+ connector.accept(ctx.asyncAssertSuccess(conn -> {
+ conn
+ .query("copy fortune to stdout (format csv)")
.execute()
.onComplete(ctx.asyncAssertSuccess(result1 -> {
+ // nothing comes out
ctx.assertEquals(0, result1.size());
async.complete();
}));
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
index 091cf9e31..9e8287df1 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java
@@ -38,7 +38,8 @@ public class QueryResultBuilder, L extends SqlResu
private Throwable failure;
private boolean suspended;
- QueryResultBuilder(Function factory, PromiseInternal handler) {
+ // TODO: public-ing this shouldn't be needed
+ public QueryResultBuilder(Function factory, PromiseInternal handler) {
this.factory = factory;
this.handler = handler;
}
From c335f8a58d86c04d5472f7485c0f424afb59252d Mon Sep 17 00:00:00 2001
From: psolomin
Date: Thu, 27 Apr 2023 10:41:55 +0100
Subject: [PATCH 07/12] WIP: Stuck with copyToBytes
---
.../main/java/examples/PgClientExamples.java | 2 +-
.../java/io/vertx/pgclient/PgConnection.java | 3 +-
.../vertx/pgclient/impl/PgConnectionImpl.java | 42 ++++++++++++-------
.../pgclient/impl/codec/CopyOutCommand.java | 17 ++++++++
.../impl/codec/CopyOutCommandCodec.java | 4 ++
.../vertx/pgclient/PgConnectionCopyTest.java | 9 ++--
.../vertx/sqlclient/impl/QueryExecutor.java | 4 +-
7 files changed, 57 insertions(+), 24 deletions(-)
create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
diff --git a/vertx-pg-client/src/main/java/examples/PgClientExamples.java b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
index a9ebbd935..6c465fab9 100644
--- a/vertx-pg-client/src/main/java/examples/PgClientExamples.java
+++ b/vertx-pg-client/src/main/java/examples/PgClientExamples.java
@@ -749,7 +749,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
public void exportDataFromDb(Vertx vertx, PgConnection client) {
String path = "path/to/file";
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
- .flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
+ .flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value()))
.onSuccess(res -> System.out.println("Data exported to " + path));
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
index c4f034719..5e765550b 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
@@ -29,6 +29,7 @@
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
+import io.vertx.sqlclient.SqlResult;
import java.util.List;
@@ -130,7 +131,7 @@ static Future connect(Vertx vertx, String connectionUri) {
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
* @return async result of bytes container data will be written to
*/
- Future copyToBytes(String sql);
+ Future> copyToBytes(String sql);
/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index 533b1acec..99c5a4d23 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -20,12 +20,14 @@
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
+import io.vertx.pgclient.impl.codec.CopyOutCommand;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
@@ -35,13 +37,17 @@
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
+import io.vertx.sqlclient.impl.QueryExecutor;
import io.vertx.sqlclient.impl.QueryResultBuilder;
+import io.vertx.sqlclient.impl.QueryResultHandler;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
import io.vertx.sqlclient.impl.SqlResultImpl;
-import io.vertx.sqlclient.impl.command.CommandBase;
+import io.vertx.sqlclient.impl.command.QueryCommandBase;
+import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
import java.util.function.Function;
+import java.util.stream.Collector;
public class PgConnectionImpl extends SqlConnectionBase implements PgConnection {
@@ -129,14 +135,29 @@ public Query> copyToRows(String sql) {
}
@Override
- public Future copyToBytes(String sql) {
- Function> factory = null;
- PromiseInternal> promise = null;
+ public Future> copyToBytes(String sql) {
+ Function> factory = SqlResultImpl::new;
+ PromiseInternal> promise = context.promise();
QueryResultBuilder, SqlResult> resultHandler =
new QueryResultBuilder<>(factory, promise);
- CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
- return this.schedule(context, cmd);
+
+ Collector collector = Collector.of(
+ BufferImpl::new,
+ (v, row) -> {
+ System.out.println(row);
+ },
+ (v1, v2) -> null,
+ Function.identity()
+ );
+
+ SimpleQueryCommand cmd = new SimpleQueryCommand<>(
+ sql, true, false, collector, resultHandler);
+ // this.schedule(promise.context(), cmd);
+
+ QueryExecutor executor = new QueryExecutor(factory, collector);
+ executor.executeSimpleQuery(this, sql, false, false, promise);
+ return promise.future();
}
@Override
@@ -159,13 +180,4 @@ public Future cancelRequest() {
return promise.future();
}
- private class CopyOutCommand extends CommandBase {
- private final String sql;
- private final QueryResultBuilder, SqlResult> resultHandler;
-
- CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) {
- this.sql = sql;
- this.resultHandler = resultHandler;
- }
- }
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
new file mode 100644
index 000000000..13b0e8faf
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -0,0 +1,17 @@
+package io.vertx.pgclient.impl.codec;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.sqlclient.SqlResult;
+import io.vertx.sqlclient.impl.QueryResultBuilder;
+import io.vertx.sqlclient.impl.SqlResultImpl;
+import io.vertx.sqlclient.impl.command.CommandBase;
+
+public class CopyOutCommand extends CommandBase {
+ private final String sql;
+ private final QueryResultBuilder, SqlResult> resultHandler;
+
+ public CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) {
+ this.sql = sql;
+ this.resultHandler = resultHandler;
+ }
+}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
new file mode 100644
index 000000000..70b6fbfb1
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
@@ -0,0 +1,4 @@
+package io.vertx.pgclient.impl.codec;
+
+public class CopyOutCommandCodec {
+}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 8acdf3ba9..e2b4da7f3 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -34,16 +34,15 @@ public void testCopyToRows(TestContext ctx) {
}
@Test
- @Ignore("For now it just hangs forever")
public void testCopyToCsvBytes(TestContext ctx) {
Async async = ctx.async();
connector.accept(ctx.asyncAssertSuccess(conn -> {
deleteFromTestTable(ctx, conn, () -> {
- insertIntoTestTable(ctx, conn, 10, () -> {
+ insertIntoTestTable(ctx, conn, 1, () -> {
PgConnection pgConn = (PgConnection) conn;
- pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
- .onComplete(ctx.asyncAssertSuccess(buffer -> {
- buffer.getBytes();
+ pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)")
+ .onComplete(ctx.asyncAssertSuccess(result -> {
+ result.value().getBytes();
async.complete();
}));
});
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
index 182472c86..67019b057 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java
@@ -34,7 +34,7 @@
/**
* Executes query.
*/
-class QueryExecutor, L extends SqlResult> {
+public class QueryExecutor, L extends SqlResult> {
private final Function factory;
private final Collector collector;
@@ -49,7 +49,7 @@ private QueryResultBuilder createHandler(PromiseInternal promise) {
return new QueryResultBuilder<>(factory, promise);
}
- void executeSimpleQuery(CommandScheduler scheduler,
+ public void executeSimpleQuery(CommandScheduler scheduler,
String sql,
boolean autoCommit,
boolean singleton,
From de9fabd5b8163d378021636cb5c9a5c75a13f123 Mon Sep 17 00:00:00 2001
From: psolomin
Date: Thu, 27 Apr 2023 14:43:32 +0100
Subject: [PATCH 08/12] WIP - buffer accumulates csv text; Broken resultHandler
---
.../vertx/pgclient/impl/PgConnectionImpl.java | 19 +----
.../pgclient/impl/codec/CopyOutCommand.java | 30 +++++++-
.../impl/codec/CopyOutCommandCodec.java | 40 ++++++++++-
.../impl/codec/CopyOutDataDecoder.java | 71 +++++++++++++++++++
.../vertx/pgclient/impl/codec/PgDecoder.java | 7 +-
.../vertx/pgclient/impl/codec/PgEncoder.java | 2 +
.../vertx/pgclient/PgConnectionCopyTest.java | 15 ----
7 files changed, 149 insertions(+), 35 deletions(-)
create mode 100644 vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index 99c5a4d23..6cab9c519 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -19,6 +19,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.ContextInternal;
@@ -142,22 +143,8 @@ public Future> copyToBytes(String sql) {
QueryResultBuilder, SqlResult> resultHandler =
new QueryResultBuilder<>(factory, promise);
- Collector collector = Collector.of(
- BufferImpl::new,
- (v, row) -> {
- System.out.println(row);
- },
- (v1, v2) -> null,
- Function.identity()
- );
-
- SimpleQueryCommand cmd = new SimpleQueryCommand<>(
- sql, true, false, collector, resultHandler);
- // this.schedule(promise.context(), cmd);
-
- QueryExecutor executor = new QueryExecutor(factory, collector);
- executor.executeSimpleQuery(this, sql, false, false, promise);
- return promise.future();
+ CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
+ return this.schedule(promise.context(), cmd);
}
@Override
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
index 13b0e8faf..f128c9a61 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -1,17 +1,43 @@
package io.vertx.pgclient.impl.codec;
+import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.CommandBase;
-public class CopyOutCommand extends CommandBase {
+import java.util.function.Function;
+import java.util.stream.Collector;
+
+public class CopyOutCommand extends CommandBase> {
private final String sql;
+ private final Collector collector;
private final QueryResultBuilder, SqlResult> resultHandler;
- public CopyOutCommand(String sql, QueryResultBuilder, SqlResult> resultHandler) {
+ public CopyOutCommand(
+ String sql,
+ QueryResultBuilder, SqlResult> resultHandler
+ ) {
this.sql = sql;
this.resultHandler = resultHandler;
+ this.collector = Collector.of(
+ Buffer::buffer,
+ (v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
+ (v1, v2) -> null,
+ Function.identity()
+ );
+ }
+
+ QueryResultBuilder, SqlResult> resultHandler() {
+ return resultHandler;
+ }
+
+ String sql() {
+ return sql;
+ }
+
+ Collector collector() {
+ return collector;
}
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
index 70b6fbfb1..948dc8db9 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
@@ -1,4 +1,42 @@
package io.vertx.pgclient.impl.codec;
-public class CopyOutCommandCodec {
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.BufferImpl;
+import io.vertx.sqlclient.SqlResult;
+
+class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> {
+ CopyOutDataDecoder decoder;
+
+ CopyOutCommandCodec(CopyOutCommand cmd) {
+ super(cmd);
+ decoder = new CopyOutDataDecoder(cmd.collector());
+ }
+
+ @Override
+ public void handleCommandComplete(int updated) {
+ this.result = null;
+ Buffer result;
+ Throwable failure;
+ int size;
+ if (decoder != null) {
+ failure = decoder.complete();
+ result = decoder.result();
+ size = decoder.size();
+ decoder.reset();
+ } else {
+ failure = null;
+ result = new BufferImpl();
+ size = 0;
+ }
+ cmd.resultHandler().handleResult(updated, size, null, result, failure);
+ }
+
+ @Override
+ public void handleErrorResponse(ErrorResponse errorResponse) {
+ failure = errorResponse.toException();
+ }
+
+ void encode(PgEncoder encoder) {
+ encoder.writeQuery(new Query(cmd.sql()));
+ }
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
new file mode 100644
index 000000000..5ca742ec4
--- /dev/null
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
@@ -0,0 +1,71 @@
+package io.vertx.pgclient.impl.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.vertx.core.buffer.Buffer;
+import java.util.function.BiConsumer;
+import java.util.stream.Collector;
+
+public class CopyOutDataDecoder {
+
+ private final Collector collector;
+ private BiConsumer accumulator;
+
+ private int size;
+ private Buffer container;
+ private Throwable failure;
+ private Buffer result;
+
+ protected CopyOutDataDecoder(Collector collector) {
+ this.collector = collector;
+ reset();
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public void handleChunk(int len, ByteBuf in) {
+ if (failure != null) {
+ return;
+ }
+ if (accumulator == null) {
+ try {
+ accumulator = collector.accumulator();
+ } catch (Exception e) {
+ failure = e;
+ return;
+ }
+ }
+ try {
+ accumulator.accept(container, in);
+ } catch (Exception e) {
+ failure = e;
+ return;
+ }
+ size++;
+ }
+
+ public Buffer result() {
+ return result;
+ }
+
+ public Throwable complete() {
+ try {
+ result = collector.finisher().apply(container);
+ } catch (Exception e) {
+ failure = e;
+ }
+ return failure;
+ }
+
+ public void reset() {
+ size = 0;
+ failure = null;
+ result = null;
+ try {
+ this.container = collector.supplier().get();
+ } catch (Exception e) {
+ failure = e;
+ }
+ }
+}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
index 76991bb00..f9c951e4c 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
@@ -472,7 +472,12 @@ private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}
- private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {}
+ private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
+ PgCommandCodec, ?> codec = inflight.peek();
+ CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
+ int len = in.readUnsignedShort();
+ cmdCodec.decoder.handleChunk(len, in);
+ }
private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
index d35200cb8..0f10e1aed 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java
@@ -119,6 +119,8 @@ void write(CommandBase> cmd) {
return new ClosePortalCommandCodec((CloseCursorCommand) cmd);
} else if (cmd instanceof CloseStatementCommand) {
return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
+ } else if (cmd instanceof CopyOutCommand) {
+ return new CopyOutCommandCodec((CopyOutCommand) cmd);
}
throw new AssertionError();
}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index e2b4da7f3..6c574ba9f 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -80,19 +80,4 @@ public void testSimpleQuery(TestContext ctx) {
}));
}));
}
-
- @Test
- public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) {
- Async async = ctx.async();
- connector.accept(ctx.asyncAssertSuccess(conn -> {
- conn
- .query("copy fortune to stdout (format csv)")
- .execute()
- .onComplete(ctx.asyncAssertSuccess(result1 -> {
- // nothing comes out
- ctx.assertEquals(0, result1.size());
- async.complete();
- }));
- }));
- }
}
From 7977926d6359f586237db5fd0410a645261834ea Mon Sep 17 00:00:00 2001
From: psolomin
Date: Thu, 27 Apr 2023 20:16:43 +0100
Subject: [PATCH 09/12] WIP - unable to get the correct result out
---
.../main/java/io/vertx/pgclient/impl/PgConnectionImpl.java | 5 ++++-
.../java/io/vertx/pgclient/impl/codec/CopyOutCommand.java | 3 +--
.../io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java | 3 ++-
.../test/java/io/vertx/pgclient/PgConnectionCopyTest.java | 2 +-
4 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index 6cab9c519..491c2e35b 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -137,9 +137,12 @@ public Query> copyToRows(String sql) {
@Override
public Future> copyToBytes(String sql) {
- Function> factory = SqlResultImpl::new;
+ Function> factory = (buffer) -> new SqlResultImpl<>(buffer);
PromiseInternal> promise = context.promise();
+ // currently, this loads entire content into Buffer
+ // it should stream bytes out instead
+ // TODO: signal completion as soon as the database replied CopyOutResponse 'H' ?
QueryResultBuilder, SqlResult> resultHandler =
new QueryResultBuilder<>(factory, promise);
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
index f128c9a61..69a8fb7cb 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -7,7 +7,6 @@
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.CommandBase;
-import java.util.function.Function;
import java.util.stream.Collector;
public class CopyOutCommand extends CommandBase> {
@@ -25,7 +24,7 @@ public CopyOutCommand(
Buffer::buffer,
(v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
(v1, v2) -> null,
- Function.identity()
+ (finalResult) -> finalResult
);
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
index 948dc8db9..13993532b 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
@@ -3,6 +3,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.sqlclient.SqlResult;
+import io.vertx.sqlclient.impl.SqlResultImpl;
class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> {
CopyOutDataDecoder decoder;
@@ -14,7 +15,7 @@ class CopyOutCommandCodec extends PgCommandCodec, CopyOutComma
@Override
public void handleCommandComplete(int updated) {
- this.result = null;
+ this.result = new SqlResultImpl(Buffer.buffer("abc"));
Buffer result;
Throwable failure;
int size;
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 6c574ba9f..04e50585c 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -2,7 +2,6 @@
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
-import org.junit.Ignore;
import org.junit.Test;
public class PgConnectionCopyTest extends PgConnectionTestBase {
@@ -74,6 +73,7 @@ public void testSimpleQuery(TestContext ctx) {
conn
.query("select 1")
.execute()
+ // when does the result is transformed from bool to rows?
.onComplete(ctx.asyncAssertSuccess(result1 -> {
ctx.assertEquals(1, result1.size());
async.complete();
From c499da900da626424060d30c2b8ec68a1a16b909 Mon Sep 17 00:00:00 2001
From: psolomin
Date: Fri, 28 Apr 2023 12:15:33 +0100
Subject: [PATCH 10/12] WIP - managed to get bytes out from query result
---
.../vertx/pgclient/impl/PgConnectionImpl.java | 3 ++-
.../pgclient/impl/codec/CopyOutCommand.java | 2 +-
.../impl/codec/CopyOutCommandCodec.java | 4 ++--
.../vertx/pgclient/PgConnectionCopyTest.java | 23 ++++++++++++++++++-
4 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
index 491c2e35b..5161e8d51 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java
@@ -147,7 +147,8 @@ public Future> copyToBytes(String sql) {
new QueryResultBuilder<>(factory, promise);
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
- return this.schedule(promise.context(), cmd);
+ this.schedule(promise.context(), cmd).onComplete(resultHandler);
+ return promise.future();
}
@Override
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
index 69a8fb7cb..855020687 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -9,7 +9,7 @@
import java.util.stream.Collector;
-public class CopyOutCommand extends CommandBase> {
+public class CopyOutCommand extends CommandBase {
private final String sql;
private final Collector collector;
private final QueryResultBuilder, SqlResult> resultHandler;
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
index 13993532b..5f23a7651 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommandCodec.java
@@ -5,7 +5,7 @@
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.SqlResultImpl;
-class CopyOutCommandCodec extends PgCommandCodec, CopyOutCommand> {
+class CopyOutCommandCodec extends PgCommandCodec {
CopyOutDataDecoder decoder;
CopyOutCommandCodec(CopyOutCommand cmd) {
@@ -15,7 +15,7 @@ class CopyOutCommandCodec extends PgCommandCodec, CopyOutComma
@Override
public void handleCommandComplete(int updated) {
- this.result = new SqlResultImpl(Buffer.buffer("abc"));
+ this.result = false;
Buffer result;
Throwable failure;
int size;
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 04e50585c..8b2ce3e85 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -1,9 +1,13 @@
package io.vertx.pgclient;
+import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+
public class PgConnectionCopyTest extends PgConnectionTestBase {
public PgConnectionCopyTest() {
connector = (handler) -> PgConnection.connect(vertx, options).onComplete(ar -> {
@@ -41,7 +45,24 @@ public void testCopyToCsvBytes(TestContext ctx) {
PgConnection pgConn = (PgConnection) conn;
pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)")
.onComplete(ctx.asyncAssertSuccess(result -> {
- result.value().getBytes();
+ ctx.assertNull(result.columnDescriptors());
+ ctx.assertEquals(10, result.rowCount());
+ ctx.assertEquals(10, result.size());
+ ctx.assertEquals(
+ Buffer.buffer(
+ "Whatever-0\n" +
+ "Whatever-1\n" +
+ "Whatever-2\n" +
+ "Whatever-3\n" +
+ "Whatever-4\n" +
+ "Whatever-5\n" +
+ "Whatever-6\n" +
+ "Whatever-7\n" +
+ "Whatever-8\n" +
+ "Whatever-9\n"
+ ),
+ result.value()
+ );
async.complete();
}));
});
From 3797477077933b61f66ac9874dbfa6670e8fc298 Mon Sep 17 00:00:00 2001
From: psolomin
Date: Fri, 28 Apr 2023 12:42:58 +0100
Subject: [PATCH 11/12] Fix bug: unnecessary read of data content - corrupt
output
---
.../pgclient/impl/codec/CopyOutCommand.java | 1 +
.../impl/codec/CopyOutDataDecoder.java | 2 +-
.../vertx/pgclient/impl/codec/PgDecoder.java | 3 +--
.../vertx/pgclient/PgConnectionCopyTest.java | 20 +++++++++----------
4 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
index 855020687..13170fb2d 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutCommand.java
@@ -22,6 +22,7 @@ public CopyOutCommand(
this.resultHandler = resultHandler;
this.collector = Collector.of(
Buffer::buffer,
+ // TODO: this might be unnecessary slow - think of alternatives
(v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
(v1, v2) -> null,
(finalResult) -> finalResult
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
index 5ca742ec4..320fc13a8 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CopyOutDataDecoder.java
@@ -24,7 +24,7 @@ public int size() {
return size;
}
- public void handleChunk(int len, ByteBuf in) {
+ public void handleChunk(ByteBuf in) {
if (failure != null) {
return;
}
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
index f9c951e4c..eb58ff342 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java
@@ -475,8 +475,7 @@ private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}
private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
PgCommandCodec, ?> codec = inflight.peek();
CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
- int len = in.readUnsignedShort();
- cmdCodec.decoder.handleChunk(len, in);
+ cmdCodec.decoder.handleChunk(in);
}
private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
index 8b2ce3e85..67579a901 100644
--- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
+++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java
@@ -50,16 +50,16 @@ public void testCopyToCsvBytes(TestContext ctx) {
ctx.assertEquals(10, result.size());
ctx.assertEquals(
Buffer.buffer(
- "Whatever-0\n" +
- "Whatever-1\n" +
- "Whatever-2\n" +
- "Whatever-3\n" +
- "Whatever-4\n" +
- "Whatever-5\n" +
- "Whatever-6\n" +
- "Whatever-7\n" +
- "Whatever-8\n" +
- "Whatever-9\n"
+ "0,Whatever-0\n" +
+ "1,Whatever-1\n" +
+ "2,Whatever-2\n" +
+ "3,Whatever-3\n" +
+ "4,Whatever-4\n" +
+ "5,Whatever-5\n" +
+ "6,Whatever-6\n" +
+ "7,Whatever-7\n" +
+ "8,Whatever-8\n" +
+ "9,Whatever-9\n"
),
result.value()
);
From f200835b78c0c4778da9faf0a1841ef33681d18d Mon Sep 17 00:00:00 2001
From: psolomin
Date: Fri, 28 Apr 2023 14:50:52 +0100
Subject: [PATCH 12/12] Add notes
---
.../src/main/java/io/vertx/pgclient/PgConnection.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
index 5e765550b..e00ed44c2 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
@@ -130,6 +130,15 @@ static Future connect(Vertx vertx, String connectionUri) {
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
* @return async result of bytes container data will be written to
+ *
+ * - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
+ * - future of read stream.
+ * - when we do query operation
+ * - we should not use query result builder
+ * - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ?
+ * - we need a new interface Future>
+ * - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
+ * - PgSocketConnection - we will apply backpressure in SocketInternal
*/
Future> copyToBytes(String sql);