Skip to content

Decimal field throws an error #448

@justas200

Description

@justas200

Describe the bug

I have a kafka topic test-decimal where I place messages in json format with a decimal field

{
	"RandomNumber": 100.0
}

Kafka Schema for this topic is

{
	"$schema": "http://json-schema.org/draft-04/schema#",
	"title": "Test",
	"type": "object",
	"additionalProperties": false,
	"properties": {
		"RandomNumber": {
			"type": "number",
			"format": "decimal"
		}
	}
}

I then created a table for this in Clickhouse

CREATE TABLE test.test
(
    `RandomNumber` Decimal(28,18)
)
ENGINE = MergeTree
    PRIMARY KEY ()
    ORDER BY ();

and a connector

{
	"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
	"topic2TableMap": "test-decimal=test",
	"topics": "test-decimal",
	"tasks.max": "1",
	"keeperOnCluster": "main",
	"consumer.override.fetch.min.bytes": "50000000",
	"consumer.override.fetch.max.bytes": "200000000",
	"ssl": "false",
	"consumer.override.fetch.max.wait.ms": "25000",
	"database": "test",
	"hostname": "<hidden>",
	"password": "******",
	"consumer.override.auto.offset.reset": "earliest",
	"port": "8123",
	"consumer.override.max.poll.records": "100000",
	"value.converter.schemas.enable": "true",
	"name": "test-decimal",
	"consumer.override.max.partition.fetch.bytes": "200000000",
	"errors.tolerance": "none",
	"exactlyOnce": "false",
	"username": "general"
}

and the connector keeps throwing an error

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) Caused by: java.lang.RuntimeException: Number of records: 3 at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126) at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:71) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601) ... 11 more Caused by: java.lang.RuntimeException: Topic: [test-decimal], Partition: [0], MinOffset: [0], MaxOffset: [2], (QueryId: [4d408091-d810-4675-8c82-c96b1d652378]) at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:68) at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:133) at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99) at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:65) ... 12 more Caused by: java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because the return value of "org.apache.kafka.connect.data.Schema.name()" is null at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.validateDataSchema(ClickHouseWriter.java:247) at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:742) at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:676) at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:196) at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:66) ... 15 more

which appears to be this line here https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L247 due to objSchema.name() being null.

Expected behaviour

I exepect to see the decimal value go to clickhouse.

Configuration

Environment

  • Kafka-Connect version: 3.6.1
  • Kafka version: 3.6.1
  • OS: Linux

ClickHouse server

  • ClickHouse Server version: clickhouse/clickhouse-server:24.6

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions