Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ dependencies {
testImplementation("com.clickhouse:clickhouse-client:${project.extra["clickHouseDriverVersion"]}")
testImplementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
testImplementation("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")

testImplementation("org.slf4j:slf4j-simple:2.0.17")

// // Schema Registry client for testing
testImplementation("io.confluent:kafka-schema-registry-client:${project.extra["kafkaPlatformSchemaRegistry"]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,27 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
case "Enum8":
case "Enum16":
break;//I notice we just break here, rather than actually validate the type
case "STRING": {
if (dataTypeName.equals("BYTES")) {
continue;
} else if (obj.getFieldType().equals(Schema.Type.STRUCT)) {
for (Field field : objSchema.fields()) {
if (!(field.schema().type().equals(Schema.Type.STRING) || field.schema().type().equals(Schema.Type.BYTES))) {
validSchema = false;
break;
}
}
if (!validSchema) {
LOGGER.error(String.format("Cannot write field of union schema '%s' to column [%s] of `String`: only unions of `string` and `bytes` are allowed in this case",
objSchema.schema().fields(), colName));
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message is calling objSchema.schema().fields() but objSchema is already a Schema object, so this should be objSchema.fields() to avoid a potential NullPointerException or incorrect field access.

Suggested change
objSchema.schema().fields(), colName));
objSchema.fields(), colName));

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even if it is primitive type it will have empty field.

}
}
break;
}
default:
if (!colTypeName.equals(dataTypeName)) {
LOGGER.debug("Data schema name: {}", objSchema.name());

if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
continue;

if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

Expand Down Expand Up @@ -632,6 +646,20 @@ protected void doWritePrimitive(Type columnType, Schema.Type dataType, OutputStr
case STRING:
if (Schema.Type.BYTES.equals(dataType)) {
BinaryStreamUtils.writeString(stream, (byte[]) value);
} else if (Schema.Type.STRUCT.equals(dataType)) {
Map<String, Data> map = (Map<String, Data>) value;
for (Data unionData : map.values()) {
if (unionData != null && unionData.getObject() != null) {
if (unionData.getObject() instanceof String) {
BinaryStreamUtils.writeString(stream, ((String) unionData.getObject()).getBytes(StandardCharsets.UTF_8));
} else if (unionData.getObject() instanceof byte[]) {
BinaryStreamUtils.writeString(stream, (byte[]) unionData.getObject());
} else {
throw new DataException("Not implemented conversion from " + unionData.getObject().getClass() + " to String");
}
break;
}
}
} else {
BinaryStreamUtils.writeString(stream, ((String) value).getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(FromVersionConditionExtension.class)
Expand Down Expand Up @@ -1118,8 +1119,9 @@ public void testAvroWithUnion() throws Exception {
Image image2 = Image.newBuilder()
.setName("image2")
.setContent(ByteBuffer.wrap("content2".getBytes()))
.setDescription("description")
.build();
String topic = createTopicName("test");
String topic = createTopicName("test_avro_union_string");

MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
// Register your test schema
Expand All @@ -1144,5 +1146,25 @@ public void testAvroWithUnion() throws Exception {
new SinkRecord(topic, 0, null, null,
image2ConnectData.schema(), image2ConnectData.value(), 1)
);
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic,
"CREATE TABLE `%s` (`name` String, `content` String, `description` Nullable(String)) Engine = MergeTree ORDER BY ()");

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(records);
chst.stop();

List<JSONObject> rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
JSONObject row = rows.get(0);
assertEquals("image1", row.getString("name"));
assertEquals("content", row.getString("content"));
assertTrue(row.isNull("description"));
row = rows.get(1);
assertEquals("image2", row.getString("name"));
assertEquals("description", row.getString("description"));
assertEquals("content2", row.getString("content"));
}
}
1 change: 1 addition & 0 deletions src/testFixtures/avro/test.idl
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ schema Image;
record Image {
string name;
union {string, bytes} content;
union {null, string, bytes} description = null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.clickhouse.kafka.connect.avro.test;

import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
Expand All @@ -13,19 +14,17 @@

@org.apache.avro.specific.AvroGenerated
public class Image extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 2747036010697240683L;


public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Image\",\"namespace\":\"com.clickhouse.kafka.connect.avro.test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"string\",\"bytes\"]}]}");
private static final long serialVersionUID = -4478037929845636151L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Image\",\"namespace\":\"com.clickhouse.kafka.connect.avro.test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"string\",\"bytes\"]},{\"name\":\"description\",\"type\":[\"null\",\"string\",\"bytes\"],\"default\":null}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

private static final SpecificData MODEL$ = new SpecificData();
private static SpecificData MODEL$ = new SpecificData();

private static final BinaryMessageEncoder<Image> ENCODER =
new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
new BinaryMessageEncoder<Image>(MODEL$, SCHEMA$);

private static final BinaryMessageDecoder<Image> DECODER =
new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
new BinaryMessageDecoder<Image>(MODEL$, SCHEMA$);

/**
* Return the BinaryMessageEncoder instance used by this class.
Expand All @@ -49,7 +48,7 @@ public static BinaryMessageDecoder<Image> getDecoder() {
* @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
*/
public static BinaryMessageDecoder<Image> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
return new BinaryMessageDecoder<Image>(MODEL$, SCHEMA$, resolver);
}

/**
Expand All @@ -72,8 +71,9 @@ public static Image fromByteBuffer(
return DECODER.decode(b);
}

private java.lang.CharSequence name;
private java.lang.Object content;
private java.lang.CharSequence name;
private java.lang.Object content;
private java.lang.Object description;

/**
* Default constructor. Note that this does not initialize fields
Expand All @@ -86,36 +86,34 @@ public Image() {}
* All-args constructor.
* @param name The new value for name
* @param content The new value for content
* @param description The new value for description
*/
public Image(java.lang.CharSequence name, java.lang.Object content) {
public Image(java.lang.CharSequence name, java.lang.Object content, java.lang.Object description) {
this.name = name;
this.content = content;
this.description = description;
}

@Override
public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }

@Override
public org.apache.avro.Schema getSchema() { return SCHEMA$; }

// Used by DatumWriter. Applications should not call.
@Override
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return name;
case 1: return content;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
case 2: return description;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}

// Used by DatumReader. Applications should not call.
@Override
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: name = (java.lang.CharSequence)value$; break;
case 1: content = value$; break;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
case 2: description = value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}

Expand Down Expand Up @@ -153,6 +151,23 @@ public void setContent(java.lang.Object value) {
this.content = value;
}

/**
* Gets the value of the 'description' field.
* @return The value of the 'description' field.
*/
public java.lang.Object getDescription() {
return description;
}


/**
* Sets the value of the 'description' field.
* @param value the value to set.
*/
public void setDescription(java.lang.Object value) {
this.description = value;
}

/**
* Creates a new Image RecordBuilder.
* @return A new Image RecordBuilder
Expand Down Expand Up @@ -190,16 +205,16 @@ public static com.clickhouse.kafka.connect.avro.test.Image.Builder newBuilder(co
/**
* RecordBuilder for Image instances.
*/
@org.apache.avro.specific.AvroGenerated
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Image>
implements org.apache.avro.data.RecordBuilder<Image> {

private java.lang.CharSequence name;
private java.lang.Object content;
private java.lang.Object description;

/** Creates a new Builder */
private Builder() {
super(SCHEMA$, MODEL$);
super(SCHEMA$);
}

/**
Expand All @@ -216,14 +231,18 @@ private Builder(com.clickhouse.kafka.connect.avro.test.Image.Builder other) {
this.content = data().deepCopy(fields()[1].schema(), other.content);
fieldSetFlags()[1] = other.fieldSetFlags()[1];
}
if (isValidValue(fields()[2], other.description)) {
this.description = data().deepCopy(fields()[2].schema(), other.description);
fieldSetFlags()[2] = other.fieldSetFlags()[2];
}
}

/**
* Creates a Builder by copying an existing Image instance
* @param other The existing instance to copy.
*/
private Builder(com.clickhouse.kafka.connect.avro.test.Image other) {
super(SCHEMA$, MODEL$);
super(SCHEMA$);
if (isValidValue(fields()[0], other.name)) {
this.name = data().deepCopy(fields()[0].schema(), other.name);
fieldSetFlags()[0] = true;
Expand All @@ -232,6 +251,10 @@ private Builder(com.clickhouse.kafka.connect.avro.test.Image other) {
this.content = data().deepCopy(fields()[1].schema(), other.content);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.description)) {
this.description = data().deepCopy(fields()[2].schema(), other.description);
fieldSetFlags()[2] = true;
}
}

/**
Expand Down Expand Up @@ -314,13 +337,54 @@ public com.clickhouse.kafka.connect.avro.test.Image.Builder clearContent() {
return this;
}

/**
* Gets the value of the 'description' field.
* @return The value.
*/
public java.lang.Object getDescription() {
return description;
}


/**
* Sets the value of the 'description' field.
* @param value The value of 'description'.
* @return This builder.
*/
public com.clickhouse.kafka.connect.avro.test.Image.Builder setDescription(java.lang.Object value) {
validate(fields()[2], value);
this.description = value;
fieldSetFlags()[2] = true;
return this;
}

/**
* Checks whether the 'description' field has been set.
* @return True if the 'description' field has been set, false otherwise.
*/
public boolean hasDescription() {
return fieldSetFlags()[2];
}


/**
* Clears the value of the 'description' field.
* @return This builder.
*/
public com.clickhouse.kafka.connect.avro.test.Image.Builder clearDescription() {
description = null;
fieldSetFlags()[2] = false;
return this;
}

@Override
@SuppressWarnings("unchecked")
public Image build() {
try {
Image record = new Image();
record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
record.content = fieldSetFlags()[1] ? this.content : defaultValue(fields()[1]);
record.description = fieldSetFlags()[2] ? this.description : defaultValue(fields()[2]);
return record;
} catch (org.apache.avro.AvroMissingFieldException e) {
throw e;
Expand Down
Loading