Skip to content

Commit 2e0d57d

Browse files
committed
Add skeleton for COPY feature API
1 parent 769a6d2 commit 2e0d57d

File tree

4 files changed

+98
-1
lines changed

4 files changed

+98
-1
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,16 @@ public void batchReturning(SqlClient client) {
733733
});
734734
}
735735

736+
public void copyFromStdinReturning(Vertx vertx, PgConnection client) {
737+
client.copyFrom(
738+
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
739+
vertx.fileSystem().readFile("path/to/file")
740+
).execute().onSuccess(res -> {
741+
Long rowsWritten = res.iterator().next().getLong("rowsWritten");
742+
System.out.println("rows affected: " + rowsWritten);
743+
});
744+
}
745+
736746
public void pgBouncer(PgConnectOptions connectOptions) {
737747
connectOptions.setUseLayer7Proxy(true);
738748
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717

1818
package io.vertx.pgclient;
1919

20+
import io.vertx.core.buffer.Buffer;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.pgclient.impl.PgConnectionImpl;
23+
import io.vertx.sqlclient.Query;
24+
import io.vertx.sqlclient.Row;
25+
import io.vertx.sqlclient.RowSet;
2226
import io.vertx.sqlclient.SqlConnection;
2327
import io.vertx.codegen.annotations.Fluent;
2428
import io.vertx.codegen.annotations.VertxGen;
2529
import io.vertx.core.*;
2630

31+
import java.util.List;
32+
2733
/**
2834
* A connection to Postgres.
2935
* <P>
@@ -32,6 +38,7 @@
3238
* <ul>
3339
* <li>Notification</li>
3440
* <li>Request Cancellation</li>
41+
* <li>Copy from STDIN / to STDOUT</li>
3542
* </ul>
3643
* </P>
3744
*
@@ -90,6 +97,40 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
9097
@Fluent
9198
PgConnection noticeHandler(Handler<PgNotice> handler);
9299

100+
/**
101+
* Imports data into a database.
102+
*
103+
* <p>Use this method when importing opaque bytes, e.g. from a CSV file.
104+
*
105+
* <p>If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
106+
*
107+
* @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
108+
* @param from byte stream data will be fetched from
109+
* @return result set with single field {@code rowsWritten}
110+
*/
111+
Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from);
112+
113+
/**
114+
* Exports data from a database with decoding.
115+
*
116+
* {@code FORMAT} can only be {@code binary}.
117+
*
118+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
119+
* @return decoded records
120+
*/
121+
Query<RowSet<Row>> copyTo(String sql);
122+
123+
/**
124+
* Exports data from a database as-is, without decoding.
125+
*
126+
* <p>Use this method when exporting opaque bytes, e.g. to a CSV file.
127+
*
128+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
129+
* @param to bytes container data will be written to
130+
* @return async result
131+
*/
132+
Future<Void> copyTo(String sql, Buffer to);
133+
93134
/**
94135
* Send a request cancellation message to tell the server to cancel processing request in this connection.
95136
* <br>Note: Use this with caution because the cancellation signal may or may not have any effect.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616
*/
1717
package io.vertx.pgclient.impl;
1818

19+
import io.vertx.core.buffer.Buffer;
1920
import io.vertx.core.impl.ContextInternal;
20-
import io.vertx.core.spi.metrics.ClientMetrics;
2121
import io.vertx.pgclient.PgConnectOptions;
2222
import io.vertx.pgclient.PgConnection;
2323
import io.vertx.pgclient.PgNotice;
2424
import io.vertx.pgclient.PgNotification;
2525
import io.vertx.pgclient.impl.codec.NoticeResponse;
2626
import io.vertx.pgclient.spi.PgDriver;
27+
import io.vertx.sqlclient.Query;
28+
import io.vertx.sqlclient.Row;
29+
import io.vertx.sqlclient.RowSet;
2730
import io.vertx.sqlclient.impl.Connection;
2831
import io.vertx.sqlclient.impl.Notification;
2932
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -113,6 +116,21 @@ public PgConnection noticeHandler(Handler<PgNotice> handler) {
113116
return this;
114117
}
115118

119+
@Override
120+
public Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from) {
121+
return null;
122+
}
123+
124+
@Override
125+
public Query<RowSet<Row>> copyTo(String sql) {
126+
return null;
127+
}
128+
129+
@Override
130+
public Future<Void> copyTo(String sql, Buffer to) {
131+
return Future.succeededFuture();
132+
}
133+
116134
@Override
117135
public int processId() {
118136
return conn.getProcessId();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.vertx.pgclient;
2+
3+
import io.vertx.ext.unit.Async;
4+
import io.vertx.ext.unit.TestContext;
5+
import org.junit.Test;
6+
7+
public class PgConnectionCopyTest extends PgConnectionTestBase {
8+
@Test
9+
public void testCopyToRows(TestContext ctx) {
10+
Async async = ctx.async();
11+
connector.accept(ctx.asyncAssertSuccess(conn -> {
12+
deleteFromTestTable(ctx, conn, () -> {
13+
insertIntoTestTable(ctx, conn, 10, () -> {
14+
PgConnection pgConn = (PgConnection) conn;
15+
pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)")
16+
.execute()
17+
.onComplete(ctx.asyncAssertSuccess(result -> {
18+
for (int i = 0; i < 2; i++) {
19+
ctx.assertEquals(1, result.rowCount());
20+
result = result.next();
21+
}
22+
async.complete();
23+
}));
24+
});
25+
});
26+
}));
27+
}
28+
}

0 commit comments

Comments
 (0)