Skip to content

Commit 0410505

Browse files
committed
Fix for storing Buffer and ByteBuf into binary columns
1 parent c735c41 commit 0410505

File tree

12 files changed

+134
-40
lines changed

12 files changed

+134
-40
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2RowImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public UUID[] getUUIDArray(String name) {
299299
throw new UnsupportedOperationException();
300300
}
301301

302-
public Numeric getNumeric(int pos) {
302+
private Numeric getNumeric(int pos) {
303303
Object val = getValue(pos);
304304
if (val instanceof Numeric) {
305305
return (Numeric) val;

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18+
import io.vertx.core.buffer.Buffer;
1819
import io.vertx.db2client.impl.drda.ClientTypes;
1920
import io.vertx.db2client.impl.drda.ColumnMetaData;
2021
import io.vertx.sqlclient.data.Numeric;
@@ -64,6 +65,9 @@ private static boolean canConvert(Object val, int type) {
6465
case ClientTypes.REAL:
6566
case ClientTypes.SMALLINT:
6667
return clazz == Numeric.class;
68+
case ClientTypes.VARCHAR:
69+
case ClientTypes.VARBINARY:
70+
return Buffer.class.isAssignableFrom(clazz);
6771
}
6872
return false;
6973
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryCommandBaseCodec.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.stream.Collector;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import io.vertx.core.buffer.Buffer;
2223
import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance;
2324
import io.vertx.db2client.impl.drda.DRDAQueryRequest;
2425
import io.vertx.db2client.impl.drda.DRDAQueryResponse;
@@ -106,6 +107,8 @@ private static Object[] sanitize(Tuple params) {
106107
Object val = params.getValue(i);
107108
if (val instanceof Numeric)
108109
val = ((Numeric) val).bigDecimalValue();
110+
if (val instanceof Buffer)
111+
val = ((Buffer) val).getByteBuf();
109112
inputs[i] = val;
110113
}
111114
return inputs;

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/RowResultDecoder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,15 @@ public boolean next() {
5656
protected Row decodeRow(int len, ByteBuf in) {
5757
Row row = new DB2RowImpl(rowDesc);
5858
for (int i = 1; i < rowDesc.columnDefinitions().columns_ + 1; i++) {
59+
int startingIdx = cursor.dataBuffer_.readerIndex();
5960
Object o = cursor.getObject(i);
61+
int endingIdx = cursor.dataBuffer_.readerIndex();
62+
// TODO: Remove this once all getObject paths are implemented safely
63+
// or add unit tests for this in the DRDA project
64+
if (startingIdx != endingIdx) {
65+
System.out.println("WARN: Reader index changed while getting data. Changed from " +
66+
startingIdx + " to " + endingIdx + " while obtaining object " + o);
67+
}
6068
if (o instanceof BigDecimal) {
6169
o = Numeric.create((BigDecimal) o);
6270
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/drda/ClientTypes.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.sql.RowId;
2020
import java.sql.Types;
2121

22+
import io.netty.buffer.ByteBuf;
23+
2224
// This enumeration of types represents the typing scheme used by our jdbc driver.
2325
// Once this is finished, we need to review our switches to make sure they are exhaustive
2426
/**
@@ -276,9 +278,10 @@ public static boolean canConvert(Object value, int toType) {
276278
return clazz == java.time.LocalDateTime.class ||
277279
clazz == java.sql.Timestamp.class;
278280
case ClientTypes.VARBINARY:
279-
return clazz == byte[].class;
280281
case ClientTypes.VARCHAR:
281-
return clazz == char[].class;
282+
return clazz == char[].class ||
283+
clazz == byte[].class ||
284+
ByteBuf.class.isAssignableFrom(clazz);
282285
case ClientTypes.ROWID:
283286
return clazz == RowId.class ||
284287
clazz == DB2RowId.class;

vertx-db2-client/src/main/java/io/vertx/db2client/impl/drda/Cursor.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashMap;
3838

3939
import io.netty.buffer.ByteBuf;
40+
import io.netty.buffer.ByteBufUtil;
4041

4142
public class Cursor {
4243

@@ -65,6 +66,7 @@ public class Cursor {
6566
//-----------------------------internal state---------------------------------
6667

6768
//-------------Structures for holding and scrolling the data -----------------
69+
// TODO: Encapsulate this field
6870
public ByteBuf dataBuffer_;
6971
// public byte[] dataBuffer_;
7072
// public ByteArrayOutputStream dataBufferStream_ = new ByteArrayOutputStream();
@@ -452,15 +454,17 @@ private String get_VARCHAR(int column) {
452454
throw new IllegalStateException("SQLState.CHARACTER_CONVERTER_NOT_AVAILABLE");
453455
}
454456

455-
dataBuffer_.readerIndex(columnDataPosition_[column - 1] + 2);
456-
String tempString = dataBuffer_.readCharSequence(columnDataComputedLength_[column - 1] - 2, charset_[column - 1]).toString();
457+
int dataLength = columnDataComputedLength_[column - 1] - 2;
458+
if (maxFieldSize_ != 0 && maxFieldSize_ < dataLength)
459+
dataLength = maxFieldSize_;
460+
return dataBuffer_.getCharSequence(columnDataPosition_[column - 1] + 2,
461+
dataLength, charset_[column - 1]).toString();
457462
// String tempString = new String(dataBuffer_,
458463
// columnDataPosition_[column - 1] + 2,
459464
// columnDataComputedLength_[column - 1] - 2,
460465
// charset_[column - 1]);
461-
return (maxFieldSize_ == 0) ? tempString :
462-
tempString.substring(0, Math.min(maxFieldSize_,
463-
tempString.length()));
466+
// return (maxFieldSize_ == 0) ? tempString :
467+
// tempString.substring(0, Math.min(maxFieldSize_, tempString.length()));
464468
}
465469

466470
// Build a Java String from a database CHAR field.
@@ -478,22 +482,25 @@ private String get_CHAR(int column) {
478482
throw new IllegalStateException("SQLState.CHARACTER_CONVERTER_NOT_AVAILABLE");
479483
}
480484

481-
dataBuffer_.readerIndex(columnDataPosition_[column - 1]);
482-
String tempString = dataBuffer_.readCharSequence(columnDataComputedLength_[column - 1], charset_[column - 1]).toString();
485+
int dataLength = columnDataComputedLength_[column - 1];
486+
if (maxFieldSize_ != 0 && maxFieldSize_ < dataLength)
487+
dataLength = maxFieldSize_;
488+
return dataBuffer_.getCharSequence(columnDataPosition_[column - 1],
489+
dataLength, charset_[column - 1]).toString();
483490
// String tempString = new String(dataBuffer_,
484491
// columnDataPosition_[column - 1],
485492
// columnDataComputedLength_[column - 1],
486493
// charset_[column - 1]);
487-
return (maxFieldSize_ == 0) ? tempString :
488-
tempString.substring(0, Math.min(maxFieldSize_,
489-
tempString.length()));
494+
// return (maxFieldSize_ == 0) ? tempString :
495+
// tempString.substring(0, Math.min(maxFieldSize_,
496+
// tempString.length()));
490497
}
491498

492499
// Build a JDBC Date object from the ISO DATE field.
493500
private LocalDate get_DATE(int column) {
494-
dataBuffer_.readerIndex(columnDataPosition_[column - 1]);
495501
// DATE column is always 10 chars long
496-
String dateString = dataBuffer_.readCharSequence(10, charset_[column - 1]).toString();
502+
String dateString = dataBuffer_.getCharSequence(columnDataPosition_[column - 1],
503+
10, charset_[column - 1]).toString();
497504
return LocalDate.parse(dateString);
498505
// return DateTime.dateBytesToDate(dataBuffer_,
499506
// columnDataPosition_[column - 1],
@@ -503,9 +510,9 @@ private LocalDate get_DATE(int column) {
503510

504511
// Build a JDBC Time object from the ISO TIME field.
505512
private LocalTime get_TIME(int column) {
506-
dataBuffer_.readerIndex(columnDataPosition_[column - 1]);
507513
// Time column is always 8 chars long
508-
String timeString = dataBuffer_.readCharSequence(8, charset_[column - 1]).toString();
514+
String timeString = dataBuffer_.getCharSequence(columnDataPosition_[column - 1],
515+
8, charset_[column - 1]).toString();
509516
return LocalTime.parse(timeString, db2TimeFormat);
510517
// return DateTime.timeBytesToTime(dataBuffer_,
511518
// columnDataPosition_[column - 1],
@@ -583,7 +590,8 @@ private byte[] get_CHAR_FOR_BIT_DATA(int column) {
583590
Math.min(maxFieldSize_, columnDataComputedLength_[column - 1]);
584591

585592
byte[] bytes = new byte[columnLength];
586-
System.arraycopy(dataBuffer_, columnDataPosition_[column - 1], bytes, 0, columnLength);
593+
//System.arraycopy(dataBuffer_, columnDataPosition_[column - 1], bytes, 0, columnLength);
594+
dataBuffer_.getBytes(columnDataPosition_[column - 1], bytes);
587595
return bytes;
588596
}
589597

@@ -597,7 +605,8 @@ private byte[] get_VARCHAR_FOR_BIT_DATA(int column) {
597605
(maxFieldSize_ == 0) ? columnDataComputedLength_[column - 1] - 2 :
598606
Math.min(maxFieldSize_, columnDataComputedLength_[column - 1] - 2);
599607
bytes = new byte[columnLength];
600-
System.arraycopy(dataBuffer_, columnDataPosition_[column - 1] + 2, bytes, 0, bytes.length);
608+
// System.arraycopy(dataBuffer_, columnDataPosition_[column - 1] + 2, bytes, 0, bytes.length);
609+
dataBuffer_.getBytes(columnDataPosition_[column - 1] + 2, bytes);
601610
return bytes;
602611
}
603612

@@ -609,7 +618,8 @@ private Object get_UDT(int column) {
609618
(maxFieldSize_ == 0) ? columnDataComputedLength_[column - 1] - 2 :
610619
Math.min(maxFieldSize_, columnDataComputedLength_[column - 1] - 2);
611620
bytes = new byte[columnLength];
612-
System.arraycopy(dataBuffer_, columnDataPosition_[column - 1] + 2, bytes, 0, bytes.length);
621+
//System.arraycopy(dataBuffer_, columnDataPosition_[column - 1] + 2, bytes, 0, bytes.length);
622+
dataBuffer_.getBytes(columnDataPosition_[column - 1] + 2, bytes);
613623

614624
try {
615625
ByteArrayInputStream bais = new ByteArrayInputStream( bytes );
@@ -1738,11 +1748,8 @@ private boolean isNonTrivialDataLob(int index) {
17381748
byte[] lengthBytes = new byte[columnDataComputedLength_[index]];
17391749
byte[] longBytes = new byte[8];
17401750

1741-
System.arraycopy(dataBuffer_,
1742-
position,
1743-
lengthBytes,
1744-
0,
1745-
columnDataComputedLength_[index]);
1751+
// System.arraycopy(dataBuffer_, position, lengthBytes, 0, columnDataComputedLength_[index]);
1752+
dataBuffer_.getBytes(position, lengthBytes, 0, columnDataComputedLength_[index]);
17461753

17471754
// right-justify for BIG ENDIAN
17481755
int j = 0;

vertx-db2-client/src/main/java/io/vertx/db2client/impl/drda/DRDAQueryRequest.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,13 @@ private void buildFDODTA(int[][] protocolTypesAndLengths, Object[] inputs) {
767767
case DRDAConstants.DRDA_TYPE_NLONGVARBYTE:
768768
o = retrievePromotedParameterIfExists(i);
769769
if (o == null) {
770-
writeLDBytes((byte[]) inputs[i]);
770+
if (inputs[i] instanceof byte[]) {
771+
writeLDBytes((byte[]) inputs[i]);
772+
} else if (inputs[i] instanceof ByteBuf) {
773+
writeLDBytes((ByteBuf) inputs[i]);
774+
} else {
775+
throw new UnsupportedOperationException("Cannot write " + inputs[i].getClass() + " as VARBYTE/LONGVARBYTE");
776+
}
771777
} else { // use the promoted object instead
772778
throw new UnsupportedOperationException("CLOB");
773779
// setFDODTALob(netAgent_.netConnection_.getSecurityMechanism(), (ClientClob) o,
@@ -1193,11 +1199,17 @@ private Hashtable computeProtocolTypesAndLengths(
11931199
break;
11941200
case Types.BINARY:
11951201
case Types.VARBINARY:
1196-
byte[] ba = (byte[]) inputRow[i];
1197-
if (ba == null) {
1202+
// byte[] ba = (byte[]) inputRow[i];
1203+
int length = -1;
1204+
if (inputRow[i] instanceof byte[]) {
1205+
length = ((byte[]) inputRow[i]).length;
1206+
} else if (inputRow[i] instanceof ByteBuf) {
1207+
length = ((ByteBuf) inputRow[i]).readableBytes();
1208+
}
1209+
if (inputRow[i] == null) {
11981210
lidAndLengths[i][0] = DRDAConstants.DRDA_TYPE_NVARBYTE;
11991211
lidAndLengths[i][1] = 32767;
1200-
} else if (ba.length <= 32767) {
1212+
} else if (length <= 32767) {
12011213
lidAndLengths[i][0] = DRDAConstants.DRDA_TYPE_NVARBYTE;
12021214
lidAndLengths[i][1] = 32767;
12031215
} else {
@@ -1215,11 +1227,17 @@ private Hashtable computeProtocolTypesAndLengths(
12151227
}
12161228
break;
12171229
case Types.LONGVARBINARY:
1218-
ba = (byte[]) inputRow[i];
1219-
if (ba == null) {
1230+
// ba = (byte[]) inputRow[i];
1231+
length = -1;
1232+
if (inputRow[i] instanceof byte[]) {
1233+
length = ((byte[]) inputRow[i]).length;
1234+
} else if (inputRow[i] instanceof ByteBuf) {
1235+
length = ((ByteBuf) inputRow[i]).readableBytes();
1236+
}
1237+
if (inputRow[i] == null) {
12201238
lidAndLengths[i][0] = DRDAConstants.DRDA_TYPE_NLONGVARBYTE;
12211239
lidAndLengths[i][1] = 32767;
1222-
} else if (ba.length <= 32767) {
1240+
} else if (length <= 32767) {
12231241
lidAndLengths[i][0] = DRDAConstants.DRDA_TYPE_NLONGVARBYTE;
12241242
lidAndLengths[i][1] = 32767;
12251243
} else {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/drda/DRDAQueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1824,7 +1824,7 @@ final ByteBuf getData(/*ByteArrayOutputStream existingBuffer*/) {
18241824
// read the segment
18251825
ensureALayerDataInBuffer(copySize);
18261826
adjustLengths(copySize);
1827-
baos = buffer.readRetainedSlice(copySize);
1827+
baos = buffer.readRetainedSlice(copySize).asReadOnly();
18281828
// baos.write(buffer_, pos_, copySize);
18291829
// pos_ += copySize;
18301830

vertx-db2-client/src/main/java/io/vertx/db2client/impl/drda/DRDARequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,11 @@ final void writeLDBytes(byte[] bytes) {
537537
// writeLDBytesX(bytes.length, bytes);
538538
}
539539

540+
final void writeLDBytes(ByteBuf bytes) {
541+
buffer.writeShort(bytes.readableBytes());
542+
buffer.writeBytes(bytes, bytes.readerIndex(), bytes.readableBytes());
543+
}
544+
540545
// insert a 4 byte length/codepoint pair and a 1 byte unsigned value into the buffer.
541546
// total of 5 bytes inserted in buffer.
542547
final void writeScalar1Byte(int codePoint, int value) {

vertx-db2-client/src/test/java/io/vertx/db2client/DB2DataTypeTest.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import static org.junit.Assume.assumeTrue;
1919

2020
import java.sql.RowId;
21+
import java.util.Arrays;
2122

2223
import org.junit.Test;
2324
import org.junit.runner.RunWith;
2425

26+
import io.vertx.core.buffer.Buffer;
2527
import io.vertx.ext.unit.TestContext;
2628
import io.vertx.ext.unit.junit.VertxUnitRunner;
2729
import io.vertx.sqlclient.Row;
@@ -39,10 +41,9 @@ public class DB2DataTypeTest extends DB2TestBase {
3941
@Test
4042
public void testFloatIntoFloatColumn(TestContext ctx) {
4143
connect(ctx.asyncAssertSuccess(conn -> {
42-
// Insert some data
4344
conn.preparedQuery("INSERT INTO db2_types (id,test_float) VALUES (?, ?)")
4445
.execute(Tuple.of(1, 5.0f), ctx.asyncAssertSuccess(insertResult -> {
45-
conn.query("SELECT id,test_float FROM db2_types WHERE id = 1")
46+
conn.preparedQuery("SELECT id,test_float FROM db2_types WHERE id = 1")
4647
.execute(ctx.asyncAssertSuccess(rows -> {
4748
ctx.assertEquals(1, rows.size());
4849
Row row = rows.iterator().next();
@@ -61,10 +62,9 @@ public void testFloatIntoFloatColumn(TestContext ctx) {
6162
@Test
6263
public void testByteIntoSmallIntColumn(TestContext ctx) {
6364
connect(ctx.asyncAssertSuccess(conn -> {
64-
// Insert some data
6565
conn.preparedQuery("INSERT INTO db2_types (id,test_byte) VALUES (?, ?)")
6666
.execute(Tuple.of(2, (byte) 0xCA), ctx.asyncAssertSuccess(insertResult -> {
67-
conn.query("SELECT id,test_byte FROM db2_types WHERE id = 2")
67+
conn.preparedQuery("SELECT id,test_byte FROM db2_types WHERE id = 2")
6868
.execute(ctx.asyncAssertSuccess(rows -> {
6969
ctx.assertEquals(1, rows.size());
7070
Row row = rows.iterator().next();
@@ -75,17 +75,55 @@ public void testByteIntoSmallIntColumn(TestContext ctx) {
7575
}));
7676
}
7777

78+
@Test
79+
public void testByteArrayIntoVarchar(TestContext ctx) {
80+
byte[] expected = "hello world".getBytes();
81+
connect(ctx.asyncAssertSuccess(conn -> {
82+
conn.preparedQuery("INSERT INTO db2_types (id,test_bytes) VALUES (?, ?)")
83+
.execute(Tuple.of(3, "hello world".getBytes()), ctx.asyncAssertSuccess(insertResult -> {
84+
conn.preparedQuery("SELECT id,test_bytes FROM db2_types WHERE id = 3")
85+
.execute(ctx.asyncAssertSuccess(rows -> {
86+
ctx.assertEquals(1, rows.size());
87+
Row row = rows.iterator().next();
88+
ctx.assertEquals(3, row.getInteger(0));
89+
ctx.assertTrue(Arrays.equals(expected, row.getBuffer(1).getBytes()),
90+
"Expecting " + Arrays.toString(expected) + " but got "
91+
+ Arrays.toString(row.getBuffer(1).getBytes()));
92+
}));
93+
}));
94+
}));
95+
}
96+
97+
@Test
98+
public void testByteBufIntoVarchar(TestContext ctx) {
99+
byte[] expected = "hello world".getBytes();
100+
connect(ctx.asyncAssertSuccess(conn -> {
101+
conn.preparedQuery("INSERT INTO db2_types (id,test_bytes) VALUES (?, ?)")
102+
.execute(Tuple.of(4, Buffer.buffer(expected)), ctx.asyncAssertSuccess(insertResult -> {
103+
conn.preparedQuery("SELECT id,test_bytes FROM db2_types WHERE id = 4")
104+
.execute(ctx.asyncAssertSuccess(rows -> {
105+
ctx.assertEquals(1, rows.size());
106+
Row row = rows.iterator().next();
107+
ctx.assertEquals(4, row.getInteger(0));
108+
ctx.assertTrue(Arrays.equals(expected, row.getBuffer(1).getBytes()),
109+
"Expecting " + Arrays.toString(expected) + " but got "
110+
+ Arrays.toString(row.getBuffer(1).getBytes()));
111+
}));
112+
}));
113+
}));
114+
}
115+
78116
@Test
79117
public void testRowId(TestContext ctx) {
80118
assumeTrue("Only DB2/Z supports the ROWID column type", rule.isZOS());
81119

82120
final String msg = "insert data for testRowId";
83121
connect(ctx.asyncAssertSuccess(conn -> {
84122
// Insert some data
85-
conn.query("INSERT INTO ROWTEST (message) VALUES ('" + msg + "')")
123+
conn.preparedQuery("INSERT INTO ROWTEST (message) VALUES ('" + msg + "')")
86124
.execute(ctx.asyncAssertSuccess(insertResult -> {
87125
// Find it by msg
88-
conn.query("SELECT * FROM ROWTEST WHERE message = '" + msg + "'")
126+
conn.preparedQuery("SELECT * FROM ROWTEST WHERE message = '" + msg + "'")
89127
.execute(ctx.asyncAssertSuccess(rows -> {
90128
RowId rowId = verifyRowId(ctx, rows, msg);
91129
// Now find it by rowid

0 commit comments

Comments
 (0)