1
+ package com .hivemq .extensions .kafka .customizations .helloworld ;
2
+
3
+ import com .hivemq .extension .sdk .api .packets .general .UserProperties ;
4
+ import com .hivemq .extension .sdk .api .packets .general .UserProperty ;
5
+ import com .hivemq .extension .sdk .api .packets .publish .PublishPacket ;
6
+ import com .hivemq .extensions .kafka .api .builders .KafkaRecordBuilder ;
7
+ import com .hivemq .extensions .kafka .api .model .KafkaCluster ;
8
+ import com .hivemq .extensions .kafka .api .model .KafkaRecord ;
9
+ import com .hivemq .extensions .kafka .api .services .KafkaTopicService ;
10
+ import com .hivemq .extensions .kafka .api .transformers .mqtttokafka .MqttToKafkaInput ;
11
+ import com .hivemq .extensions .kafka .api .transformers .mqtttokafka .MqttToKafkaOutput ;
12
+ import org .junit .jupiter .api .BeforeEach ;
13
+ import org .junit .jupiter .api .Test ;
14
+
15
+ import java .nio .ByteBuffer ;
16
+ import java .nio .charset .StandardCharsets ;
17
+ import java .util .List ;
18
+ import java .util .Optional ;
19
+
20
+ import static org .mockito .ArgumentMatchers .any ;
21
+ import static org .mockito .ArgumentMatchers .anyString ;
22
+ import static org .mockito .Mockito .*;
23
+
24
+ class HelloWorldTransformerTest {
25
+
26
+ // mock objects
27
+ private MqttToKafkaInput input ;
28
+ private MqttToKafkaOutput output ;
29
+ private KafkaTopicService topicService ;
30
+ private KafkaRecordBuilder recordBuilder ;
31
+ private KafkaRecord kafkaRecord ;
32
+ private PublishPacket publishPacket ;
33
+ private UserProperty userProperty ;
34
+
35
+ // test object
36
+ private HelloWorldTransformer transformer ;
37
+
38
+ @ BeforeEach
39
+ void setUp () {
40
+ input = mock (MqttToKafkaInput .class );
41
+ output = mock (MqttToKafkaOutput .class );
42
+ topicService = mock (KafkaTopicService .class );
43
+ recordBuilder = mock (KafkaRecordBuilder .class );
44
+ kafkaRecord = mock (KafkaRecord .class );
45
+ publishPacket = mock (PublishPacket .class );
46
+
47
+ when (input .getPublishPacket ()).thenReturn (publishPacket );
48
+ final KafkaCluster kafkaCluster = mock (KafkaCluster .class );
49
+ when (input .getKafkaCluster ()).thenReturn (kafkaCluster );
50
+ when (input .getKafkaTopicService ()).thenReturn (topicService );
51
+
52
+ when (output .newKafkaRecordBuilder ()).thenReturn (recordBuilder );
53
+
54
+ when (topicService .getKafkaTopicState (anyString ())).thenReturn (KafkaTopicService .KafkaTopicState .EXISTS );
55
+ when (topicService .createKafkaTopic (anyString ())).thenReturn (KafkaTopicService .KafkaTopicState .CREATED );
56
+
57
+ when (recordBuilder .topic (anyString ())).thenReturn (recordBuilder );
58
+ when (recordBuilder .value (any (ByteBuffer .class ))).thenReturn (recordBuilder );
59
+ when (recordBuilder .header (anyString (), anyString ())).thenReturn (recordBuilder );
60
+
61
+ when (recordBuilder .build ()).thenReturn (kafkaRecord );
62
+
63
+ when (publishPacket .getTopic ()).thenReturn ("test/topic" );
64
+ when (publishPacket .getPayload ()).thenReturn (Optional .of (ByteBuffer .wrap ("test-payload" .getBytes (StandardCharsets .UTF_8 ))));
65
+ final UserProperties userProperties = mock (UserProperties .class );
66
+ userProperty = mock (UserProperty .class );
67
+ when (userProperties .asList ()).thenReturn (List .of (userProperty ));
68
+ when (userProperty .getName ()).thenReturn ("test-name" );
69
+ when (userProperty .getValue ()).thenReturn ("test-value" );
70
+ when (publishPacket .getUserProperties ()).thenReturn (userProperties );
71
+
72
+ transformer = new HelloWorldTransformer ();
73
+ }
74
+
75
+ @ SuppressWarnings ("OptionalGetWithoutIsPresent" )
76
+ @ Test
77
+ void transformMqttToKafka_setsDataFromPublishPacket () {
78
+ transformer .transformMqttToKafka (input , output );
79
+
80
+ verify (recordBuilder ).topic (eq (KafkaTopicUtil .mqttToKafkaTopic (publishPacket .getTopic ())));
81
+ verify (recordBuilder ).value (eq (publishPacket .getPayload ().get ()));
82
+ // work around for a mockito bug
83
+ final String value = userProperty .getValue ();
84
+ verify (recordBuilder ).header (eq (userProperty .getName ()), eq (value ));
85
+ verify (output ).setKafkaRecords (eq (List .of (kafkaRecord )));
86
+ }
87
+
88
+ @ Test
89
+ void transformMqttToKafka_queriesForTopicExistence () {
90
+ transformer .transformMqttToKafka (input , output );
91
+
92
+ verify (topicService ).getKafkaTopicState (eq (KafkaTopicUtil .mqttToKafkaTopic (publishPacket .getTopic ())));
93
+ }
94
+
95
+ @ Test
96
+ void transformMqttToKafka_createsMissingTopic () {
97
+ final String kafkaTopic = KafkaTopicUtil .mqttToKafkaTopic (publishPacket .getTopic ());
98
+ when (topicService .getKafkaTopicState (eq (kafkaTopic ))).thenReturn (KafkaTopicService .KafkaTopicState .MISSING );
99
+ transformer .transformMqttToKafka (input , output );
100
+
101
+ verify (topicService ).createKafkaTopic (eq (kafkaTopic ));
102
+ }
103
+
104
+ @ Test
105
+ void transformMqttToKafka_abortsAfterFailedTopicQuery () {
106
+ final String kafkaTopic = KafkaTopicUtil .mqttToKafkaTopic (publishPacket .getTopic ());
107
+ when (topicService .getKafkaTopicState (eq (kafkaTopic ))).thenReturn (KafkaTopicService .KafkaTopicState .FAILURE );
108
+ transformer .transformMqttToKafka (input , output );
109
+
110
+ verify (topicService , never ()).createKafkaTopic (anyString ());
111
+ verify (output , never ()).setKafkaRecords (anyList ());
112
+ }
113
+
114
+ @ Test
115
+ void transformMqttToKafka_abortsAfterFailedTopicCreation () {
116
+ final String kafkaTopic = KafkaTopicUtil .mqttToKafkaTopic (publishPacket .getTopic ());
117
+ when (topicService .getKafkaTopicState (eq (kafkaTopic ))).thenReturn (KafkaTopicService .KafkaTopicState .MISSING );
118
+ when (topicService .createKafkaTopic (eq (kafkaTopic ))).thenReturn (KafkaTopicService .KafkaTopicState .FAILURE );
119
+ transformer .transformMqttToKafka (input , output );
120
+
121
+ verify (output , never ()).setKafkaRecords (anyList ());
122
+ }
123
+ }
0 commit comments