Skip to content

Commit 0b46903

Browse files
committed
WIP: Stuck with copyToBytes
1 parent 410e042 commit 0b46903

File tree

7 files changed

+57
-24
lines changed

7 files changed

+57
-24
lines changed

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ public void importDataToDb(Vertx vertx, PgConnection client) {
749749
public void exportDataFromDb(Vertx vertx, PgConnection client) {
750750
String path = "path/to/file";
751751
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
752-
.flatMap(buffer -> vertx.fileSystem().writeFile("path/to/file.csv", buffer))
752+
.flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value()))
753753
.onSuccess(res -> System.out.println("Data exported to " + path));
754754
}
755755

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.vertx.sqlclient.Row;
3030
import io.vertx.sqlclient.RowSet;
3131
import io.vertx.sqlclient.SqlConnection;
32+
import io.vertx.sqlclient.SqlResult;
3233

3334
import java.util.List;
3435

@@ -130,7 +131,7 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
130131
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
131132
* @return async result of bytes container data will be written to
132133
*/
133-
Future<Buffer> copyToBytes(String sql);
134+
Future<SqlResult<Buffer>> copyToBytes(String sql);
134135

135136
/**
136137
* Send a request cancellation message to tell the server to cancel processing request in this connection.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import io.vertx.core.Handler;
2121
import io.vertx.core.Promise;
2222
import io.vertx.core.buffer.Buffer;
23+
import io.vertx.core.buffer.impl.BufferImpl;
2324
import io.vertx.core.impl.ContextInternal;
2425
import io.vertx.core.impl.future.PromiseInternal;
2526
import io.vertx.pgclient.PgConnectOptions;
2627
import io.vertx.pgclient.PgConnection;
2728
import io.vertx.pgclient.PgNotice;
2829
import io.vertx.pgclient.PgNotification;
30+
import io.vertx.pgclient.impl.codec.CopyOutCommand;
2931
import io.vertx.pgclient.impl.codec.NoticeResponse;
3032
import io.vertx.pgclient.impl.codec.TxFailedEvent;
3133
import io.vertx.pgclient.spi.PgDriver;
@@ -35,13 +37,17 @@
3537
import io.vertx.sqlclient.SqlResult;
3638
import io.vertx.sqlclient.impl.Connection;
3739
import io.vertx.sqlclient.impl.Notification;
40+
import io.vertx.sqlclient.impl.QueryExecutor;
3841
import io.vertx.sqlclient.impl.QueryResultBuilder;
42+
import io.vertx.sqlclient.impl.QueryResultHandler;
3943
import io.vertx.sqlclient.impl.SocketConnectionBase;
4044
import io.vertx.sqlclient.impl.SqlConnectionBase;
4145
import io.vertx.sqlclient.impl.SqlResultImpl;
42-
import io.vertx.sqlclient.impl.command.CommandBase;
46+
import io.vertx.sqlclient.impl.command.QueryCommandBase;
47+
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
4348

4449
import java.util.function.Function;
50+
import java.util.stream.Collector;
4551

4652
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
4753

@@ -129,14 +135,29 @@ public Query<RowSet<Row>> copyToRows(String sql) {
129135
}
130136

131137
@Override
132-
public Future<Buffer> copyToBytes(String sql) {
133-
Function<Buffer, SqlResultImpl<Buffer>> factory = null;
134-
PromiseInternal<SqlResult<Buffer>> promise = null;
138+
public Future<SqlResult<Buffer>> copyToBytes(String sql) {
139+
Function<Buffer, SqlResultImpl<Buffer>> factory = SqlResultImpl::new;
140+
PromiseInternal<SqlResult<Buffer>> promise = context.promise();
135141

136142
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
137143
new QueryResultBuilder<>(factory, promise);
138-
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
139-
return this.schedule(context, cmd);
144+
145+
Collector<Row, Buffer, Buffer> collector = Collector.of(
146+
BufferImpl::new,
147+
(v, row) -> {
148+
System.out.println(row);
149+
},
150+
(v1, v2) -> null,
151+
Function.identity()
152+
);
153+
154+
SimpleQueryCommand<Buffer> cmd = new SimpleQueryCommand<>(
155+
sql, true, false, collector, resultHandler);
156+
// this.schedule(promise.context(), cmd);
157+
158+
QueryExecutor executor = new QueryExecutor(factory, collector);
159+
executor.executeSimpleQuery(this, sql, false, false, promise);
160+
return promise.future();
140161
}
141162

142163
@Override
@@ -159,13 +180,4 @@ public Future<Void> cancelRequest() {
159180
return promise.future();
160181
}
161182

162-
private class CopyOutCommand extends CommandBase<Buffer> {
163-
private final String sql;
164-
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
165-
166-
CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
167-
this.sql = sql;
168-
this.resultHandler = resultHandler;
169-
}
170-
}
171183
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
import io.vertx.sqlclient.SqlResult;
5+
import io.vertx.sqlclient.impl.QueryResultBuilder;
6+
import io.vertx.sqlclient.impl.SqlResultImpl;
7+
import io.vertx.sqlclient.impl.command.CommandBase;
8+
9+
public class CopyOutCommand extends CommandBase<Buffer> {
10+
private final String sql;
11+
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
12+
13+
public CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
14+
this.sql = sql;
15+
this.resultHandler = resultHandler;
16+
}
17+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
public class CopyOutCommandCodec {
4+
}

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,15 @@ public void testCopyToRows(TestContext ctx) {
3434
}
3535

3636
@Test
37-
@Ignore("For now it just hangs forever")
3837
public void testCopyToCsvBytes(TestContext ctx) {
3938
Async async = ctx.async();
4039
connector.accept(ctx.asyncAssertSuccess(conn -> {
4140
deleteFromTestTable(ctx, conn, () -> {
42-
insertIntoTestTable(ctx, conn, 10, () -> {
41+
insertIntoTestTable(ctx, conn, 1, () -> {
4342
PgConnection pgConn = (PgConnection) conn;
44-
pgConn.copyToBytes("COPY my_table TO STDOUT (FORMAT csv HEADER)")
45-
.onComplete(ctx.asyncAssertSuccess(buffer -> {
46-
buffer.getBytes();
43+
pgConn.copyToBytes("COPY Test TO STDOUT (FORMAT csv)")
44+
.onComplete(ctx.asyncAssertSuccess(result -> {
45+
result.value().getBytes();
4746
async.complete();
4847
}));
4948
});

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/**
3535
* Executes query.
3636
*/
37-
class QueryExecutor<T, R extends SqlResultBase<T>, L extends SqlResult<T>> {
37+
public class QueryExecutor<T, R extends SqlResultBase<T>, L extends SqlResult<T>> {
3838

3939
private final Function<T, R> factory;
4040
private final Collector<Row, ?, T> collector;
@@ -49,7 +49,7 @@ private QueryResultBuilder<T, R, L> createHandler(PromiseInternal<L> promise) {
4949
return new QueryResultBuilder<>(factory, promise);
5050
}
5151

52-
void executeSimpleQuery(CommandScheduler scheduler,
52+
public void executeSimpleQuery(CommandScheduler scheduler,
5353
String sql,
5454
boolean autoCommit,
5555
boolean singleton,

0 commit comments

Comments
 (0)