Description
Hi!
We're backing up all topics to Parquet with 'store.envelope'=true. Here are the details:
Stream-reactor version: 8.1.30
Kafka connect version: based on Docker image confluentinc/cp-kafka-connect:7.8.0
Backup config:
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "tasks.max": "10",
  "connect.s3.aws.auth.mode": "Default",
  "connect.s3.compression.codec": "zstd",
  "topics.regex": ".*",
  "key.converter.schemas.enable": "false",
  "connect.s3.kcql": "insert into `msk-backup:msk-backup-parquet` select * from `*` PARTITIONBY _topic, _partition STOREAS `PARQUET` PROPERTIES ('store.envelope'=true,'flush.count'=1000,'flush.interval'=300, 'flush.size'=50000000, 'partition.include.keys'=false);",
  "name": "msk-s3-backup-sink-all-parquet",
  "value.converter.schemas.enable": "false",
  "connect.s3.compression.level": "9",
  "connect.s3.aws.region": "eu-west-1",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
The Problem:
When consecutive messages carry inconsistent headers (different header keys, or no headers at all), any change in header keys relative to the previous message is treated as a schema change. Each such change rolls a new file, so a topic ends up fragmented across many small files that do not respect the configured flush options.
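To make the trigger concrete, here is a minimal reproduction sketch using the plain kafka-clients producer API. The broker address and the topic name "demo-topic" are assumptions; the only point is that the two consecutive records carry different header key sets, which the sink currently interprets as a schema change:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class HeaderDriftRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // First record carries a "trace-id" header.
            ProducerRecord<byte[], byte[]> first = new ProducerRecord<>(
                "demo-topic",
                "k1".getBytes(StandardCharsets.UTF_8),
                "v1".getBytes(StandardCharsets.UTF_8));
            first.headers().add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));

            // Second record carries a different header key (it could equally carry none).
            ProducerRecord<byte[], byte[]> second = new ProducerRecord<>(
                "demo-topic",
                "k2".getBytes(StandardCharsets.UTF_8),
                "v2".getBytes(StandardCharsets.UTF_8));
            second.headers().add("retry-count", "1".getBytes(StandardCharsets.UTF_8));

            producer.send(first);
            producer.send(second);
            producer.flush();
        }
    }
}

With the backup config above, these two records land in separate Parquet files even though neither flush.count, flush.interval, nor flush.size has been reached.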
Proposal
Add an option that stores the headers as a ByteArray (just as for keys and values) when 'store.envelope'=true. This makes the envelope schema consistent across all messages. The envelope would then be:
{
  "key": <the message key, which can be a primitive or a complex object>,
  "value": <the message value, which can be a primitive or a complex object>,
  "headers": <the marshalled headers interpreted as bytes>,
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}
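For illustration only, one way the headers field could be marshalled is a simple length-prefixed encoding. Everything below (the class, the method, and the wire format itself) is a hypothetical sketch, not existing connector behavior; the actual format would be up to the maintainers:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical: packs header key/value pairs into one byte array so every
// envelope exposes a single stable BYTES field regardless of header keys.
public final class HeaderBytesMarshaller {

    // Layout: pair count, then per pair (keyLen, keyBytes, valueLen, valueBytes).
    public static byte[] marshal(Map<String, byte[]> headers) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(headers.size());
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            byte[] key = h.getKey().getBytes(StandardCharsets.UTF_8);
            out.writeInt(key.length);
            out.write(key);
            byte[] value = h.getValue() == null ? new byte[0] : h.getValue();
            out.writeInt(value.length);
            out.write(value);
        }
        return buf.toByteArray();
    }
}

With an encoding like this, a record without headers still marshals to a well-defined BYTES value (a zero pair count), so the Parquet schema never changes between records and files roll only on the flush options. The behavior could be toggled by an extra KCQL property next to 'store.envelope'=true (the property name would be up to the maintainers).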