Skip to content

Commit 410e042

Browse files
committed
WIP - 0
Hacking around the API to make Future<Buffer> coming out
1 parent 6e380fa commit 410e042

File tree

6 files changed

+71
-7
lines changed

6 files changed

+71
-7
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
748748

749749
public void exportDataFromDb(Vertx vertx, PgConnection client) {
750750
String path = "path/to/file";
751-
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)").execute()
751+
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
752752
.flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
753753
.onSuccess(res -> System.out.println("Data exported to " + path));
754754
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
130130
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
131131
* @return async result of bytes container data will be written to
132132
*/
133-
Query<Buffer> copyToBytes(String sql);
133+
Future<Buffer> copyToBytes(String sql);
134134

135135
/**
136136
* Send a request cancellation message to tell the server to cancel processing request in this connection.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.Promise;
2222
import io.vertx.core.buffer.Buffer;
2323
import io.vertx.core.impl.ContextInternal;
24+
import io.vertx.core.impl.future.PromiseInternal;
2425
import io.vertx.pgclient.PgConnectOptions;
2526
import io.vertx.pgclient.PgConnection;
2627
import io.vertx.pgclient.PgNotice;
@@ -31,10 +32,16 @@
3132
import io.vertx.sqlclient.Query;
3233
import io.vertx.sqlclient.Row;
3334
import io.vertx.sqlclient.RowSet;
35+
import io.vertx.sqlclient.SqlResult;
3436
import io.vertx.sqlclient.impl.Connection;
3537
import io.vertx.sqlclient.impl.Notification;
38+
import io.vertx.sqlclient.impl.QueryResultBuilder;
3639
import io.vertx.sqlclient.impl.SocketConnectionBase;
3740
import io.vertx.sqlclient.impl.SqlConnectionBase;
41+
import io.vertx.sqlclient.impl.SqlResultImpl;
42+
import io.vertx.sqlclient.impl.command.CommandBase;
43+
44+
import java.util.function.Function;
3845

3946
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
4047

@@ -122,8 +129,14 @@ public Query<RowSet<Row>> copyToRows(String sql) {
122129
}
123130

124131
@Override
125-
public Query<Buffer> copyToBytes(String sql) {
126-
return null;
132+
public Future<Buffer> copyToBytes(String sql) {
133+
Function<Buffer, SqlResultImpl<Buffer>> factory = null;
134+
PromiseInternal<SqlResult<Buffer>> promise = null;
135+
136+
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
137+
new QueryResultBuilder<>(factory, promise);
138+
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
139+
return this.schedule(context, cmd);
127140
}
128141

129142
@Override
@@ -145,4 +158,14 @@ public Future<Void> cancelRequest() {
145158
});
146159
return promise.future();
147160
}
161+
162+
private class CopyOutCommand extends CommandBase<Buffer> {
163+
private final String sql;
164+
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
165+
166+
CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
167+
this.sql = sql;
168+
this.resultHandler = resultHandler;
169+
}
170+
}
148171
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgProtocolConstants.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,13 @@ public class PgProtocolConstants {
7272
public static final byte MESSAGE_TYPE_SSL_YES = 'S';
7373
public static final byte MESSAGE_TYPE_SSL_NO = 'N';
7474

75-
// COPY-related
75+
/**
76+
* COPY-out messages.
77+
*
78+
* <p>Other messages which might appear in between CopyData:
79+
* <li>NoticeResponse
80+
* <li>ParameterStatus
81+
*/
7682
public static final byte MESSAGE_TYPE_COPY_OUT_RESPONSE = 'H';
7783
public static final byte MESSAGE_TYPE_COPY_DATA = 'd';
7884
public static final byte MESSAGE_TYPE_COPY_COMPLETION = 'c';

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.vertx.ext.unit.Async;
44
import io.vertx.ext.unit.TestContext;
5+
import org.junit.Ignore;
56
import org.junit.Test;
67

78
public class PgConnectionCopyTest extends PgConnectionTestBase {
@@ -32,6 +33,24 @@ public void testCopyToRows(TestContext ctx) {
3233
}));
3334
}
3435

36+
@Test
37+
@Ignore("For now it just hangs forever")
38+
public void testCopyToCsvBytes(TestContext ctx) {
39+
Async async = ctx.async();
40+
connector.accept(ctx.asyncAssertSuccess(conn -> {
41+
deleteFromTestTable(ctx, conn, () -> {
42+
insertIntoTestTable(ctx, conn, 10, () -> {
43+
PgConnection pgConn = (PgConnection) conn;
44+
pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
45+
.onComplete(ctx.asyncAssertSuccess(buffer -> {
46+
buffer.getBytes();
47+
async.complete();
48+
}));
49+
});
50+
});
51+
}));
52+
}
53+
3554
/**
3655
* Just a thingy to eavesdrop protocol interactions.
3756
*
@@ -54,9 +73,24 @@ public void testSimpleQuery(TestContext ctx) {
5473
Async async = ctx.async();
5574
connector.accept(ctx.asyncAssertSuccess(conn -> {
5675
conn
57-
.query("COPY world TO STDOUT (FORMAT csv)")
76+
.query("select 1")
77+
.execute()
78+
.onComplete(ctx.asyncAssertSuccess(result1 -> {
79+
ctx.assertEquals(1, result1.size());
80+
async.complete();
81+
}));
82+
}));
83+
}
84+
85+
@Test
86+
public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) {
87+
Async async = ctx.async();
88+
connector.accept(ctx.asyncAssertSuccess(conn -> {
89+
conn
90+
.query("copy fortune to stdout (format csv)")
5891
.execute()
5992
.onComplete(ctx.asyncAssertSuccess(result1 -> {
93+
// nothing comes out
6094
ctx.assertEquals(0, result1.size());
6195
async.complete();
6296
}));

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResu
3838
private Throwable failure;
3939
private boolean suspended;
4040

41-
QueryResultBuilder(Function<T, R> factory, PromiseInternal<L> handler) {
41+
// TODO: public-ing this shouldn't be needed
42+
public QueryResultBuilder(Function<T, R> factory, PromiseInternal<L> handler) {
4243
this.factory = factory;
4344
this.handler = handler;
4445
}

0 commit comments

Comments
 (0)