NPE when header value is null #434
Do you have the full stack trace or a test case that shows the issue? How did you manage to pass a correlation_id of null?
Just to be sure, this is about RecordHeaders (cf. https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java). If these headers contain one (or more) key-value pairs where the value == null, the writer fails with the NullPointerException shown. The correlation_id in the reported record was just an example; you can reproduce this with any arbitrary header.
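A minimal reproduction sketch (the class name and setup are illustrative, not taken from the connector): a RecordHeader may legitimately carry a null value, and java.util.Base64's Encoder.encode(byte[]) dereferences its argument, which produces the same NullPointerException as in the reported stack trace.

```java
import java.util.Base64;

import org.apache.kafka.common.header.internals.RecordHeader;

public class NullHeaderValueRepro {
    public static void main(String[] args) {
        // A record header whose value is legitimately null.
        RecordHeader header = new RecordHeader("correlation_id", null);

        // Base64.Encoder.encode(byte[]) dereferences its argument, so a null
        // header value throws a NullPointerException, matching the
        // Base64$Encoder.encode frame in the reported stack trace.
        byte[] encoded = Base64.getEncoder().encode(header.value());
        System.out.println(encoded.length);
    }
}
```

Running this throws java.lang.NullPointerException from Base64.Encoder.encode before any connector code is involved.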
What, in your opinion, is the correct behaviour? Should it write an empty string or not write the header key at all?
Hi, that question indeed puzzles me a bit. Writing an empty string feels wrong, because base64("") == "" should be respected (decoding that empty string should give us an empty string back, which is different from our original null value). Was there any reason not to encode the entire header structure, instead of encoding key/value header pairs separately? Regards, Jeroen
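As a small illustration of that round-trip point (the class name is just for the example): base64 of an empty value is the empty string, and decoding the empty string gives an empty byte array, never null, so writing "" would make a null value indistinguishable from an empty one.

```java
import java.util.Base64;

public class EmptyVersusNullDemo {
    public static void main(String[] args) {
        // Encoding an empty value yields an empty string ...
        String encoded = Base64.getEncoder().encodeToString(new byte[0]);
        System.out.println("encoded: '" + encoded + "'");          // ''

        // ... and decoding that empty string yields an empty byte[], never null,
        // so a reader could no longer tell an empty value from an originally-null one.
        byte[] decoded = Base64.getDecoder().decode(encoded);
        System.out.println("decoded length: " + decoded.length);   // 0
    }
}
```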
I don't know of a specific reason for that design decision. Do you see a way to change to full header serialisation, keeping in mind that when reading the file back we would need to be able to detect whether the headers were encoded as a block or as individual entries?
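For comparison, here is a sketch of the per-entry alternative raised earlier: skip the header entirely when its value is null. The class, method, and separator below are hypothetical and are not the actual HeadersPlainWriter.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.kafka.common.header.Header;

// Hypothetical null-safe variant, not the connector's actual writer.
public final class NullSafeHeaderWriter {

    // Separator chosen purely for illustration.
    private static final byte[] KEY_VALUE_SEPARATOR = ":".getBytes(StandardCharsets.UTF_8);

    public void writeHeader(final OutputStream out, final Header header) throws IOException {
        final Base64.Encoder encoder = Base64.getEncoder();

        // Skip the whole entry when the value is null, so a null value is never
        // confused with an empty one and no NullPointerException is thrown.
        if (header.value() == null) {
            return;
        }

        out.write(encoder.encode(header.key().getBytes(StandardCharsets.UTF_8)));
        out.write(KEY_VALUE_SEPARATOR);
        out.write(encoder.encode(header.value()));
    }
}
```

Staying with per-entry encoding like this would also avoid the block-versus-entry format detection problem when reading files back.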
Hi, I'm exploring the option of backing up to cold storage (Azure Blob) using the blob sink connector. In general, it is working great.
However, I'm facing a problem with records that have headers whose value is null:
```
org.apache.kafka.connect.errors.ConnectException: Failed to write records to Azure Blob
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:163)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flush(AzureBlobSinkTask.java:130)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at java.base/java.util.Base64$Encoder.encode(Base64.java:267)
    at io.aiven.kafka.connect.common.output.plainwriter.HeadersPlainWriter.write(HeadersPlainWriter.java:48)
    at io.aiven.kafka.connect.common.output.plainwriter.PlainOutputStreamWriter.writeOneRecord(PlainOutputStreamWriter.java:51)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecord(OutputWriter.java:88)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecords(OutputWriter.java:76)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:156)
    ... 15 more
```
This can be reproduced by sinking the following record (with a correlation_id header set to null):
{ "Value": "{ ... }", "Offset": 343, "Key": "{ ... }", "Partition": 0, "Headers": { "correlation_id": null, "Timestamp": "2025-04-10T08:56:44.26Z" }
The HeadersPlainWriter assumes all header values are non-null. Should we make this null-safe?
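One minimal way to make the base64 step null-safe would be a guard like the following (a hypothetical helper, not the connector's code). Note that encoding null as an empty value re-introduces the empty-versus-null ambiguity discussed in the comments above.

```java
import java.util.Base64;

// Hypothetical helper, shown only to illustrate a null guard around the base64 step.
final class SafeBase64 {

    private static final byte[] EMPTY = new byte[0];

    private SafeBase64() {
    }

    // Encode a header value without throwing when it is null.
    static byte[] encodeOrEmpty(final byte[] value) {
        // Trade-off: a null value becomes indistinguishable from an empty one
        // once written, which is the concern raised in the comments above.
        return Base64.getEncoder().encode(value == null ? EMPTY : value);
    }
}
```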