Skip to content

Commit 0b63471

Browse files
committed
Enable BlobCodec and ClobCodec for binary usage
[resolves #186]
1 parent a33414c commit 0b63471

File tree

3 files changed

+75
-11
lines changed

3 files changed

+75
-11
lines changed

src/main/java/io/r2dbc/postgresql/codec/BlobCodec.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ boolean doCanDecode(PostgresqlObjectId type, Format format) {
5757
Assert.requireNonNull(format, "format must not be null");
5858
Assert.requireNonNull(type, "type must not be null");
5959

60-
return FORMAT_TEXT == format && BYTEA == type;
60+
return BYTEA == type;
6161
}
6262

6363
@Override
6464
Blob doDecode(ByteBuf buffer, PostgresqlObjectId dataType, @Nullable Format format, @Nullable Class<? extends Blob> type) {
6565
Assert.requireNonNull(buffer, "byteBuf must not be null");
6666

67-
return new ByteABlob(buffer);
67+
return new ByteABlob(buffer, format);
6868
}
6969

7070
@Override
@@ -103,29 +103,37 @@ private static final class ByteABlob implements Blob {
103103

104104
private final ByteBuf byteBuf;
105105

106-
private ByteABlob(ByteBuf byteBuf) {
106+
private final Format format;
107+
108+
private ByteABlob(ByteBuf byteBuf, Format format) {
107109
this.byteBuf = byteBuf.retain();
110+
this.format = format;
108111
}
109112

110113
@Override
111114
public Mono<Void> discard() {
112-
return Mono.defer(() -> {
113-
this.byteBuf.release();
114-
return Mono.empty();
115+
return Mono.fromRunnable(() -> {
116+
if (this.byteBuf.refCnt() > 0) {
117+
this.byteBuf.release();
118+
}
115119
});
116120
}
117121

118122
@Override
119123
public Mono<ByteBuffer> stream() {
120-
return Mono.defer(() -> {
124+
return Mono.fromSupplier(() -> {
125+
if (this.format == Format.FORMAT_BINARY) {
126+
return this.byteBuf.nioBuffer();
127+
}
128+
121129
Matcher matcher = BLOB_PATTERN.matcher(ByteBufUtils.decode(this.byteBuf));
122130

123131
if (!matcher.find()) {
124-
return Mono.error(new IllegalStateException("ByteBuf does not contain BYTEA hex format"));
132+
throw new IllegalStateException("ByteBuf does not contain BYTEA hex format");
125133
}
126134

127-
return Mono.just(ByteBuffer.wrap(ByteBufUtil.decodeHexDump(matcher.group(1))));
128-
});
135+
return ByteBuffer.wrap(ByteBufUtil.decodeHexDump(matcher.group(1)));
136+
}).doAfterTerminate(this.byteBuf::release);
129137
}
130138
}
131139

src/main/java/io/r2dbc/postgresql/codec/ClobCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ boolean doCanDecode(PostgresqlObjectId type, Format format) {
5151
Assert.requireNonNull(format, "format must not be null");
5252
Assert.requireNonNull(type, "type must not be null");
5353

54-
return FORMAT_TEXT == format && TEXT == type;
54+
return TEXT == type;
5555
}
5656

5757
@Override

src/test/java/io/r2dbc/postgresql/AbstractCodecIntegrationTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import io.r2dbc.postgresql.api.PostgresqlResult;
2323
import io.r2dbc.postgresql.api.PostgresqlStatement;
2424
import io.r2dbc.postgresql.codec.Json;
25+
import io.r2dbc.spi.Blob;
26+
import io.r2dbc.spi.Clob;
2527
import io.r2dbc.spi.Connection;
2628
import org.assertj.core.data.Offset;
2729
import org.junit.jupiter.api.Test;
30+
import org.reactivestreams.Publisher;
2831
import org.springframework.util.StreamUtils;
32+
import reactor.core.publisher.Flux;
2933
import reactor.core.publisher.Mono;
3034
import reactor.test.StepVerifier;
3135

@@ -52,6 +56,7 @@
5256
import java.util.function.Consumer;
5357
import java.util.function.Function;
5458

59+
import static io.r2dbc.postgresql.util.TestByteBufAllocator.TEST;
5560
import static org.assertj.core.api.Assertions.assertThat;
5661

5762
abstract class AbstractCodecIntegrationTests extends AbstractIntegrationTests {
@@ -92,6 +97,57 @@ void charPrimitive() {
9297
testCodec(Character.class, 'a', "VARCHAR(1)");
9398
}
9499

100+
@Test
101+
void blob() {
102+
byte[] bytes = {1, 2, 3, 4};
103+
testCodec(Blob.class,
104+
new Blob() {
105+
106+
@Override
107+
public Publisher<Void> discard() {
108+
return Mono.empty();
109+
}
110+
111+
@Override
112+
public Publisher<ByteBuffer> stream() {
113+
return Mono.just(ByteBuffer.wrap(bytes));
114+
}
115+
},
116+
(actual, expected) -> Flux.zip(
117+
Flux.from(actual.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes),
118+
Flux.from(expected.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes)
119+
)
120+
.as(StepVerifier::create)
121+
.assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2()))
122+
.verifyComplete()
123+
, "BYTEA");
124+
}
125+
126+
@Test
127+
void clob() {
128+
testCodec(Clob.class,
129+
new Clob() {
130+
131+
@Override
132+
public Publisher<Void> discard() {
133+
return Mono.empty();
134+
}
135+
136+
@Override
137+
public Publisher<CharSequence> stream() {
138+
return Mono.just("test-value");
139+
}
140+
},
141+
(actual, expected) -> Flux.zip(
142+
Flux.from(actual.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString),
143+
Flux.from(expected.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString)
144+
)
145+
.as(StepVerifier::create)
146+
.assertNext(t -> assertThat(t.getT1()).isEqualToIgnoringWhitespace(t.getT2()))
147+
.verifyComplete()
148+
, "TEXT");
149+
}
150+
95151
@Test
96152
void date() {
97153
testCodec(Date.class, new Date(), "TIMESTAMP");

0 commit comments

Comments
 (0)