Description
Describe the bug
I tried to publish a kafka message through springwolf-ui on a "compacted" kafka topic. The publish is rejected due to the fact that the producer publishes messages with key value "null".
Dependencies and versions used
springwolf-ui: 0.6.0
springwolf-kafka: 0.9.0
Code example
The problem lies in the producer since the message gets published without a key:
`
package io.github.stavshamir.springwolf.producer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
@slf4j
@requiredargsconstructor
public class SpringwolfKafkaProducer {
private final Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate;
public boolean isEnabled() {
return kafkaTemplate.isPresent();
}
public void send(String topic, Map<String, String> headers, Map<String, ?> payload) {
if (kafkaTemplate.isPresent()) {
kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload));
} else {
log.warn("Kafka producer is not configured");
}
}
private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, Map<String, String> headers, Map<String, ?> payload) {
List<Header> recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList();
return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders);
}
private List<Header> buildHeaders(Map<String, String> headers) {
return headers.entrySet().stream()
.map(header -> new RecordHeader(header.getKey(), header.getValue().getBytes(UTF_8)))
.collect(Collectors.toList());
}
}
`
Stack trace and error logs
org.apache.kafka.common.InvalidRecordException: Compacted topic cannot accept message without key in topic partition xyz.