Description
Hi, I'm exploring backing up to cold storage (Azure Blob) with the blob sink connector. In general, it works great.
However, I'm running into a problem with records that have a header whose value is null:
org.apache.kafka.connect.errors.ConnectException: Failed to write records to Azure Blob
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:163)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flush(AzureBlobSinkTask.java:130)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at java.base/java.util.Base64$Encoder.encode(Base64.java:267)
    at io.aiven.kafka.connect.common.output.plainwriter.HeadersPlainWriter.write(HeadersPlainWriter.java:48)
    at io.aiven.kafka.connect.common.output.plainwriter.PlainOutputStreamWriter.writeOneRecord(PlainOutputStreamWriter.java:51)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecord(OutputWriter.java:88)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecords(OutputWriter.java:76)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:156)
    ... 15 more
This can be reproduced by sinking the following record (with the correlation_id header set to null):
{
  "Value": "{ ... }",
  "Offset": 343,
  "Key": "{ ... }",
  "Partition": 0,
  "Headers": {
    "correlation_id": null,
    "Timestamp": "2025-04-10T08:56:44.26Z"
  }
}
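The trace shows the NullPointerException being thrown from Base64.Encoder.encode, called from HeadersPlainWriter.write (HeadersPlainWriter.java:48), so HeadersPlainWriter appears to assume that every header value is non-null. Should we make it null-safe?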
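To make the question concrete, here is a minimal, standalone sketch of what a null-safe encoding could look like. It is not the connector's actual code: the class `NullSafeHeaderEncoding`, its method names, the `key:value;key:value` layout, and the choice to write a null value as an empty string are all assumptions for illustration. The only part taken from the trace above is that passing a null `byte[]` to `java.util.Base64.Encoder` throws a `NullPointerException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; not the HeadersPlainWriter implementation.
public class NullSafeHeaderEncoding {

    // Base64-encodes a header value. A null value is written as an empty
    // string (an assumed convention) instead of being passed to the encoder,
    // which would throw a NullPointerException.
    static String encodeValue(final byte[] value) {
        if (value == null) {
            return "";
        }
        return Base64.getEncoder().encodeToString(value);
    }

    // Serializes headers as "base64(key):base64(value)" pairs joined by ';'.
    // The layout is an assumption made for this sketch.
    static String encodeHeaders(final Map<String, byte[]> headers) {
        final StringJoiner joined = new StringJoiner(";");
        for (final Map.Entry<String, byte[]> header : headers.entrySet()) {
            final String key = Base64.getEncoder()
                .encodeToString(header.getKey().getBytes(StandardCharsets.UTF_8));
            joined.add(key + ":" + encodeValue(header.getValue()));
        }
        return joined.toString();
    }

    public static void main(final String[] args) {
        // LinkedHashMap is used because it permits null values and keeps order.
        final Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("correlation_id", null);
        headers.put("source", "orders".getBytes(StandardCharsets.UTF_8));
        System.out.println(encodeHeaders(headers));
    }
}
```

Whether a null header should be written as an empty value, as a literal marker, or skipped entirely is a design choice for the maintainers. An empty value keeps the header key in the output, but on read-back it cannot be told apart from a header whose value is an empty byte array.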