How do I configure the Kafka AWS S3 Source Connector so that all files stored in the location below are sourced by the connector?
S3 location:
s3://my-bucket:abc/def/year=*/month=*/day=*/part-*.parquet
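The object keys under this prefix follow a Hive-style layout: `key=value` directory segments (`year`, `month`, `day`) followed by a `part-N.parquet` file name. As a rough illustration only (not connector code; the class `KeyLayout` and its `parse` helper are hypothetical), this sketch splits one such key into its partition fields and file base name. It shows that the base name, `part-0`, is not a bare number.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: splits an S3 object key
// below a given prefix into Hive-style partition fields and the file base name.
final class KeyLayout {

    static Map<String, String> parse(String prefix, String key) {
        Map<String, String> fields = new LinkedHashMap<>();
        // Strip the prefix and the separating slash, e.g. "abc/def" + "/".
        String rest = key.startsWith(prefix) ? key.substring(prefix.length()) : key;
        if (rest.startsWith("/")) {
            rest = rest.substring(1);
        }
        String[] segments = rest.split("/");
        // Every segment except the last is treated as a "name=value" directory.
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                fields.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        // The last segment is the file name; drop the extension to get the base name.
        String file = segments[segments.length - 1];
        int dot = file.lastIndexOf('.');
        fields.put("file", dot > 0 ? file.substring(0, dot) : file);
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> f =
            parse("abc/def", "abc/def/year=2025/month=05/day=31/part-0.parquet");
        System.out.println(f); // {year=2025, month=05, day=31, file=part-0}
    }
}
```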
Here is a snippet of the applied configuration:
{
"connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
"tasks.max": "1",
"topics": "topic",
"connect.s3.source.partition.extractor.type": "hierarchical",
"connect.s3.source.partition.search.continuous": "true",
"connect.s3.kcql": "INSERT INTO topic SELECT * from my-bucket:abc/def STOREAS `parquet`",
"connect.s3.ordering.type": "LastModified",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"connect.s3.aws.region": "us-east-1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"connect.s3.source.partition.search.recurse.levels": "0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
This configuration appears to be wrong, as the connector fails with the following error:
[Worker-07109d9c33fd562b6] [2025-05-31 08:41:30,777] INFO [S3Connector|task-3] [S3Connector - 3 of 4] Reading next file: my-bucket:abc/def//abc/def/year=2025/month=05/day=31/part-0.parquet/ from line Some(-1) (io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$:95)
--
[Worker-07109d9c33fd562b6] [2025-05-31 08:41:30,778] INFO [S3Connector|task-2] WorkerSourceTask{id=S3Connector-2} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:490)
[Worker-07109d9c33fd562b6] [2025-05-31 08:41:30,778] INFO [S3Connector|task-2] WorkerSourceTask{id=S3Connector-2} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:507)
[Worker-07109d9c33fd562b6] [2025-05-31 08:41:30,778] ERROR [S3Connector|task-2] WorkerSourceTask{id=S3Connector-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:193)
[Worker-07109d9c33fd562b6] java.lang.NumberFormatException: For input string: "part-0"
[Worker-07109d9c33fd562b6] at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
[Worker-07109d9c33fd562b6] at java.base/java.lang.Integer.parseInt(Integer.java:652)
[Worker-07109d9c33fd562b6] at java.base/java.lang.Integer.parseInt(Integer.java:770)
[Worker-07109d9c33fd562b6] at scala.collection.StringOps$.toInt$extension(StringOps.scala:910)
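The bottom of the stack trace shows the failure itself: the string `part-0` reaches `Integer.parseInt` (via Scala's `StringOps.toInt`), which only accepts an optional sign followed by digits. The minimal Java sketch below reproduces that failure and then, purely as an illustration, pulls the numeric index out of the name with a regular expression. The class `PartIndex` and the pattern `part-(\d+)` are hypothetical and are not the connector's own code; a real pattern would have to match the actual file naming.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not taken from the connector: shows why "part-0"
// cannot be parsed as an int, and one way to extract the numeric suffix.
final class PartIndex {

    // Assumed naming: "part-<digits>", optionally followed by an extension.
    private static final Pattern PART = Pattern.compile("part-(\\d+)(?:\\..*)?");

    // Mirrors the failing call in the stack trace: throws NumberFormatException.
    static int naiveParse(String name) {
        return Integer.parseInt(name);
    }

    // Returns the digits after "part-", or empty if the name does not match.
    static OptionalInt extract(String name) {
        Matcher m = PART.matcher(name);
        return m.matches() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        try {
            naiveParse("part-0");
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage()); // For input string: "part-0"
        }
        System.out.println(extract("part-0"));         // OptionalInt[0]
        System.out.println(extract("part-0.parquet")); // OptionalInt[0]
    }
}
```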