Skip to content

Commit 2520554

Browse files
add KafkaToMqttTransformer example (#2)
added KafkaToMqttTransformer example version to 4.5.0
1 parent 28bd220 commit 2520554

File tree

4 files changed

+184
-2
lines changed

4 files changed

+184
-2
lines changed

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
version=4.4.3
1+
version=4.5.0
22
#
33
# main dependencies
44
#
5-
hivemq-kakfa-sdk.version=4.4.3
5+
hivemq-kakfa-sdk.version=4.5.0
66
slf4j.version=1.7.30
77
junit.jupiter.version=5.5.1
88
mockito.version=3.4.6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.hivemq.extensions.kafka.customizations.helloworld;
2+
3+
import com.codahale.metrics.Counter;
4+
import com.codahale.metrics.MetricRegistry;
5+
import com.hivemq.extension.sdk.api.annotations.NotNull;
6+
import com.hivemq.extension.sdk.api.annotations.Nullable;
7+
import com.hivemq.extension.sdk.api.services.builder.PublishBuilder;
8+
import com.hivemq.extensions.kafka.api.model.KafkaRecord;
9+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttInitInput;
10+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttInput;
11+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttOutput;
12+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttTransformer;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.util.List;
17+
18+
/**
19+
* This example {@link KafkaToMqttTransformer} accepts a Kafka record and tries to create a new MQTT PUBLISH from it.
20+
* <p>
21+
* It performs the following computational steps:
22+
* <ol>
23+
* <li> Convert the Kafka topic to a MQTT topic.
24+
* <li> Create a new MQTT publish consisting of:
25+
* <ul>
26+
* <li> The kafka topic as MQTT topic.
27+
* <li> The value as payload
28+
* <li> All present kafka header as user properties.
29+
* </ul>
30+
* <li> Give the MQTT publish to the customization framework for publishing.
31+
* </ol>
32+
* <p>
33+
* An example kafka-configuration.xml enabling this transformer is provided in {@code src/main/resources}.
34+
*
35+
* @author Daniel Krüger
36+
*/
37+
public class KafkaToMqttHelloWorldTransformer implements KafkaToMqttTransformer {
38+
39+
private static final @NotNull Logger log = LoggerFactory.getLogger(KafkaToMqttHelloWorldTransformer.class);
40+
41+
public final static String MISSING_VALUE_COUNTER_NAME = "com.hivemq.hello-world-example.missing-value.count";
42+
43+
private @Nullable MetricRegistry metricRegistry;
44+
private @Nullable Counter missingValueCounter;
45+
46+
@Override
47+
public void init(@NotNull final KafkaToMqttInitInput input) {
48+
this.metricRegistry = input.getMetricRegistry();
49+
// build any custom metric based on your business logic and needs
50+
this.missingValueCounter = metricRegistry.counter(MISSING_VALUE_COUNTER_NAME);
51+
}
52+
53+
54+
@Override
55+
public void transformKafkaToMqtt(@NotNull final KafkaToMqttInput kafkaToMqttInput,
56+
@NotNull final KafkaToMqttOutput kafkaToMqttOutput) {
57+
// get the Kafka record from the input
58+
final KafkaRecord kafkaRecord = kafkaToMqttInput.getKafkaRecord();
59+
60+
// get the PublishBuilder object from the output
61+
final PublishBuilder publishBuilder = kafkaToMqttOutput.newPublishBuilder().topic(kafkaRecord.getTopic());
62+
63+
// set kafka record value as payload, if present. Otherwise increase the missing kafka record value counter.
64+
kafkaRecord.getValue().ifPresentOrElse(publishBuilder::payload, missingValueCounter::inc);
65+
66+
// convert Kafka header to MQTT user properties
67+
kafkaRecord.getHeaders()
68+
.asList()
69+
.forEach(kafkaHeader -> publishBuilder.userProperty(kafkaHeader.getKey(), kafkaHeader.getValueAsString()));
70+
71+
kafkaToMqttOutput.setPublishes(List.of(publishBuilder.build()));
72+
}
73+
}

src/main/resources/kafka-configuration.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,14 @@
2020
</mqtt-to-kafka-transformer>
2121
</mqtt-to-kafka-transformers>
2222

23+
<kafka-to-mqtt-transformers>
24+
<kafka-to-mqtt-transformer>
25+
<id>hello-world-kafka-to-mqtt-transformer</id>
26+
<cluster-id>cluster01</cluster-id>
27+
<kafka-topics>
28+
<kafka-topic>transform</kafka-topic>
29+
</kafka-topics>
30+
<transformer>com.hivemq.extensions.kafka.customizations.helloworld.KafkaToMqttHelloWorldTransformer</transformer>
31+
</kafka-to-mqtt-transformer>
32+
</kafka-to-mqtt-transformers>
2333
</kafka-configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.hivemq.extensions.kafka.customizations.helloworld;
2+
3+
import com.codahale.metrics.MetricRegistry;
4+
import com.hivemq.extension.sdk.api.packets.general.UserProperty;
5+
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
6+
import com.hivemq.extension.sdk.api.services.builder.PublishBuilder;
7+
import com.hivemq.extension.sdk.api.services.publish.Publish;
8+
import com.hivemq.extensions.kafka.api.builders.KafkaRecordBuilder;
9+
import com.hivemq.extensions.kafka.api.model.KafkaHeader;
10+
import com.hivemq.extensions.kafka.api.model.KafkaHeaders;
11+
import com.hivemq.extensions.kafka.api.model.KafkaRecord;
12+
import com.hivemq.extensions.kafka.api.services.KafkaTopicService;
13+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttInitInput;
14+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttInput;
15+
import com.hivemq.extensions.kafka.api.transformers.kafkatomqtt.KafkaToMqttOutput;
16+
import com.hivemq.extensions.kafka.api.transformers.mqtttokafka.MqttToKafkaInput;
17+
import com.hivemq.extensions.kafka.api.transformers.mqtttokafka.MqttToKafkaOutput;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.List;
23+
import java.util.Optional;
24+
25+
import static org.junit.jupiter.api.Assertions.*;
26+
import static org.mockito.ArgumentMatchers.eq;
27+
import static org.mockito.Mockito.*;
28+
29+
class KafkaToMqttHelloWorldTransformerTest {
30+
31+
// mock objects
32+
private KafkaToMqttInput input;
33+
private KafkaToMqttInitInput initInput;
34+
private KafkaToMqttOutput output;
35+
private PublishBuilder publishBuilder;
36+
private KafkaRecord kafkaRecord;
37+
private KafkaHeaders kafkaHeaders;
38+
private KafkaHeader kafkaHeader;
39+
private Publish publish;
40+
41+
// test object
42+
private KafkaToMqttHelloWorldTransformer transformer;
43+
44+
private MetricRegistry metricRegistry;
45+
46+
@BeforeEach
47+
void setUp() {
48+
input = mock(KafkaToMqttInput.class);
49+
initInput = mock(KafkaToMqttInitInput.class);
50+
output = mock(KafkaToMqttOutput.class);
51+
kafkaRecord = mock(KafkaRecord.class);
52+
publishBuilder = mock(PublishBuilder.class);
53+
publish = mock(Publish.class);
54+
kafkaHeaders = mock(KafkaHeaders.class);
55+
kafkaHeader = mock(KafkaHeader.class);
56+
metricRegistry = new MetricRegistry();
57+
58+
when(input.getKafkaRecord()).thenReturn(kafkaRecord);
59+
when(output.newPublishBuilder()).thenReturn(publishBuilder);
60+
transformer= new KafkaToMqttHelloWorldTransformer();
61+
62+
when(kafkaRecord.getHeaders()).thenReturn(kafkaHeaders);
63+
when(kafkaRecord.getTopic()).thenReturn("topic");
64+
when(kafkaRecord.getValue()).thenReturn(Optional.of(ByteBuffer.wrap("test".getBytes())));
65+
66+
when(kafkaHeaders.asList()).thenReturn(List.of(kafkaHeader));
67+
when(kafkaHeader.getKey()).thenReturn("test-key");
68+
when(kafkaHeader.getValueAsString()).thenReturn("test-value");
69+
70+
when(publishBuilder.build()).thenReturn(publish);
71+
when(publishBuilder.contentType(any())).thenReturn(publishBuilder);
72+
when(publishBuilder.topic(anyString())).thenReturn(publishBuilder);
73+
when(publishBuilder.correlationData(any())).thenReturn(publishBuilder);
74+
when(publishBuilder.messageExpiryInterval(anyLong())).thenReturn(publishBuilder);
75+
when(publishBuilder.payloadFormatIndicator(any())).thenReturn(publishBuilder);
76+
when(publishBuilder.userProperty(anyString(), anyString())).thenReturn(publishBuilder);
77+
78+
when(initInput.getMetricRegistry()).thenReturn(metricRegistry);
79+
transformer.init(initInput);
80+
}
81+
82+
@Test
83+
void transformMqttToKafka_setsDataFromKafkaRecord() {
84+
transformer.transformKafkaToMqtt(input, output);
85+
verify(publishBuilder).topic(eq(kafkaRecord.getTopic()));
86+
verify(publishBuilder).payload(eq(kafkaRecord.getValue().get()));
87+
// work around for a mockito bug
88+
final String value = kafkaHeader.getValueAsString();
89+
verify(publishBuilder).userProperty(eq(kafkaHeader.getKey()), eq(value));
90+
verify(output).setPublishes(eq(List.of(publish)));
91+
}
92+
93+
@Test
94+
void transformMqttToKafka_missingValueIncCounter() {
95+
when(kafkaRecord.getValue()).thenReturn(Optional.empty());
96+
transformer.transformKafkaToMqtt(input, output);
97+
assertEquals(1, metricRegistry.counter(KafkaToMqttHelloWorldTransformer.MISSING_VALUE_COUNTER_NAME).getCount());
98+
}
99+
}

0 commit comments

Comments
 (0)