Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 1.2.3
* Tweaking schema validation to allow for UINT

# 1.2.2
* Adding a new property `tolerateStateMismatch` to allow for the connector to continue processing even if the state stored in ClickHouse does not match the current offset in Kafka

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,18 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

if (colTypeName.equalsIgnoreCase("UINT8") && dataTypeName.equals("INT8"))
continue;

if (colTypeName.equalsIgnoreCase("UINT16") && dataTypeName.equals("INT16"))
continue;

if (colTypeName.equalsIgnoreCase("UINT32") && dataTypeName.equals("INT32"))
continue;

if (colTypeName.equalsIgnoreCase("UINT64") && dataTypeName.equals("INT64"))
continue;

if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
continue;

Expand Down Expand Up @@ -686,7 +698,7 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
String database = first.getDatabase();

if (!validateDataSchema(table, first, false))
throw new RuntimeException();
throw new RuntimeException("Data schema validation failed.");
// Let's test first record
// Do we have all elements from the table inside the record

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,4 +654,27 @@ public void schemaWithNestedTupleMapArrayAndVariant() {
);
}
}

@Test
public void unsignedIntegers() {
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);

String topic = createTopicName("unsigned-integers-table-test");
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (" +
"`off16` Int16," +
"`uint8` UInt8," +
"`uint16` UInt16," +
"`uint32` UInt32," +
"`uint64` UInt64" +
") Engine = MergeTree ORDER BY `off16`");
Collection<SinkRecord> sr = SchemaTestData.createUnsignedIntegers(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;

public class SchemaTestData {
Expand Down Expand Up @@ -1075,4 +1076,41 @@ public static Collection<SinkRecord> createNestedType(String topic, int partitio
return result;
}

public static Collection<SinkRecord> createUnsignedIntegers(String topic, int partition) {
return createUnsignedIntegers(topic, partition, DEFAULT_TOTAL_RECORDS);
}
public static Collection<SinkRecord> createUnsignedIntegers(String topic, int partition, int totalRecords) {
Schema NESTED_SCHEMA = SchemaBuilder.struct()
.field("off16", Schema.INT16_SCHEMA)
.field("uint8", Schema.OPTIONAL_INT8_SCHEMA)
.field("uint16", Schema.OPTIONAL_INT16_SCHEMA)
.field("uint32", Schema.OPTIONAL_INT32_SCHEMA)
.field("uint64", Schema.OPTIONAL_INT64_SCHEMA)
.build();

List<SinkRecord> array = new ArrayList<>();
LongStream.range(0, totalRecords).forEachOrdered(n -> {
Struct value_struct = new Struct(NESTED_SCHEMA)
.put("off16", (short) n)
.put("uint8", (byte) ThreadLocalRandom.current().nextInt(0, 127))
.put("uint16", (short) ThreadLocalRandom.current().nextInt(0, 32767))
.put("uint32", ThreadLocalRandom.current().nextInt(0, 2147483647))
.put("uint64", ThreadLocalRandom.current().nextLong(0, 2147483647));

SinkRecord sr = new SinkRecord(
topic,
partition,
null,
null, NESTED_SCHEMA,
value_struct,
n,
System.currentTimeMillis(),
TimestampType.CREATE_TIME
);

array.add(sr);
});

return array;
}
}
Loading