How to configure 2 kafka cluster bootstrap server for same kafka topic and table #444
shubhamshinde7
started this conversation in
General
Replies: 1 comment 6 replies
-
Could you paste more of the logs? |
Beta Was this translation helpful? Give feedback.
6 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I have 2 kafka clusters with same topic name as table name. I created below config using API. Now I am getting
│ valk-kafka-connect [2024-09-25 19:19:26,281] WARN Sending [20] records to DLQ for exception: Topic: [sna.valk.sn_valk.user_ │ │ data], Partition: [0], MinOffset: [125], MaxOffset: [144], (QueryId: [994c936b-19eb-4d25-9964-8b48ae28a2f3]) (com.clickhous │ │ e.kafka.connect.sink.ClickHouseSinkTask)
error every timeBelow is kafka connect config
`
curl -k -X PUT -H "Content-Type: application/json"
--data '{
"consumer.override.bootstrap.servers": "kafka1:3000",
"consumer.override.group.id": "sna.valk.sn_valk.user_data_shubham-group",
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"consumer.override.max.poll.records": 100000,
"consumer.override.fetch.max.wait.ms": 60000,
"consumer.override.fetch.min.bytes": 200615200,
"consumer.override.request.timeout.ms": 75000,
"consumer.override.max.partition.fetch.bytes": 200615200,
"tasks.max": "4",
"topics": "sna.valk.sn_valk.user_data",
"clickhouse.table": "sna.valk.sn_valk.user_data",
"ssl": "true",
"security.protocol": "SSL",
"hostname": "valk-clickhouse-svc",
"port": "8443",
"database": "valk",
"errors.retry.timeout": "60",
"exactlyOnce": "true",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"username": "default",
"schemas.enable": "false",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.name": "sna.valk.sn_valk.user_data_dlq",
"errors.tolerance": "all",
"clickhouseSettings": "date_time_input_format=best_effort"
}'
https://valk-kafka-connect-svc:8443/connectors/valk-shubham/config
curl -k -X PUT -H "Content-Type: application/json"
--data '{
"consumer.override.bootstrap.servers": "kafka2:3000",
"consumer.override.group.id": "sna.valk.sn_valk.user_data_release-group",
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"consumer.override.max.poll.records": 100000,
"consumer.override.fetch.max.wait.ms": 60000,
"consumer.override.fetch.min.bytes": 200615200,
"consumer.override.request.timeout.ms": 75000,
"consumer.override.max.partition.fetch.bytes": 200615200,
"tasks.max": "4",
"topics": "sna.valk.sn_valk.user_data",
"clickhouse.table": "sna.valk.sn_valk.user_data",
"ssl": "true",
"security.protocol": "SSL",
"hostname": "valk-clickhouse-svc",
"port": "8443",
"database": "valk",
"keeperOnCluster": "ON CLUSTER valk_global",
"errors.retry.timeout": "60",
"exactlyOnce": "true",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"username": "default",
"schemas.enable": "false",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.name": "sna.valk.sn_valk.user_data_dlq",
"errors.tolerance": "all",
"clickhouseSettings": "date_time_input_format=best_effort"
}'
https://valk-kafka-connect-svc:8443/connectors/valk-release/config
`
Beta Was this translation helpful? Give feedback.
All reactions