Description
I get the following error while trying to decode Avro payloads from Kafka:
���T.2016-02-18T21:04:14.182MadisonMadison._sp80UM-LEeWzy4JC2MRxcg._dM83cNa1EeWNn7pTE75EYA jconlon@mudshark"Condor Industries"Condor Industries._iqWIsM9sEeWzy4JC2MRxcg2cdo://mudshark:2036/repo1�� {:exception=>#<NoMethodError: undefined method `decode' for #<Array:0x61a3ee90>>, :backtrace=>["/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:178:in `queue_event'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:148:in `run'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.2.2-java/lib/logstash/pipeline.rb:331:in `inputworker'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.2.2-java/lib/logstash/pipeline.rb:325:in `start_input'"], :level=>:error}
Logstash configuration
input {
  kafka {
    topic_id => "Observe"
    type => "Observe"
    reset_beginning => true
    auto_offset_reset => smallest
    codec => {
      avro => {
        schema_uri => "/home/jconlon/git/com.verticon.irouter/com.verticon.im.avro/avroSchema/observeEvent.avsc"
      }
    }
  }
}
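For comparison, the Logstash configuration documentation writes a codec and its settings as `codec => name { ... }` rather than as nested hashes. A minimal sketch of the same input in that form, assuming the schema path and the other options stay unchanged (an untested variant, not a confirmed fix):

```
input {
  kafka {
    topic_id => "Observe"
    type => "Observe"
    reset_beginning => true
    auto_offset_reset => smallest
    # Codec name followed directly by its settings block, no outer braces
    codec => avro {
      schema_uri => "/home/jconlon/git/com.verticon.irouter/com.verticon.im.avro/avroSchema/observeEvent.avsc"
    }
  }
}
```

With the nested `codec => { avro => { ... } }` form, the setting may be parsed as a plain hash or array instead of a codec instance, which would be consistent with `decode` being called on an Array in the error above.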
Payloads created with Avro 1.7.7 in Java:
Observe event = (Observe) abstractEvent;
ObserveEvent observeEvent = ObserveEvent.newBuilder().setComments(event.getComments())
        .setEntityKey(entity.getKey()).setEntityName(entity.getName()).setEntityTag(entity.getTag())
        .setEventKey(event.getKey())
        .setLocalTime(event.getLocalDateTime() != null ? event.getLocalDateTime().toString() : null)
        .setModelRepository(modelRepo).setCurrentParentKey(parent.getKey())
        .setCurrentParentName(parent.getName()).setCurrentParentTag(parent.getTag())
        .setTimeStamp(event.getUtc() != null ? event.getUtc().toEpochMilli() : null)
        .setUrl(event.getUrl() != null ? event.getUrl().toString() : null).setUser(event.getUser()).build();
logger.debug("Transformed payload {}", observeEvent);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
DatumWriter<ObserveEvent> writer = new SpecificDatumWriter<ObserveEvent>(ObserveEvent.class);
try {
    writer.write(observeEvent, encoder);
    encoder.flush();
    out.close();
    serializedBytes = out.toByteArray();
} catch (IOException e) {
    logger.error("Failed to write byte array", e);
}
I tried both encoders:
// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
I would appreciate any tips on solving this.
Thanks,
John