Skip to content

Commit 85a8ea4

Browse files
committed
Add skeleton for COPY feature API
1 parent d1cfb6c commit 85a8ea4

File tree

4 files changed

+98
-0
lines changed

4 files changed

+98
-0
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,16 @@ public void batchReturning(SqlClient client) {
733733
});
734734
}
735735

736+
public void copyFromStdinReturning(Vertx vertx, PgConnection client) {
737+
client.copyFrom(
738+
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
739+
vertx.fileSystem().readFile("path/to/file")
740+
).execute().onSuccess(res -> {
741+
Long rowsWritten = res.iterator().next().getLong("rowsWritten");
742+
System.out.println("rows affected: " + rowsWritten);
743+
});
744+
}
745+
736746
public void pgBouncer(PgConnectOptions connectOptions) {
737747
connectOptions.setUseLayer7Proxy(true);
738748
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.pgclient;
1919

20+
import io.vertx.core.buffer.Buffer;
2021
import io.vertx.codegen.annotations.Fluent;
2122
import io.vertx.codegen.annotations.VertxGen;
2223
import io.vertx.core.AsyncResult;
@@ -25,8 +26,13 @@
2526
import io.vertx.core.Vertx;
2627
import io.vertx.core.impl.ContextInternal;
2728
import io.vertx.pgclient.impl.PgConnectionImpl;
29+
import io.vertx.sqlclient.Query;
30+
import io.vertx.sqlclient.Row;
31+
import io.vertx.sqlclient.RowSet;
2832
import io.vertx.sqlclient.SqlConnection;
2933

34+
import java.util.List;
35+
3036
/**
3137
* A connection to Postgres.
3238
* <P>
@@ -35,6 +41,7 @@
3541
* <ul>
3642
* <li>Notification</li>
3743
* <li>Request Cancellation</li>
44+
* <li>Copy from STDIN / to STDOUT</li>
3845
* </ul>
3946
* </P>
4047
*
@@ -93,6 +100,40 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
93100
@Fluent
94101
PgConnection noticeHandler(Handler<PgNotice> handler);
95102

103+
/**
104+
* Imports data into a database.
105+
*
106+
* <p>Use this method when importing opaque bytes, e.g. from a CSV file.
107+
*
108+
* <p>If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
109+
*
110+
* @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
111+
* @param from byte stream data will be fetched from
112+
* @return result set with single field {@code rowsWritten}
113+
*/
114+
Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from);
115+
116+
/**
117+
* Exports data from a database with decoding.
118+
*
119+
* {@code FORMAT} can only be {@code binary}.
120+
*
121+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
122+
* @return decoded records
123+
*/
124+
Query<RowSet<Row>> copyTo(String sql);
125+
126+
/**
127+
* Exports data from a database as-is, without decoding.
128+
*
129+
* <p>Use this method when exporting opaque bytes, e.g. to a CSV file.
130+
*
131+
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
132+
* @param to bytes container data will be written to
133+
* @return async result
134+
*/
135+
Future<Void> copyTo(String sql, Buffer to);
136+
96137
/**
97138
* Send a request cancellation message to tell the server to cancel processing request in this connection.
98139
* <br>Note: Use this with caution because the cancellation signal may or may not have any effect.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.vertx.pgclient.impl;
1818

1919
import io.vertx.core.*;
20+
import io.vertx.core.buffer.Buffer;
2021
import io.vertx.core.impl.ContextInternal;
2122
import io.vertx.pgclient.PgConnectOptions;
2223
import io.vertx.pgclient.PgConnection;
@@ -25,6 +26,9 @@
2526
import io.vertx.pgclient.impl.codec.NoticeResponse;
2627
import io.vertx.pgclient.impl.codec.TxFailedEvent;
2728
import io.vertx.pgclient.spi.PgDriver;
29+
import io.vertx.sqlclient.Query;
30+
import io.vertx.sqlclient.Row;
31+
import io.vertx.sqlclient.RowSet;
2832
import io.vertx.sqlclient.impl.Connection;
2933
import io.vertx.sqlclient.impl.Notification;
3034
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -105,6 +109,21 @@ public PgConnection noticeHandler(Handler<PgNotice> handler) {
105109
return this;
106110
}
107111

112+
@Override
113+
public Query<RowSet<Row>> copyFrom(String sql, Future<Buffer> from) {
114+
return null;
115+
}
116+
117+
@Override
118+
public Query<RowSet<Row>> copyTo(String sql) {
119+
return null;
120+
}
121+
122+
@Override
123+
public Future<Void> copyTo(String sql, Buffer to) {
124+
return Future.succeededFuture();
125+
}
126+
108127
@Override
109128
public int processId() {
110129
return conn.getProcessId();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.vertx.pgclient;
2+
3+
import io.vertx.ext.unit.Async;
4+
import io.vertx.ext.unit.TestContext;
5+
import org.junit.Test;
6+
7+
public class PgConnectionCopyTest extends PgConnectionTestBase {
8+
@Test
9+
public void testCopyToRows(TestContext ctx) {
10+
Async async = ctx.async();
11+
connector.accept(ctx.asyncAssertSuccess(conn -> {
12+
deleteFromTestTable(ctx, conn, () -> {
13+
insertIntoTestTable(ctx, conn, 10, () -> {
14+
PgConnection pgConn = (PgConnection) conn;
15+
pgConn.copyTo("COPY my_table TO STDOUT (FORMAT binary)")
16+
.execute()
17+
.onComplete(ctx.asyncAssertSuccess(result -> {
18+
for (int i = 0; i < 2; i++) {
19+
ctx.assertEquals(1, result.rowCount());
20+
result = result.next();
21+
}
22+
async.complete();
23+
}));
24+
});
25+
});
26+
}));
27+
}
28+
}

0 commit comments

Comments
 (0)