Not able to create KafkaConnect --org.apache.kafka.common.config.ConfigException: Topic 'connect-cluster-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets #4838
iamrsaravana started this conversation in General
-
I guess the problem is what the error suggests: the topics needed by Connect already exist but do not have the right settings. So you should either change the topic configuration or delete the topics so that they are recreated with the right settings.
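As a sketch of the first option, the existing offsets topic could be brought under Strimzi's Topic Operator with the required cleanup policy. The topic and cluster names below are taken from the error message and the Kafka CR in this thread; the partition and replica counts are assumptions (25 partitions is the Connect default for `offset.storage.partitions`, and 1 replica matches the `offset.storage.replication.factor: 1` used in the KafkaConnect CR) and should be adjusted to your setup:

```yaml
# Hypothetical KafkaTopic resource for the Connect offsets topic.
# Names come from the error message; sizing values are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-cluster-offsets
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 25
  replicas: 1
  config:
    # Connect refuses to start unless the offsets topic is compacted
    cleanup.policy: compact
```

Alternatively, the policy can be changed directly on the broker with `kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --entity-type topics --entity-name connect-cluster-offsets --alter --add-config cleanup.policy=compact`, or the topic can simply be deleted so that Connect recreates it with the correct settings on the next start.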
-
I have created a Kafka cluster using the kafka-persistent.yaml file:
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.6"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
```
I tried to create an AMQP source connector using the YAML files below.

Topic definition:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
```
connector.yaml file:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: camelamqpsourceconnector
  labels:
    strimzi.io/cluster: my-connect-cluster-new
spec:
  class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
  #tasksMax: 1
  topics: mytopic
  config:
    camel.component.amqp.includeAmqpAnnotations: true
    camel.component.amqp.connectionFactory: '#class:org.apache.qpid.jms.JmsConnectionFactory'
    camel.component.amqp.connectionFactory.remoteURI: amqp://10.97.217.229:15672
    camel.component.amqp.username: consumer@HONO
    camel.component.amqp.password: verysecret
    camel.component.amqp.testConnectionOnStartup: true
    camel.source.path.destinationType: queue
    camel.source.path.destinationName: telemetry/LTTS_TENANT
    topics: mytopic
  tasksMax: 1
```
amqp_connector.yaml:
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster-new
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: 10.128.0.6:5000/my-connector-amqppostsql
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster-new
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
```
I am getting the error message below:
```
2021-04-28 12:21:37,670 ERROR [Worker clientId=connect-1, groupId=connect-cluster-new] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
org.apache.kafka.common.config.ConfigException: Topic 'connect-cluster-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
    at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:420)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:116)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:129)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:128)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:197)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:128)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:288)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-28 12:21:37,716 INFO Adding admin resources to main listener (org.apache.kafka.connect.runtime.rest.RestServer) [main]
```
Could you help me create the connector without this error?