Skip to content

Commit 200a515

Browse files
committed
WIP - buffer accumulates csv text; Broken resultHandler
1 parent 0b46903 commit 200a515

File tree

7 files changed

+149
-35
lines changed

7 files changed

+149
-35
lines changed

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.vertx.core.Future;
2020
import io.vertx.core.Handler;
2121
import io.vertx.core.Promise;
22+
import io.netty.buffer.ByteBuf;
2223
import io.vertx.core.buffer.Buffer;
2324
import io.vertx.core.buffer.impl.BufferImpl;
2425
import io.vertx.core.impl.ContextInternal;
@@ -142,22 +143,8 @@ public Future<SqlResult<Buffer>> copyToBytes(String sql) {
142143
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
143144
new QueryResultBuilder<>(factory, promise);
144145

145-
Collector<Row, Buffer, Buffer> collector = Collector.of(
146-
BufferImpl::new,
147-
(v, row) -> {
148-
System.out.println(row);
149-
},
150-
(v1, v2) -> null,
151-
Function.identity()
152-
);
153-
154-
SimpleQueryCommand<Buffer> cmd = new SimpleQueryCommand<>(
155-
sql, true, false, collector, resultHandler);
156-
// this.schedule(promise.context(), cmd);
157-
158-
QueryExecutor executor = new QueryExecutor(factory, collector);
159-
executor.executeSimpleQuery(this, sql, false, false, promise);
160-
return promise.future();
146+
CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
147+
return this.schedule(promise.context(), cmd);
161148
}
162149

163150
@Override
Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,43 @@
11
package io.vertx.pgclient.impl.codec;
22

3+
import io.netty.buffer.ByteBuf;
34
import io.vertx.core.buffer.Buffer;
45
import io.vertx.sqlclient.SqlResult;
56
import io.vertx.sqlclient.impl.QueryResultBuilder;
67
import io.vertx.sqlclient.impl.SqlResultImpl;
78
import io.vertx.sqlclient.impl.command.CommandBase;
89

9-
public class CopyOutCommand extends CommandBase<Buffer> {
10+
import java.util.function.Function;
11+
import java.util.stream.Collector;
12+
13+
public class CopyOutCommand extends CommandBase<SqlResult<Buffer>> {
1014
private final String sql;
15+
private final Collector<ByteBuf, Buffer, Buffer> collector;
1116
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;
1217

13-
public CopyOutCommand(String sql, QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler) {
18+
public CopyOutCommand(
19+
String sql,
20+
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler
21+
) {
1422
this.sql = sql;
1523
this.resultHandler = resultHandler;
24+
this.collector = Collector.of(
25+
Buffer::buffer,
26+
(v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
27+
(v1, v2) -> null,
28+
Function.identity()
29+
);
30+
}
31+
32+
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler() {
33+
return resultHandler;
34+
}
35+
36+
String sql() {
37+
return sql;
38+
}
39+
40+
Collector<ByteBuf, Buffer, Buffer> collector() {
41+
return collector;
1642
}
1743
}
Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,42 @@
11
package io.vertx.pgclient.impl.codec;
22

3-
public class CopyOutCommandCodec {
3+
import io.vertx.core.buffer.Buffer;
4+
import io.vertx.core.buffer.impl.BufferImpl;
5+
import io.vertx.sqlclient.SqlResult;
6+
7+
class CopyOutCommandCodec extends PgCommandCodec<SqlResult<Buffer>, CopyOutCommand> {
8+
CopyOutDataDecoder decoder;
9+
10+
CopyOutCommandCodec(CopyOutCommand cmd) {
11+
super(cmd);
12+
decoder = new CopyOutDataDecoder(cmd.collector());
13+
}
14+
15+
@Override
16+
public void handleCommandComplete(int updated) {
17+
this.result = null;
18+
Buffer result;
19+
Throwable failure;
20+
int size;
21+
if (decoder != null) {
22+
failure = decoder.complete();
23+
result = decoder.result();
24+
size = decoder.size();
25+
decoder.reset();
26+
} else {
27+
failure = null;
28+
result = new BufferImpl();
29+
size = 0;
30+
}
31+
cmd.resultHandler().handleResult(updated, size, null, result, failure);
32+
}
33+
34+
@Override
35+
public void handleErrorResponse(ErrorResponse errorResponse) {
36+
failure = errorResponse.toException();
37+
}
38+
39+
void encode(PgEncoder encoder) {
40+
encoder.writeQuery(new Query(cmd.sql()));
41+
}
442
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.vertx.core.buffer.Buffer;
5+
import java.util.function.BiConsumer;
6+
import java.util.stream.Collector;
7+
8+
public class CopyOutDataDecoder {
9+
10+
private final Collector<ByteBuf, Buffer, Buffer> collector;
11+
private BiConsumer<Buffer, ByteBuf> accumulator;
12+
13+
private int size;
14+
private Buffer container;
15+
private Throwable failure;
16+
private Buffer result;
17+
18+
protected CopyOutDataDecoder(Collector<ByteBuf, Buffer, Buffer> collector) {
19+
this.collector = collector;
20+
reset();
21+
}
22+
23+
public int size() {
24+
return size;
25+
}
26+
27+
public void handleChunk(int len, ByteBuf in) {
28+
if (failure != null) {
29+
return;
30+
}
31+
if (accumulator == null) {
32+
try {
33+
accumulator = collector.accumulator();
34+
} catch (Exception e) {
35+
failure = e;
36+
return;
37+
}
38+
}
39+
try {
40+
accumulator.accept(container, in);
41+
} catch (Exception e) {
42+
failure = e;
43+
return;
44+
}
45+
size++;
46+
}
47+
48+
public Buffer result() {
49+
return result;
50+
}
51+
52+
public Throwable complete() {
53+
try {
54+
result = collector.finisher().apply(container);
55+
} catch (Exception e) {
56+
failure = e;
57+
}
58+
return failure;
59+
}
60+
61+
public void reset() {
62+
size = 0;
63+
failure = null;
64+
result = null;
65+
try {
66+
this.container = collector.supplier().get();
67+
} catch (Exception e) {
68+
failure = e;
69+
}
70+
}
71+
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,12 @@ private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
472472

473473
private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}
474474

475-
private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {}
475+
private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
476+
PgCommandCodec<?, ?> codec = inflight.peek();
477+
CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
478+
int len = in.readUnsignedShort();
479+
cmdCodec.decoder.handleChunk(len, in);
480+
}
476481

477482
private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
478483
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ void write(CommandBase<?> cmd) {
119119
return new ClosePortalCommandCodec((CloseCursorCommand) cmd);
120120
} else if (cmd instanceof CloseStatementCommand) {
121121
return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
122+
} else if (cmd instanceof CopyOutCommand) {
123+
return new CopyOutCommandCodec((CopyOutCommand) cmd);
122124
}
123125
throw new AssertionError();
124126
}

vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionCopyTest.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,4 @@ public void testSimpleQuery(TestContext ctx) {
8080
}));
8181
}));
8282
}
83-
84-
@Test
85-
public void testMakeSureCopyOutProtocolIsDefined(TestContext ctx) {
86-
Async async = ctx.async();
87-
connector.accept(ctx.asyncAssertSuccess(conn -> {
88-
conn
89-
.query("copy fortune to stdout (format csv)")
90-
.execute()
91-
.onComplete(ctx.asyncAssertSuccess(result1 -> {
92-
// nothing comes out
93-
ctx.assertEquals(0, result1.size());
94-
async.complete();
95-
}));
96-
}));
97-
}
9883
}

0 commit comments

Comments
 (0)