Closed
Labels
bug (Something isn't working)
Description
Describe the bug
I have a Kafka topic, test-decimal, to which I publish JSON messages containing a decimal field:
{
  "RandomNumber": 100.0
}
The JSON schema for this topic is:
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "Test",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "RandomNumber": {
      "type": "number",
      "format": "decimal"
    }
  }
}
I then created a table for it in ClickHouse:
CREATE TABLE test.test
(
`RandomNumber` Decimal(28,18)
)
ENGINE = MergeTree
PRIMARY KEY ()
ORDER BY ();
and a connector
{
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"topic2TableMap": "test-decimal=test",
"topics": "test-decimal",
"tasks.max": "1",
"keeperOnCluster": "main",
"consumer.override.fetch.min.bytes": "50000000",
"consumer.override.fetch.max.bytes": "200000000",
"ssl": "false",
"consumer.override.fetch.max.wait.ms": "25000",
"database": "test",
"hostname": "<hidden>",
"password": "******",
"consumer.override.auto.offset.reset": "earliest",
"port": "8123",
"consumer.override.max.poll.records": "100000",
"value.converter.schemas.enable": "true",
"name": "test-decimal",
"consumer.override.max.partition.fetch.bytes": "200000000",
"errors.tolerance": "none",
"exactlyOnce": "false",
"username": "general"
}
and the connector keeps failing with the following error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Number of records: 3
    at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126)
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:71)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: java.lang.RuntimeException: Topic: [test-decimal], Partition: [0], MinOffset: [0], MaxOffset: [2], (QueryId: [4d408091-d810-4675-8c82-c96b1d652378])
    at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:68)
    at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:133)
    at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99)
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:65)
    ... 12 more
Caused by: java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because the return value of "org.apache.kafka.connect.data.Schema.name()" is null
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.validateDataSchema(ClickHouseWriter.java:247)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:742)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:676)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:196)
    at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:66)
    ... 15 more
The failure appears to originate at this line: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L247, where objSchema.name() returns null and equals() is then called on the result.
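To illustrate the kind of null-safe comparison that would avoid this NullPointerException, here is a minimal, self-contained sketch. It is not the connector's actual code: the class and method names are hypothetical, and I am only assuming that the check compares the schema name against a logical-type name such as Kafka Connect's Decimal ("org.apache.kafka.connect.data.Decimal"). The schema name is passed as a plain String so the example runs without Kafka Connect on the classpath.

```java
// Hypothetical helper, not taken from ClickHouseWriter.
final class SchemaNameCheck {

    // Logical-type name Kafka Connect uses for Decimal (Decimal.LOGICAL_NAME).
    // Assumption: the failing check compares against a name like this one.
    static final String DECIMAL_LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";

    // Unsafe form, mirroring the error message: throws NullPointerException
    // when schemaName is null, because equals() is invoked on the null value.
    static boolean isDecimalUnsafe(String schemaName) {
        return schemaName.equals(DECIMAL_LOGICAL_NAME);
    }

    // Null-safe form: equals() is invoked on the constant, so a null
    // schemaName simply yields false instead of throwing.
    static boolean isDecimalSafe(String schemaName) {
        return DECIMAL_LOGICAL_NAME.equals(schemaName);
    }

    public static void main(String[] args) {
        // A schema without a name, as in this report.
        System.out.println(isDecimalSafe(null));
        // A schema carrying the Decimal logical-type name.
        System.out.println(isDecimalSafe(DECIMAL_LOGICAL_NAME));
    }
}
```

With the null-safe form, an unnamed schema is treated as "not a Decimal logical type" rather than aborting the task. Whether that is the right behaviour for this connector is a separate question.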
Expected behaviour
I expect the decimal value to be written to ClickHouse.
Configuration
Environment
- Kafka-Connect version: 3.6.1
- Kafka version: 3.6.1
- OS: Linux
ClickHouse server
- ClickHouse Server version: clickhouse/clickhouse-server:24.6