Skip to content

Commit 39fdd35

Browse files
DC2-DanielKruegerSgtSilviosauroterMicWalter
authored
Feature/id 517/kafka to mqtt transformer (#13)
Added the KafkaToMqttTransformer interfaces. Added get method for metric registry Add link mechanism and extend readme Co-authored-by: Silvio Giebl <silvio.giebl@hivemq.com> Co-authored-by: Georg <georg.held@hivemq.com> Co-authored-by: Silvio Giebl <silvio.giebl@hivemq.com> Co-authored-by: Georg Held <10121608+sauroter@users.noreply.github.com> Co-authored-by: Michael Walter <michael.walter@dc-square.de>
1 parent aa62270 commit 39fdd35

File tree

13 files changed

+249
-11
lines changed

13 files changed

+249
-11
lines changed

README.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,11 @@ To use the SDK, add the following dependency to your Maven project:
3131

3232
The HiveMQ Kafka Extension SDK is licensed under the `APACHE LICENSE, VERSION 2.0`.
3333
A copy of the license can be found link:LICENSE[here].
34+
35+
== Setup
36+
To be able to use the latest changes of the `hivemq-extension-sdk` please check out the https://github.com/hivemq/hivemq-extension-sdk repository in the same folder as the current project.
37+
(See line `includeBuild("../hivemq-extension-sdk")` in `settings.gradle.kts`).
38+
39+
NOTE: You can also change the path in the `settings.gradle.kts` to the path of your `hivemq-extension-sdk`.
40+
41+
Otherwise, it will not be possible to publish develop snapshots to your local repository.

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ tasks.withType<Jar>().configureEach {
9191

9292
tasks.javadoc {
9393
title = "${metadata.readableName} ${project.version} API"
94-
94+
isFailOnError = false
9595
doLast { // javadoc search fix for jdk 11 https://bugs.openjdk.java.net/browse/JDK-8215291
9696
copy {
9797
from("${buildDir}/docs/javadoc/search.js")

settings.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,9 @@ pluginManagement {
99
}
1010

1111
rootProject.name = "hivemq-kafka-extension-customization-sdk"
12+
13+
if (file("../hivemq-extension-sdk").exists()) {
14+
if (gradle.parent == null) { // not part of a composite build
15+
includeBuild("../hivemq-extension-sdk")
16+
}
17+
}

src/main/java/com/hivemq/extensions/kafka/api/model/KafkaHeader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package com.hivemq.extensions.kafka.api.model;
1718

1819
import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
@@ -52,11 +53,11 @@ public interface KafkaHeader {
5253
* @return the value of this header.
5354
* @since 4.4.0
5455
*/
55-
@NotNull byte[] getValueAsByteArray();
56+
byte @NotNull [] getValueAsByteArray();
5657

5758
/**
58-
* @return the value of this header as a string. {@link java.nio.charset.StandardCharsets#UTF_8} is used for the
59-
* decoding.
59+
* @return the value of this header as a string. {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used for
60+
* the decoding.
6061
* @since 4.4.0
6162
*/
6263
@NotNull String getValueAsString();

src/main/java/com/hivemq/extensions/kafka/api/services/KafkaTopicService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public interface KafkaTopicService {
9191
* @throws NullPointerException if {@code topics} is or contains null.
9292
* @since 4.4.0
9393
*/
94-
@NotNull Map<String, @NotNull KafkaTopicState> createKafkaTopics(@NotNull Set<@NotNull String> topics);
94+
@Immutable @NotNull Map<String, @NotNull KafkaTopicState> createKafkaTopics(@NotNull Set<@NotNull String> topics);
9595

9696
/**
9797
* KafkaTopicState encodes the current known state of a Kafka topic on the associated cluster.

src/main/java/com/hivemq/extensions/kafka/api/transformers/TransformerInitInput.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package com.hivemq.extensions.kafka.api.transformers;
1818

19+
import com.codahale.metrics.MetricRegistry;
1920
import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
2021
import com.hivemq.extension.sdk.api.annotations.Immutable;
22+
import com.hivemq.extension.sdk.api.annotations.NotNull;
2123

2224
/**
2325
* A marker interface for the input object of the {@link Transformer#init(TransformerInitInput)} method.
@@ -28,4 +30,15 @@
2830
*/
2931
@Immutable
3032
@DoNotImplement
31-
public interface TransformerInitInput {}
33+
public interface TransformerInitInput {
34+
35+
/**
36+
* Get the {@link MetricRegistry} of this HiveMQ node. It is possible to add own metrics to monitor
37+
* custom business logic.
38+
*
39+
* @return the {@link MetricRegistry} of the HiveMQ node this Enterprise Extension for Kafka is running on.
40+
* @since 4.5.0
41+
*/
42+
@NotNull MetricRegistry getMetricRegistry();
43+
44+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2020-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.extensions.kafka.api.transformers.kafkatomqtt;
18+
19+
import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
20+
import com.hivemq.extension.sdk.api.annotations.Immutable;
21+
import com.hivemq.extensions.kafka.api.transformers.TransformerInitInput;
22+
23+
/**
24+
* Provides context for the set up of a {@link KafkaToMqttTransformer}.
25+
*
26+
* @author Georg Held
27+
* @since 4.5.0
28+
*/
29+
@Immutable
30+
@DoNotImplement
31+
public interface KafkaToMqttInitInput extends TransformerInitInput {}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2020-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.extensions.kafka.api.transformers.kafkatomqtt;
18+
19+
import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
20+
import com.hivemq.extension.sdk.api.annotations.Immutable;
21+
import com.hivemq.extension.sdk.api.annotations.NotNull;
22+
import com.hivemq.extensions.kafka.api.model.KafkaCluster;
23+
import com.hivemq.extensions.kafka.api.model.KafkaRecord;
24+
25+
/**
26+
* The input parameter of the {@link KafkaToMqttTransformer}. It contains the information of the to be transformed
27+
* {@link KafkaRecord}.
28+
* <p>
29+
* The MqttToKafkaInput allows access to the {@link KafkaCluster}.
30+
*
31+
* @author Christoph Schäbel
32+
* @author Georg Held
33+
* @since 4.5.0
34+
*/
35+
@Immutable
36+
@DoNotImplement
37+
public interface KafkaToMqttInput {
38+
39+
/**
40+
* @return the {@link KafkaRecord} that triggered this transformer call.
41+
* @since 4.5.0
42+
*/
43+
@NotNull KafkaRecord getKafkaRecord();
44+
45+
/**
46+
* @return the {@link KafkaCluster} the transformer is associated with.
47+
* @since 4.5.0
48+
*/
49+
@NotNull KafkaCluster getKafkaCluster();
50+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2020-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.extensions.kafka.api.transformers.kafkatomqtt;
18+
19+
import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
20+
import com.hivemq.extension.sdk.api.annotations.NotNull;
21+
import com.hivemq.extension.sdk.api.services.builder.PublishBuilder;
22+
import com.hivemq.extension.sdk.api.services.publish.Publish;
23+
24+
import java.util.List;
25+
26+
/**
27+
* The output parameter of the {@link KafkaToMqttTransformer}. It allows access to the {@link PublishBuilder}.
28+
* <p>
29+
* After the {@link KafkaToMqttTransformer#transformKafkaToMqtt(KafkaToMqttInput, KafkaToMqttOutput)} method returns the
30+
* {@link Publish}es given to this output will be published by HiveMQ.
31+
*
32+
* @author Christoph Schäbel
33+
* @author Georg Held
34+
* @since 4.5.0
35+
*/
36+
@DoNotImplement
37+
public interface KafkaToMqttOutput {
38+
39+
/**
40+
* @return a new {@link PublishBuilder}.
41+
* @since 4.5.0
42+
*/
43+
@NotNull PublishBuilder newPublishBuilder();
44+
45+
/**
46+
* Sets the {@link Publish}es, that will be published by HiveMQ after the {@link
47+
* KafkaToMqttTransformer#transformKafkaToMqtt(KafkaToMqttInput, KafkaToMqttOutput)} call returns. The HiveMQ
48+
* Enterprise Extension for Kafka will publish these publishes in the order provided by the {@code publishes}
49+
* argument.
50+
* <p>
51+
* If desired, the same publish can occupy multiple places in the {@code publishes} list. When no publish shall be
52+
* published by HiveMQ for a {@link com.hivemq.extensions.kafka.api.model.KafkaRecord}, call this method with an
53+
* empty list.
54+
* <p>
55+
* Use the {@link #newPublishBuilder() PublishBuilder} to create new publishes as desired.
56+
* <p>
57+
* Each additional call of this method will overwrite the previous one.
58+
*
59+
* @param publishes a list of to be published {@link Publish}es.
60+
* @throws NullPointerException if {@code publishes} or any element of it is null.
61+
* @throws IllegalArgumentException if any element in {@code publishes} was not created via a {@link
62+
* PublishBuilder}.
63+
* @since 4.5.0
64+
*/
65+
void setPublishes(@NotNull List<@NotNull Publish> publishes);
66+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2020-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.extensions.kafka.api.transformers.kafkatomqtt;
18+
19+
import com.hivemq.extension.sdk.api.annotations.NotNull;
20+
import com.hivemq.extension.sdk.api.annotations.ThreadSafe;
21+
import com.hivemq.extensions.kafka.api.transformers.Transformer;
22+
23+
/**
24+
* Implement this transformer for the programmatic creation of {@link com.hivemq.extension.sdk.api.services.publish.Publish
25+
* Publishes} from {@link com.hivemq.extensions.kafka.api.model.KafkaRecord KafkaRecords}. One instance of the
26+
* implementing class is created per reference in the kafka-configuration.xml. The methods of this interface may be
27+
* called concurrently and must be thread-safe.
28+
* <p>
29+
* Your implementation of the KafkaToMqttTransformer must be placed in a Java archive (.jar) together with all its
30+
* dependencies in the {@code customizations} folder of the HiveMQ Enterprise Extension for Kafka. In addition a {@code
31+
* <kafka-to-mqtt-transformer>} referencing the implementing class via its canonical name must be configured in the
32+
* {@code kafka-extension.xml} file.
33+
*
34+
* @author Christoph Schäbel
35+
* @author Georg Held
36+
* @author Daniel Krüger
37+
* @since 4.5.0
38+
*/
39+
@FunctionalInterface
40+
public interface KafkaToMqttTransformer extends Transformer<KafkaToMqttInitInput> {
41+
42+
/**
43+
* This callback is executed for every {@link com.hivemq.extensions.kafka.api.model.KafkaRecord KafkaRecord} that is
44+
* polled by the HiveMQ Enterprise Extension for Kafka and matches the {@code <mqtt-to-kafka-transformer>} tag
45+
* configured in the {@code <mqtt-topic-filters>}. It allows the publication of any number of {@link
46+
* com.hivemq.extension.sdk.api.services.publish.Publish Publishes} via the {@link KafkaToMqttOutput} object. This
47+
* method is called by multiple threads concurrently. Extensions are responsible for their own exception handling
48+
* and this method must not throw any {@link Exception}.
49+
*
50+
* @param input the {@link KafkaToMqttInput} contains the triggering {@link com.hivemq.extensions.kafka.api.model.KafkaRecord
51+
* KafkaRecord} and the {@link com.hivemq.extensions.kafka.api.model.KafkaCluster KafkaCluster}
52+
* information.
53+
* @param output the {@link KafkaToMqttOutput} allows to {@link KafkaToMqttOutput#setPublishes(java.util.List)
54+
* provide a list of Publishes}. If no output is set, a empty List is used as default and the kafka
55+
* record will not be processed again, but ignored.
56+
* @since 4.5.0
57+
*/
58+
@ThreadSafe
59+
void transformKafkaToMqtt(@NotNull KafkaToMqttInput input, @NotNull KafkaToMqttOutput output);
60+
}

0 commit comments

Comments
 (0)