Skip to content

Commit 624f3f6

Browse files
committed
Added more methods.
1 parent 7a0f3ab commit 624f3f6

File tree

11 files changed

+68
-15
lines changed

11 files changed

+68
-15
lines changed

src/main/java/com/hivemq/extensions/kafka/api/builders/KafkaRecordBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.hivemq.extensions.kafka.api.model.KafkaRecord;
2020

2121
import java.nio.ByteBuffer;
22+
import java.nio.charset.Charset;
2223

2324
/**
2425
* @author Christoph Schäbel
@@ -33,16 +34,24 @@ public interface KafkaRecordBuilder {
3334

3435
@NotNull KafkaRecordBuilder header(@NotNull String key, @NotNull String value);
3536

37+
@NotNull KafkaRecordBuilder header(@NotNull String key, @NotNull String value, @NotNull Charset charset);
38+
3639
@NotNull KafkaRecordBuilder key(@NotNull ByteBuffer key);
3740

3841
@NotNull KafkaRecordBuilder key(@NotNull byte[] key);
3942

4043
@NotNull KafkaRecordBuilder key(@NotNull String key);
4144

45+
@NotNull KafkaRecordBuilder key(@NotNull String key, @NotNull Charset charset);
46+
4247
@NotNull KafkaRecordBuilder value(@NotNull ByteBuffer value);
4348

4449
@NotNull KafkaRecordBuilder value(@NotNull byte[] value);
4550

51+
@NotNull KafkaRecordBuilder value(@NotNull String value);
52+
53+
@NotNull KafkaRecordBuilder value(@NotNull String value, @NotNull Charset charset);
54+
4655
@NotNull KafkaRecordBuilder timestamp(long timestamp);
4756

4857
@NotNull KafkaRecordBuilder partition(int partition);

src/main/java/com/hivemq/extensions/kafka/api/model/KafkaCluster.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
*/
1616
package com.hivemq.extensions.kafka.api.model;
1717

18+
import com.hivemq.extension.sdk.api.annotations.NotNull;
19+
1820
/**
1921
* @author Christoph Schäbel
2022
*/
2123
public
2224
interface KafkaCluster {
2325

24-
String getId();
26+
@NotNull String getId();
2527

26-
String getBootstrapServers();
28+
@NotNull String getBootstrapServers();
2729

2830
}

src/main/java/com/hivemq/extensions/kafka/api/model/KafkaHeader.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,24 @@
1515
*/
1616
package com.hivemq.extensions.kafka.api.model;
1717

18+
import com.hivemq.extension.sdk.api.annotations.NotNull;
19+
1820
import java.nio.ByteBuffer;
21+
import java.nio.charset.Charset;
1922

2023
/**
2124
* @author Christoph Schäbel
2225
*/
2326
public interface KafkaHeader {
2427

25-
String getKey();
28+
@NotNull String getKey();
29+
30+
@NotNull ByteBuffer getValue();
2631

27-
ByteBuffer getValue();
32+
@NotNull String getValueAsString();
2833

29-
String getValueAsString();
34+
@NotNull String getValueAsString(@NotNull Charset charset);
3035

31-
byte[] getValueAsByteArray();
36+
@NotNull byte[] getValueAsByteArray();
3237

3338
}

src/main/java/com/hivemq/extensions/kafka/api/model/KafkaHeaders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*/
2727
public interface KafkaHeaders {
2828

29-
List<KafkaHeader> asList();
29+
@NotNull List<KafkaHeader> asList();
3030

3131
/**
3232
* @param name The name of the user property to get.

src/main/java/com/hivemq/extensions/kafka/api/model/KafkaRecord.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public interface KafkaRecord {
3333

3434
@NotNull Optional<byte[]> getKeyAsByteArray();
3535

36-
@NotNull ByteBuffer getValue();
36+
@NotNull Optional<ByteBuffer> getValue();
3737

38-
@NotNull byte[] getValueAsByteArray();
38+
@NotNull Optional<byte[]> getValueAsByteArray();
3939

40-
@NotNull Long getTimestamp();
40+
@NotNull Optional<Long> getTimestamp();
4141

4242
@NotNull Optional<Integer> getPartition();
4343

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.hivemq.extensions.kafka.api.services;
2+
3+
import com.hivemq.extension.sdk.api.annotations.NotNull;
4+
5+
import java.util.List;
6+
7+
public interface KafkaTopicService {
8+
9+
@NotNull KafkaTopicState getKafkaTopicState(@NotNull String topic);
10+
11+
@NotNull List<@NotNull KafkaTopicState> getKafkaTopicStates(@NotNull List<@NotNull String> topics);
12+
13+
@NotNull KafkaTopicState createKafkaTopic(@NotNull String topic);
14+
15+
@NotNull List<@NotNull KafkaTopicState> createKafkaTopics(@NotNull List<@NotNull String> topics);
16+
17+
enum KafkaTopicState {
18+
FAILURE, EXISTS, CREATED, MISSING,
19+
}
20+
}

src/main/java/com/hivemq/extensions/kafka/api/transformers/Transformer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
/**
2121
* @author Christoph Schäbel
2222
*/
23-
public interface Transformer {
24-
default void init(@NotNull TransformerInitInput input) {
23+
public interface Transformer<I extends TransformerInitInput> {
24+
default void init(@NotNull I input) {
2525
}
2626
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.hivemq.extensions.kafka.api.transformers.mqtttokafka;
2+
3+
import com.hivemq.extension.sdk.api.annotations.NotNull;
4+
import com.hivemq.extensions.kafka.api.services.KafkaTopicService;
5+
import com.hivemq.extensions.kafka.api.transformers.TransformerInitInput;
6+
7+
/**
8+
* @author Georg Held
9+
*/
10+
public interface MqttToKafkaInitInput extends TransformerInitInput {
11+
12+
@NotNull KafkaTopicService getKafkaTopicService();
13+
}

src/main/java/com/hivemq/extensions/kafka/api/transformers/mqtttokafka/MqttToKafkaInput.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
import com.hivemq.extension.sdk.api.annotations.NotNull;
1919
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
2020
import com.hivemq.extensions.kafka.api.model.KafkaCluster;
21+
import com.hivemq.extensions.kafka.api.services.KafkaTopicService;
2122

2223
/**
2324
* @author Christoph Schäbel
2425
*/
2526
public interface MqttToKafkaInput {
2627

28+
@NotNull KafkaTopicService getKafkaTopicService();
29+
2730
@NotNull PublishPacket getPublishPacket();
2831

2932
@NotNull KafkaCluster getKafkaCluster();

src/main/java/com/hivemq/extensions/kafka/api/transformers/mqtttokafka/MqttToKafkaOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ public interface MqttToKafkaOutput {
2828

2929
@NotNull KafkaRecordBuilder newKafkaRecordBuilder();
3030

31-
void setKafkaRecords(@NotNull List<KafkaRecord> kafkaRecord);
31+
void setKafkaRecords(@NotNull List<@NotNull KafkaRecord> kafkaRecord);
3232
}

0 commit comments

Comments
 (0)