
NPE when header value is null #434


Open
jeroenvekemans opened this issue Apr 11, 2025 · 5 comments

jeroenvekemans commented Apr 11, 2025

Hi, I'm exploring the option of backing up to cold storage (Azure Blob) using the blob sink connector. In general, it's working great.

However, I'm running into a problem with records that have headers whose value is null:

org.apache.kafka.connect.errors.ConnectException: Failed to write records to Azure Blob
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:163)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flush(AzureBlobSinkTask.java:130)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at java.base/java.util.Base64$Encoder.encode(Base64.java:267)
    at io.aiven.kafka.connect.common.output.plainwriter.HeadersPlainWriter.write(HeadersPlainWriter.java:48)
    at io.aiven.kafka.connect.common.output.plainwriter.PlainOutputStreamWriter.writeOneRecord(PlainOutputStreamWriter.java:51)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecord(OutputWriter.java:88)
    at io.aiven.kafka.connect.common.output.OutputWriter.writeRecords(OutputWriter.java:76)
    at io.aiven.kafka.connect.azure.sink.AzureBlobSinkTask.flushFile(AzureBlobSinkTask.java:156)
    ... 15 more

This can be reproduced by sinking the following record (with a correlation_id header set to null):

{
  "Value": "{ ... }",
  "Offset": 343,
  "Key": "{ ... }",
  "Partition": 0,
  "Headers": {
    "correlation_id": null,
    "Timestamp": "2025-04-10T08:56:44.26Z"
  }
}

HeadersPlainWriter assumes that all header values are non-null. Should we make it null-safe?

Claudenw (Contributor) commented Apr 14, 2025

Do you have the full stack trace or a test case that shows the issue?

How did you manage to pass a correlation_id of null? The struct does not allow null values in the RequestHeader.

Claudenw self-assigned this Apr 14, 2025

jeroenvekemans (Author)

Just to be clear, this is about RecordHeaders (cf. https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java). If these headers contain one or more key-value pairs where the value is null, HeadersPlainWriter throws an NPE because it tries to Base64-encode the header value.

The correlation_id above is just an example; you can reproduce this with any header.
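The root cause can be isolated without Kafka or Azure. Assuming, as the "Caused by" section of the trace suggests, that HeadersPlainWriter hands the header's raw byte[] value straight to java.util.Base64.Encoder#encode, a null value is enough to trigger the exception. The class name NullHeaderBase64Demo is hypothetical and only used for this reproduction:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical reproduction class (not part of the connector). It calls the same
// JDK method that sits at the top of the "Caused by" section of the stack trace.
final class NullHeaderBase64Demo {

    /** Returns true if Base64-encoding the given header value throws a NullPointerException. */
    static boolean throwsOnEncode(byte[] headerValue) {
        try {
            Base64.getEncoder().encode(headerValue); // Base64.Encoder#encode(byte[]) rejects null
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] nullHeaderValue = null; // what a header with a null value would hand over
        System.out.println(throwsOnEncode(nullHeaderValue));                        // prints true
        System.out.println(throwsOnEncode("abc".getBytes(StandardCharsets.UTF_8))); // prints false
        System.out.println(throwsOnEncode(new byte[0]));                            // prints false
    }
}
```

An empty array encodes without error (to an empty string), so only a null value crashes the writer. A `value == null` guard before encoding would avoid the NPE; what the writer should emit in that case is the open question in the rest of this thread.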

Claudenw (Contributor)

What, in your opinion, is the correct behavior? Should it write an empty string, or not write the header key at all?

jeroenvekemans (Author)

Hi, that question indeed puzzles me a bit:

Writing an empty string feels wrong, because base64("") == "" should be respected: decoding that empty string should give us an empty string back, which is different from our original null value.
Not writing the header at all might surprise users of this connector as well; after all, the header entry was present.

Was there any reason not to encode the entire header structure, rather than encoding each key/value pair separately?
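A third option keeps null and empty distinguishable without dropping the entry. The sketch below assumes, purely for illustration, a per-entry layout of `base64(key):base64(value)`; this is not necessarily the connector's actual file format, and NullableHeaderCodec is a hypothetical name. A null value is written as the encoded key alone (no ':' separator), while an empty value keeps the separator:

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Base64;
import java.util.Map;

// Hypothetical codec, assuming a "base64(key):base64(value)" per-entry layout.
// Standard Base64 output never contains ':', so the first ':' is always the separator.
final class NullableHeaderCodec {
    private static final Base64.Encoder ENC = Base64.getEncoder();
    private static final Base64.Decoder DEC = Base64.getDecoder();

    /** "key" means the value was null, "key:" means empty, "key:..." carries the bytes. */
    static String encode(String key, byte[] value) {
        String encodedKey = ENC.encodeToString(key.getBytes(StandardCharsets.UTF_8));
        if (value == null) {
            return encodedKey; // no separator marks a null value
        }
        return encodedKey + ":" + ENC.encodeToString(value); // base64("") == "" stays empty
    }

    /** Inverse of encode(); the returned entry's value is null when no ':' is present. */
    static Map.Entry<String, byte[]> decode(String entry) {
        int sep = entry.indexOf(':');
        String encodedKey = sep < 0 ? entry : entry.substring(0, sep);
        String key = new String(DEC.decode(encodedKey), StandardCharsets.UTF_8);
        byte[] value = sep < 0 ? null : DEC.decode(entry.substring(sep + 1));
        // SimpleImmutableEntry (unlike Map.entry) accepts a null value
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }
}
```

With this layout, decoding `encode("k", new byte[0])` returns an empty array and decoding `encode("k", null)` returns null, so the round trip respects base64("") == "" while still recording that the header was present. The trade-off is that any existing reader would have to accept entries without a separator.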

Regards

Jeroen

Claudenw (Contributor) commented May 9, 2025

I don't know of a specific reason for that design decision.

Do you see a way to switch to full header serialisation, keeping in mind that when reading the file back we would need to be able to detect whether the headers were encoded as a block or as individual entries?
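One possible approach, sketched under stated assumptions: serialise all headers into a single length-prefixed binary block (a length of -1 marking a null value), Base64-encode that block, and prefix it with a marker that cannot appear at the start of a per-entry field. The marker `|block|`, the class name HeaderBlockCodec, and the binary layout are all hypothetical; the detection only works if the existing per-entry format never starts with '|', which holds if it only emits standard Base64 text (no '|' in that alphabet) plus separators other than '|'.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;

// Hypothetical block codec; marker and layout are illustrative assumptions.
final class HeaderBlockCodec {
    // '|' is not in the standard Base64 alphabet, so a per-entry field cannot start with it.
    static final String BLOCK_PREFIX = "|block|";

    /** Layout: count, then per header: key (writeUTF, max 65535 bytes), length (-1 = null), bytes. */
    static String encodeBlock(List<Map.Entry<String, byte[]>> headers) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());
            for (Map.Entry<String, byte[]> h : headers) {
                out.writeUTF(h.getKey());
                byte[] v = h.getValue();
                out.writeInt(v == null ? -1 : v.length); // -1 marks null, 0 marks empty
                if (v != null) {
                    out.write(v);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return BLOCK_PREFIX + Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Lets a reader decide between block decoding and the per-entry decoding path. */
    static boolean isBlockEncoded(String headerField) {
        return headerField.startsWith(BLOCK_PREFIX);
    }

    static List<Map.Entry<String, byte[]>> decodeBlock(String headerField) {
        if (!isBlockEncoded(headerField)) {
            throw new IllegalArgumentException("not block-encoded");
        }
        byte[] raw = Base64.getDecoder().decode(headerField.substring(BLOCK_PREFIX.length()));
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                int len = in.readInt();
                byte[] value = null;
                if (len >= 0) {
                    value = new byte[len];
                    in.readFully(value);
                }
                headers.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return headers;
    }
}
```

A reader would call `isBlockEncoded` on each header field and fall back to the existing per-entry decoding when it returns false, so files written before and after such a change could coexist.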
