-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
Hello,
I deployed Elasticsearch version 7.4.0 on Azure, and I am using Aiven Kafka + Aiven Schema Registry + Aiven Kafka Connect to copy my messages from a Kafka topic to the deployed Elasticsearch instance.
Connector configuration:
{
"name": "engagement-aggregate-sink",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "24",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "ExtractTenantId,IndexRouter",
"errors.retry.timeout": "0",
"errors.retry.delay.max.ms": "60000",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "customer-journey-engagement-aggregate",
"errors.deadletterqueue.topic.name": "customer-journey-engagement-aggregate-deadletterqueue",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"connection.url": "ES_URL",
"connection.username":"USERNAME",
"connection.password":"PASSWORD",
"batch.size": "10000",
"max.in.flight.requests": "3",
"max.buffered.records": "1000",
"linger.ms": "100",
"flush.timeout.ms": "20000",
"max.retries": "10",
"retry.backoff.ms": "1000",
"connection.timeout.ms": "1000",
"read.timeout.ms": "1000",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"compact.map.entries": "false",
"drop.invalid.message": "true",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"transforms.IndexRouter.timestamp.format": "YYYYMM",
"transforms.ExtractTenantId.type": "io.aiven.kafka.connect.transforms.ExtractTopic$Value",
"transforms.ExtractTenantId.skip.missing.or.null": "false",
"transforms.ExtractTenantId.field.name": "tenantId",
"transforms.IndexRouter.topic.format": "engagement-${topic}-${timestamp}",
"transforms.IndexRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"value.converter.schema.registry.url": "KAFKA_URL",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"UNM:PWD",
"value.converter.max.schemas.per.subject": "1000",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"value.converter.auto.register.schemas": "false"
}
With both sink connector versions 5.0.3 and 6.0.3, it throws the same exception, shown below:
java.lang.NullPointerException
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:179)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:148)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:117)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:123)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:52)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:195)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
If I use the Aiven-managed Elasticsearch, everything works fine.
Could you please help figure out what the issue is with using an Elasticsearch service deployed on Azure?
Metadata
Metadata
Assignees
Labels
No labels