Skip to content

Commit 96f03f0

Browse files
committed
Add skeleton for COPY feature API
1 parent 76c8336 commit 96f03f0

File tree

4 files changed

+98
-0
lines changed

4 files changed

+98
-0
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,16 @@ public void batchReturning(SqlClient client) {
733733
});
734734
}
735735

736+
public void copyFromStdinReturning(Vertx vertx, PgConnection client) {
737+
client.copyFrom(
738+
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
739+
vertx.fileSystem().readFile("path/to/file")
740+
).execute().onSuccess(res -> {
741+
Long rowsWritten = res.iterator().next().getLong("rowsWritten");
742+
System.out.println("rows affected: " + rowsWritten);
743+
});
744+
}
745+
736746
public void pgBouncer(PgConnectOptions connectOptions) {
737747
connectOptions.setUseLayer7Proxy(true);
738748
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,21 @@
1717

1818
package io.vertx.pgclient;
1919

20+
import io.vertx.core.buffer.Buffer;
2021
import io.vertx.codegen.annotations.Fluent;
2122
import io.vertx.codegen.annotations.VertxGen;
2223
import io.vertx.core.Future;
2324
import io.vertx.core.Handler;
2425
import io.vertx.core.Vertx;
2526
import io.vertx.core.impl.ContextInternal;
2627
import io.vertx.pgclient.impl.PgConnectionImpl;
28+
import io.vertx.sqlclient.Query;
29+
import io.vertx.sqlclient.Row;
30+
import io.vertx.sqlclient.RowSet;
2731
import io.vertx.sqlclient.SqlConnection;
2832

33+
import java.util.List;
34+
2935
/**
3036
* A connection to Postgres.
3137
* <P>
@@ -34,6 +40,7 @@
3440
* <ul>
3541
* <li>Notification</li>
3642
* <li>Request Cancellation</li>
43+
* <li>Copy from STDIN / to STDOUT</li>
3744
* </ul>
3845
* </P>
3946
*
@@ -92,6 +99,40 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
9299
@Fluent
93100
PgConnection noticeHandler(Handler<PgNotice> handler);
94101

102+
/**
103+
* Imports data into a database.
104+
*
105+
* <p>Use this method when importing opaque bytes, e.g. from a CSV file.
106+
*
107+
* <p>If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
108+
*
109+
* @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
110+
* @param from byte stream data will be fetched from
111+
* @return result set with single field {@code rowsWritten}
112+
*/
113+
Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from);
114+
115+
/**
116+
* Exports data from a database with decoding.
117+
*
118+
* {@code FORMAT} can only be {@code binary}.
119+
*
120+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
121+
* @return decoded records
122+
*/
123+
Query<RowSet<Row>> copyTo(String sql);
124+
125+
/**
126+
* Exports data from a database as-is, without decoding.
127+
*
128+
* <p>Use this method when exporting opaque bytes, e.g. to a CSV file.
129+
*
130+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
131+
* @param to bytes container data will be written to
132+
* @return async result
133+
*/
134+
Future<Void> copyTo(String sql, Buffer to);
135+
95136
/**
96137
* Send a request cancellation message to tell the server to cancel processing request in this connection.
97138
* <br>Note: Use this with caution because the cancellation signal may or may not have any effect.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.vertx.core.Future;
2020
import io.vertx.core.Handler;
2121
import io.vertx.core.Promise;
22+
import io.vertx.core.buffer.Buffer;
2223
import io.vertx.core.impl.ContextInternal;
2324
import io.vertx.pgclient.PgConnectOptions;
2425
import io.vertx.pgclient.PgConnection;
@@ -27,6 +28,9 @@
2728
import io.vertx.pgclient.impl.codec.NoticeResponse;
2829
import io.vertx.pgclient.impl.codec.TxFailedEvent;
2930
import io.vertx.pgclient.spi.PgDriver;
31+
import io.vertx.sqlclient.Query;
32+
import io.vertx.sqlclient.Row;
33+
import io.vertx.sqlclient.RowSet;
3034
import io.vertx.sqlclient.impl.Connection;
3135
import io.vertx.sqlclient.impl.Notification;
3236
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -107,6 +111,21 @@ public PgConnection noticeHandler(Handler<PgNotice> handler) {
107111
return this;
108112
}
109113

114+
@Override
115+
public Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from) {
116+
return null;
117+
}
118+
119+
@Override
120+
public Query<RowSet<Row>> copyTo(String sql) {
121+
return null;
122+
}
123+
124+
@Override
125+
public Future<Void> copyTo(String sql, Buffer to) {
126+
return Future.succeededFuture();
127+
}
128+
110129
@Override
111130
public int processId() {
112131
return conn.getProcessId();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.vertx.pgclient;
2+
3+
import io.vertx.ext.unit.Async;
4+
import io.vertx.ext.unit.TestContext;
5+
import org.junit.Test;
6+
7+
public class PgConnectionCopyTest extends PgConnectionTestBase {
8+
@Test
9+
public void testCopyToRows(TestContext ctx) {
10+
Async async = ctx.async();
11+
connector.accept(ctx.asyncAssertSuccess(conn -> {
12+
deleteFromTestTable(ctx, conn, () -> {
13+
insertIntoTestTable(ctx, conn, 10, () -> {
14+
PgConnection pgConn = (PgConnection) conn;
15+
pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)")
16+
.execute()
17+
.onComplete(ctx.asyncAssertSuccess(result -> {
18+
for (int i = 0; i < 2; i++) {
19+
ctx.assertEquals(1, result.rowCount());
20+
result = result.next();
21+
}
22+
async.complete();
23+
}));
24+
});
25+
});
26+
}));
27+
}
28+
}

0 commit comments

Comments
 (0)