Description
New description
I was not able to get the codec to work based on a Kafka input, and I created a minimum example using the file input plugin in the original description.
I did then discover the value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
option for the Kafka plugin, which I think is probably a reasonable thing to require, however it was unclear to me that this was required.
Indeed the documentation for this plugin makes no mention of this special requirement for input plugins. I think that should probably be added to the main description and the description for the binary encoding option.
Plugins which do not have an option similar to the one for the Kafka plugin would then be completely unsupported and that should probably also be mentioned.
Old description
Logstash information:
Logstash version 8.12.1, running the official docker image via docker cli.
Description of the problem including expected versus actual behavior:
Deserialization of avro records does not work using the binary (plain) encoding.
It looks like this was well-known back when it was decided that Logstash should only work with base64 encoded data, however we had an expectation from the documentation and the recent addition of the binary encoding that it would also work with plain data.
If the problem is unfixable, then at the very least the documentation should be updated to say in quite large letters that for decoding, still only base64 is supported.
Steps to reproduce:
- Create example schema (referred to as
example-schema.avsc
below):
{
"name": "Repro",
"type": "record",
"fields": [
{"name": "f1", "type": "string"}
]
}
- Produce avro record. Can be done with many language bindings, here is an example with Python (we append newline to work with the file input plugin):
from avro.io import BinaryEncoder, DatumWriter
import avro.schema
schema = avro.schema.parse(open("example-schema.avsc").read())
writer = DatumWriter(schema)
with open("example_record.avro", "wb") as f:
encoder = BinaryEncoder(f)
data = {"f1": "x" * 192}
writer.write(data, encoder)
f.write(b"\n")
- Create the following
logstash.conf
example pipeline:
input {
file {
path => ["/usr/share/logstash/example_record.avro"]
codec => avro {
schema_uri => "/usr/share/logstash/example-schema.avsc"
encoding => binary
}
exit_after_read => true
mode => read
}
}
filter {
ruby {
code => 'logger.error("test")'
}
}
output {
stdout {}
}
- Run logstash with the above minimal configuration:
#!/usr/bin/bash -eu
docker run --rm -it \
-v ./example_record.avro:/usr/share/logstash/example_record.avro:ro \
-v ./example-schema.avsc:/usr/share/logstash/example-schema.avsc:ro \
-v ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro \
logstash:8.12.1 -f /usr/share/logstash/pipeline/logstash.conf
Provide logs (if relevant):
The following error is output:
[2024-02-21T09:19:27,277][ERROR][filewatch.readmode.handlers.readfile][main][3fc59893622e156ef5c5b5e2f684672e512b0a4e50a2117e214b3f6c2d6c8976] controlled_read: general error reading file {:path=>"/usr/share/logstash/example_record.avro", :exception=>ArgumentError, :message=>"negative length -3649528 given", :backtrace=>["org/jruby/ext/stringio/StringIO.java:860:in `read'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:106:in `read'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:93:in `read_bytes'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:99:in `read_string'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:276:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:364:in `block in read_record'", "org/jruby/RubyArray.java:1989:in `each'", "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:361:in `read_record'"]}