Skip to content

Commit 08ab457

Browse files
committed
WIP - 0
Hacking around the API to make Future<Buffer> coming out
1 parent eb4abba commit 08ab457

File tree

6 files changed

+71
-7
lines changed

6 files changed

+71
-7
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
748748

749749
public void exportDataFromDb(Vertx vertx, PgConnection client) {
750750
String path = "path/to/file";
751-
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute()
751+
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
752752
.flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
753753
.onSuccess(res -> System.out.println("Data exported to " + path));
754754
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
131131
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
132132
* @return async result of bytes container data will be written to
133133
*/
134-
Query<Buffer> copyToBytes(String sql);
134+
Future<Buffer> copyToBytes(String sql);
135135

136136
/**
137137
* Send a request cancellation message to tell the server to cancel processing request in this connection.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.vertx.core.*;
2020
import io.vertx.core.buffer.Buffer;
2121
import io.vertx.core.impl.ContextInternal;
22+
import io.vertx.core.impl.future.PromiseInternal;
2223
import io.vertx.pgclient.PgConnectOptions;
2324
import io.vertx.pgclient.PgConnection;
2425
import io.vertx.pgclient.PgNotice;
@@ -29,10 +30,16 @@
2930
import io.vertx.sqlclient.Query;
3031
import io.vertx.sqlclient.Row;
3132
import io.vertx.sqlclient.RowSet;
33+
import io.vertx.sqlclient.SqlResult;
3234
import io.vertx.sqlclient.impl.Connection;
3335
import io.vertx.sqlclient.impl.Notification;
36+
import io.vertx.sqlclient.impl.QueryResultBuilder;
3437
import io.vertx.sqlclient.impl.SocketConnectionBase;
3538
import io.vertx.sqlclient.impl.SqlConnectionBase;
39+
import io.vertx.sqlclient.impl.SqlResultImpl;
40+
import io.vertx.sqlclient.impl.command.CommandBase;
41+
42+
import java.util.function.Function;
3643

3744
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
3845

@@ -120,8 +127,14 @@ public Query<RowSet<Row>> copyToRows(String sql) {
120127
}
121128

122129
@Override
123-
public Query<Buffer> copyToBytes(String sql) {
124-
return null;
130+
public Future<Buffer> copyToBytes(String sql) {
131+
Function<Buffer, SqlResultImpl<Buffer>> factory = null;
132+
PromiseInternal<SqlResult<Buffer>> promise = null;
133+
134+
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
135+
new QueryResultBuilder<>(factory, promise);
136+
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
137+
return this.schedule(context, cmd);
125138
}
126139

127140
@Override
@@ -150,4 +163,14 @@ public PgConnection cancelRequest(Handler<AsyncResult<Void>> handler) {
150163
}
151164
return this;
152165
}
166+
167+
private class CopyOutCommand extends CommandBase<Buffer> {
168+
private final String sql;
169+
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
170+
171+
CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
172+
this.sql = sql;
173+
this.resultHandler = resultHandler;
174+
}
175+
}
153176
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,13 @@ public class PgProtocolConstants {
7272
public static final byte MESSAGE_TYPE_SSL_YES = 'S';
7373
public static final byte MESSAGE_TYPE_SSL_NO = 'N';
7474

75-
// COPY-related
75+
/**
76+
* COPY-out messages.
77+
*
78+
* <p>Other messages which might appear in between CopyData:
79+
* <li>NoticeResponse
80+
* <li>ParameterStatus
81+
*/
7682
public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H';
7783
public static final byte MESSAGE_TYPE_COPY_DATA = 'd';
7884
public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c';

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.vertx.ext.unit.Async;
44
import io.vertx.ext.unit.TestContext;
5+
import org.junit.Ignore;
56
import org.junit.Test;
67

78
public class PgConnectionCopyTest extends PgConnectionTestBase {
@@ -32,6 +33,24 @@ public void testCopyToRows(TestContext ctx) {
3233
}));
3334
}
3435

36+
@Test
37+
@Ignore("For now it just hangs forever")
38+
public void testCopyToCsvBytes(TestContext ctx) {
39+
Async async = ctx.async();
40+
connector.accept(ctx.asyncAssertSuccess(conn -> {
41+
deleteFromTestTable(ctx, conn, () -> {
42+
insertIntoTestTable(ctx, conn, 10, () -> {
43+
PgConnection pgConn = (PgConnection) conn;
44+
pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
45+
.onComplete(ctx.asyncAssertSuccess(buffer -> {
46+
buffer.getBytes();
47+
async.complete();
48+
}));
49+
});
50+
});
51+
}));
52+
}
53+
3554
/**
3655
* Just a thingy to eavesdrop protocol interactions.
3756
*
@@ -54,9 +73,24 @@ public void testSimpleQuery(TestContext ctx) {
5473
Async async = ctx.async();
5574
connector.accept(ctx.asyncAssertSuccess(conn -> {
5675
conn
57-
.query("COPY world TO STDOUT (FORMAT csv)")
76+
.query("select 1")
77+
.execute()
78+
.onComplete(ctx.asyncAssertSuccess(result1 -> {
79+
ctx.assertEquals(1, result1.size());
80+
async.complete();
81+
}));
82+
}));
83+
}
84+
85+
@Test
86+
public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) {
87+
Async async = ctx.async();
88+
connector.accept(ctx.asyncAssertSuccess(conn -> {
89+
conn
90+
.query("copy fortune to stdout (format csv)")
5891
.execute()
5992
.onComplete(ctx.asyncAssertSuccess(result1 -> {
93+
// nothing comes out
6094
ctx.assertEquals(0, result1.size());
6195
async.complete();
6296
}));

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResu
3838
private Throwable failure;
3939
private boolean suspended;
4040

41-
QueryResultBuilder(Function<T, R> factory, PromiseInternal<L> handler) {
41+
// TODO: public-ing this shouldn't be needed
42+
public QueryResultBuilder(Function<T, R> factory, PromiseInternal<L> handler) {
4243
this.factory = factory;
4344
this.handler = handler;
4445
}

0 commit comments

Comments
 (0)