Cannot Publish Kafka Message on Compacted Kafka Topic due to no Key #116

Closed
@vlad-ge

Describe the bug
I tried to publish a Kafka message through springwolf-ui to a compacted Kafka topic. The broker rejects the publish because the producer sends the message with a `null` key.

Dependencies and versions used
springwolf-ui: 0.6.0
springwolf-kafka: 0.9.0

Code example
The problem lies in the producer, which builds the record without a key:
```java
package io.github.stavshamir.springwolf.producer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

@Slf4j
@RequiredArgsConstructor
public class SpringwolfKafkaProducer {

    private final Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate;

    public boolean isEnabled() {
        return kafkaTemplate.isPresent();
    }

    public void send(String topic, Map<String, String> headers, Map<String, ?> payload) {
        if (kafkaTemplate.isPresent()) {
            kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload));
        } else {
            log.warn("Kafka producer is not configured");
        }
    }

    private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, Map<String, String> headers, Map<String, ?> payload) {
        List<Header> recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList();

        // Partition, timestamp, and key are all null; a compacted topic rejects the null key.
        return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders);
    }

    private List<Header> buildHeaders(Map<String, String> headers) {
        return headers.entrySet().stream()
                .map(header -> new RecordHeader(header.getKey(), header.getValue().getBytes(UTF_8)))
                .collect(Collectors.toList());
    }

}
```

Stack trace and error logs
org.apache.kafka.common.InvalidRecordException: Compacted topic cannot accept message without key in topic partition xyz.
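
To illustrate the rule behind this error, here is a minimal, dependency-free sketch of a record builder that accepts an optional key. It is not springwolf code: `KeyedRecordSketch`, `KeyedMessage`, `buildMessage`, and `acceptedByTopic` are hypothetical names, and `acceptedByTopic` only imitates the broker-side check reported in the stack trace. In the real producer, the equivalent change would presumably be to pass the key as the fourth argument of the six-argument `ProducerRecord` constructor, which currently receives `null`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in types for illustration; not part of springwolf or Kafka.
class KeyedRecordSketch {

    /** Minimal stand-in for a Kafka record: topic, optional key, headers, payload. */
    static final class KeyedMessage {
        final String topic;
        final String key; // may be null, as in the reported producer
        final Map<String, String> headers;
        final Map<String, ?> payload;

        KeyedMessage(String topic, String key, Map<String, String> headers, Map<String, ?> payload) {
            this.topic = topic;
            this.key = key;
            this.headers = headers;
            this.payload = payload;
        }
    }

    /** Builds a message, carrying the caller-supplied key through instead of dropping it. */
    static KeyedMessage buildMessage(String topic, String key, Map<String, String> headers, Map<String, ?> payload) {
        Map<String, String> safeHeaders = headers != null ? headers : Collections.emptyMap();
        return new KeyedMessage(Objects.requireNonNull(topic, "topic"), key, safeHeaders, payload);
    }

    /** Imitates the rule from the stack trace: a compacted topic needs a non-null key. */
    static boolean acceptedByTopic(KeyedMessage message, boolean compacted) {
        return !compacted || message.key != null;
    }

    public static void main(String[] args) {
        KeyedMessage withoutKey = buildMessage("xyz", null, null, Map.of("id", 1));
        KeyedMessage withKey = buildMessage("xyz", "order-1", null, Map.of("id", 1));

        System.out.println(acceptedByTopic(withoutKey, true)); // prints "false"
        System.out.println(acceptedByTopic(withKey, true));    // prints "true"
    }
}
```

A key-less message is still accepted on a non-compacted topic in this sketch, so existing callers that never supply a key would behave as before.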
