Skip to content

Commit ccdf631

Browse files
committed
WIP: Stuck with copyToBytes
1 parent 08ab457 commit ccdf631

File tree

7 files changed

+57
-24
lines changed

7 files changed

+57
-24
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
749749
public void exportDataFromDb(Vertx vertx, PgConnection client) {
750750
String path = "path/to/file";
751751
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
752-
.flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
752+
.flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value()))
753753
.onSuccess(res -> System.out.println("Data exported to " + path));
754754
}
755755

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.vertx.sqlclient.Row;
3131
import io.vertx.sqlclient.RowSet;
3232
import io.vertx.sqlclient.SqlConnection;
33+
import io.vertx.sqlclient.SqlResult;
3334

3435
import java.util.List;
3536

@@ -131,7 +132,7 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
131132
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
132133
* @return async result of bytes container data will be written to
133134
*/
134-
Future<Buffer> copyToBytes(String sql);
135+
Future<SqlResult<Buffer>> copyToBytes(String sql);
135136

136137
/**
137138
* Send a request cancellation message to tell the server to cancel processing request in this connection.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
import io.vertx.core.*;
2020
import io.vertx.core.buffer.Buffer;
21+
import io.vertx.core.buffer.impl.BufferImpl;
2122
import io.vertx.core.impl.ContextInternal;
2223
import io.vertx.core.impl.future.PromiseInternal;
2324
import io.vertx.pgclient.PgConnectOptions;
2425
import io.vertx.pgclient.PgConnection;
2526
import io.vertx.pgclient.PgNotice;
2627
import io.vertx.pgclient.PgNotification;
28+
import io.vertx.pgclient.impl.codec.CopyOutCommand;
2729
import io.vertx.pgclient.impl.codec.NoticeResponse;
2830
import io.vertx.pgclient.impl.codec.TxFailedEvent;
2931
import io.vertx.pgclient.spi.PgDriver;
@@ -33,13 +35,17 @@
3335
import io.vertx.sqlclient.SqlResult;
3436
import io.vertx.sqlclient.impl.Connection;
3537
import io.vertx.sqlclient.impl.Notification;
38+
import io.vertx.sqlclient.impl.QueryExecutor;
3639
import io.vertx.sqlclient.impl.QueryResultBuilder;
40+
import io.vertx.sqlclient.impl.QueryResultHandler;
3741
import io.vertx.sqlclient.impl.SocketConnectionBase;
3842
import io.vertx.sqlclient.impl.SqlConnectionBase;
3943
import io.vertx.sqlclient.impl.SqlResultImpl;
40-
import io.vertx.sqlclient.impl.command.CommandBase;
44+
import io.vertx.sqlclient.impl.command.QueryCommandBase;
45+
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
4146

4247
import java.util.function.Function;
48+
import java.util.stream.Collector;
4349

4450
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
4551

@@ -127,14 +133,29 @@ public Query<RowSet<Row>> copyToRows(String sql) {
127133
}
128134

129135
@Override
130-
public Future<Buffer> copyToBytes(String sql) {
131-
Function<Buffer, SqlResultImpl<Buffer>> factory = null;
132-
PromiseInternal<SqlResult<Buffer>> promise = null;
136+
public Future<SqlResult<Buffer>> copyToBytes(String sql) {
137+
Function<Buffer, SqlResultImpl<Buffer>> factory = SqlResultImpl::new;
138+
PromiseInternal<SqlResult<Buffer>> promise = context.promise();
133139

134140
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
135141
new QueryResultBuilder<>(factory, promise);
136-
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
137-
return this.schedule(context, cmd);
142+
143+
Collector<Row, Buffer, Buffer> collector = Collector.of(
144+
BufferImpl::new,
145+
(v, row) -> {
146+
System.out.println(row);
147+
},
148+
(v1, v2) -> null,
149+
Function.identity()
150+
);
151+
152+
SimpleQueryCommand<Buffer> cmd = new SimpleQueryCommand<>(
153+
sql, true, false, collector, resultHandler);
154+
// this.schedule(promise.context(), cmd);
155+
156+
QueryExecutor executor = new QueryExecutor(factory, collector);
157+
executor.executeSimpleQuery(this, sql, false, false, promise);
158+
return promise.future();
138159
}
139160

140161
@Override
@@ -164,13 +185,4 @@ public PgConnection cancelRequest(Handler<AsyncResult<Void>> handler) {
164185
return this;
165186
}
166187

167-
private class CopyOutCommand extends CommandBase<Buffer> {
168-
private final String sql;
169-
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
170-
171-
CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
172-
this.sql = sql;
173-
this.resultHandler = resultHandler;
174-
}
175-
}
176188
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
import io.vertx.sqlclient.SqlResult;
5+
import io.vertx.sqlclient.impl.QueryResultBuilder;
6+
import io.vertx.sqlclient.impl.SqlResultImpl;
7+
import io.vertx.sqlclient.impl.command.CommandBase;
8+
9+
public class CopyOutCommand extends CommandBase<Buffer> {
10+
private final String sql;
11+
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
12+
13+
public CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
14+
this.sql = sql;
15+
this.resultHandler = resultHandler;
16+
}
17+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
public class CopyOutCommandCodec {
4+
}

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,15 @@ public void testCopyToRows(TestContext ctx) {
3434
}
3535

3636
@Test
37-
@Ignore("For now it just hangs forever")
3837
public void testCopyToCsvBytes(TestContext ctx) {
3938
Async async = ctx.async();
4039
connector.accept(ctx.asyncAssertSuccess(conn -> {
4140
deleteFromTestTable(ctx, conn, () -> {
42-
insertIntoTestTable(ctx, conn, 10, () -> {
41+
insertIntoTestTable(ctx, conn, 1, () -> {
4342
PgConnection pgConn = (PgConnection) conn;
44-
pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
45-
.onComplete(ctx.asyncAssertSuccess(buffer -> {
46-
buffer.getBytes();
43+
pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)")
44+
.onComplete(ctx.asyncAssertSuccess(result -> {
45+
result.value().getBytes();
4746
async.complete();
4847
}));
4948
});

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/**
3535
* Executes query.
3636
*/
37-
class QueryExecutor<T, R extends SqlResultBase<T>, L extends SqlResult<T>> {
37+
public class QueryExecutor<T, R extends SqlResultBase<T>, L extends SqlResult<T>> {
3838

3939
private final Function<T, R> factory;
4040
private final Collector<Row, ?, T> collector;
@@ -49,7 +49,7 @@ private QueryResultBuilder<T, R, L> createHandler(PromiseInternal<L> promise) {
4949
return new QueryResultBuilder<>(factory, promise);
5050
}
5151

52-
void executeSimpleQuery(CommandScheduler scheduler,
52+
public void executeSimpleQuery(CommandScheduler scheduler,
5353
String sql,
5454
boolean autoCommit,
5555
boolean singleton,

0 commit comments

Comments
 (0)