From 41adc4d0d5c7eaa1004c4f2646e1b904acd76a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Tue, 10 Jan 2023 13:02:56 +0100 Subject: [PATCH 01/13] Fix: Remove that EqualsAndHashCode of Message only uses the name, Change MessageHelper to not expose the TreeSet that is used for deduplication because a TreeSet breaks equals --- .../springwolf/asyncapi/MessageHelper.java | 16 +++++- .../channel/operation/message/Message.java | 3 +- .../asyncapi/MessageHelperTest.java | 56 ++++++++++++++++++- .../scanners/channels/ChannelMergerTest.java | 5 +- .../AsyncListenerAnnotationScannerTest.java | 4 +- .../AsyncPublisherAnnotationScannerTest.java | 4 +- .../ClassLevelKafkaListenerScannerTest.java | 2 + 7 files changed, 79 insertions(+), 11 deletions(-) diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java index 77f13230b..ee3017bad 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java @@ -4,7 +4,13 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; import lombok.extern.slf4j.Slf4j; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -13,6 +19,10 @@ public class MessageHelper { private static final String ONE_OF = "oneOf"; private static final Comparator byMessageName = Comparator.comparing(Message::getName); + + // TODO Why do we need a SortedSet here? Using a comparator with only the message name will break deep equals on the Set + // Unfortunately there are Tests relying on deep equals + // see https://docs.oracle.com/javase/7/docs/api/java/util/TreeSet.html private static final Supplier> messageSupplier = () -> new TreeSet<>(byMessageName); public static Object toMessageObjectOrComposition(Set messages) { @@ -22,14 +32,14 @@ public static Object toMessageObjectOrComposition(Set messages) { case 1: return messages.toArray()[0]; default: - return ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier))); + return ImmutableMap.of(ONE_OF, new HashSet<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))); } } @SuppressWarnings("unchecked") public static Set messageObjectToSet(Object messageObject) { if (messageObject instanceof Message) { - return new HashSet<>(Arrays.asList((Message) messageObject)); + return new HashSet<>(Collections.singletonList((Message) messageObject)); } if (messageObject instanceof Map) { diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java index 930ec1add..89fe72461 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java @@ -10,7 +10,8 @@ */ @Data @Builder -@EqualsAndHashCode(of = {"name"}) +// TODO Why ignore other fields? +//@EqualsAndHashCode(of = {"name"}) @NoArgsConstructor @AllArgsConstructor public class Message { diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java index 79ac061e3..c23a40232 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java @@ -53,6 +53,60 @@ public void toMessageObjectOrComposition_multipleMessages() { .isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2))); } + @Test + public void toMessageObjectOrComposition_multipleMessages_remove_duplicates() { + Message message1 = Message.builder() + .name("foo") + .description("This is message 1") + .build(); + + Message message2 = Message.builder() + .name("bar") + .description("This is message 2") + .build(); + + Message message3 = Message.builder() + .name("bar") + .description("This is message 3, but in essence the same payload type as message 2") + .build(); + + Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message1, message2, message3)); + + // Message3 is not included as it is identical in terms of payload type (Message#name) to message 2 + assertThat(asObject) + .isInstanceOf(Map.class) + .isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2))); + } + + @Test + public void toMessageObjectOrComposition_multipleMessages_should_not_break_deep_equals() { + Message actualMessage1 = Message.builder() + .name("foo") + .description("This is actual message 1") + .build(); + + Message actualMessage2 = Message.builder() + .name("bar") + .description("This is actual message 2") + .build(); + + Object actualObject = toMessageObjectOrComposition(ImmutableSet.of(actualMessage1, actualMessage2)); + + Message expectedMessage1 = Message.builder() + .name("foo") + .description("This is expected message 1") + .build(); + + Message expectedMessage2 = Message.builder() + .name("bar") + .description("This is expected message 2") + .build(); + + Object expectedObject = toMessageObjectOrComposition(ImmutableSet.of(expectedMessage1, expectedMessage2)); + + assertThat(actualObject).isNotEqualTo(expectedObject); + } + @Test public void messageObjectToSet_notAMessageOrAMap() { Object string = "foo"; @@ -94,4 +148,4 @@ public void messageObjectToSet_SetOfMessage() { .containsExactlyInAnyOrder(message1, message2); } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java index b812b85ad..77630f381 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java @@ -110,7 +110,8 @@ public void shouldMergeDifferentMessageForSameOperation() { // then expectedMessage only includes message1 and message2. // Message3 is not included as it is identical in terms of payload type (Message#name) to message 2 - Object expectedMessages = MessageHelper.toMessageObjectOrComposition(Sets.newHashSet(message1, message2)); + // TODO Is it really expected that the first occurrence is used? (no test in Message Helper...) + Object expectedMessages = MessageHelper.toMessageObjectOrComposition(Sets.newHashSet(message1, message3)); assertThat(mergedChannels).hasSize(1) .hasEntrySatisfying(channelName, it -> { assertThat(it.getPublish()).isEqualTo(Operation.builder().operationId("publisher1").message(expectedMessages).build()); @@ -141,4 +142,4 @@ public void shouldUseOtherMessageIfFirstMessageIsMissing() { assertThat(it.getSubscribe()).isNull(); }); } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java index f123a663c..e6f0a5983 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java @@ -66,7 +66,7 @@ public void scan_componentHasListenerMethod() { Message message = Message.builder() .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) - .description(null) + .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) .build(); @@ -164,4 +164,4 @@ private static class SimpleFoo { private String s; private boolean b; } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java index 2c1b6b23f..21be92eec 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java @@ -66,7 +66,7 @@ public void scan_componentHasPublisherMethod() { Message message = Message.builder() .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) - .description(null) + .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) .build(); @@ -164,4 +164,4 @@ private static class SimpleFoo { private String s; private boolean b; } -} \ No newline at end of file +} diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java index badcbc16e..1222e595d 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java @@ -76,12 +76,14 @@ public void scan_componentWithMultipleKafkaListenersAndHandlers() { .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) + .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) .build(); Message barMessage = Message.builder() .name(SimpleBar.class.getName()) .title(SimpleBar.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName())) + .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleBar.class.getSimpleName())) .build(); Operation operation = Operation.builder() From 1afcf2e3c1a4c5dca979798fac5c7262011bbe2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Tue, 10 Jan 2023 16:07:51 +0100 Subject: [PATCH 02/13] Refactor code to merge messages in ChannelMerger to respect the priority without relying on equals based on message name --- .../springwolf/asyncapi/MessageHelper.java | 8 +++--- .../scanners/channels/ChannelMerger.java | 26 +++++++++++++++---- .../asyncapi/MessageHelperTest.java | 5 ++-- .../scanners/channels/ChannelMergerTest.java | 4 +-- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java index ee3017bad..e48bd3ce0 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java @@ -4,10 +4,11 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; import lombok.extern.slf4j.Slf4j; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -32,7 +33,7 @@ public static Object toMessageObjectOrComposition(Set messages) { case 1: return messages.toArray()[0]; default: - return ImmutableMap.of(ONE_OF, new HashSet<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))); + return ImmutableMap.of(ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))); } } @@ -43,12 +44,11 @@ public static Set messageObjectToSet(Object messageObject) { } if (messageObject instanceof Map) { - Set messages = ((Map>) messageObject).get(ONE_OF); + List messages = ((Map>) messageObject).get(ONE_OF); return new HashSet<>(messages); } log.warn("Message object must contain either a Message or a Map, but contained: {}", messageObject.getClass()); return new HashSet<>(); } - } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java index 32fedb10c..03ddfc47f 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java @@ -5,7 +5,15 @@ import io.github.stavshamir.springwolf.asyncapi.MessageHelper; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Collectors; import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition; @@ -42,17 +50,25 @@ public static Map merge(List } private static Operation mergeOperation(Operation operation, Operation otherOperation) { - Set mergedMessages = getMessages(operation); - Set currentEntryMessages = getMessages(otherOperation); - mergedMessages.addAll(currentEntryMessages); - Operation mergedOperation = operation != null ? operation : otherOperation; + + Set mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation)); if (!mergedMessages.isEmpty()) { mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages)); } return mergedOperation; } + private static Set mergeMessages(Set messages, Set otherMessages) { + Map nameToMessage = messages.stream().collect(Collectors.toMap(Message::getName, Function.identity())); + + for (Message otherMessage : otherMessages) { + nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage); + } + + return new HashSet<>(nameToMessage.values()); + } + private static Set getMessages(Operation operation) { return Optional .ofNullable(operation) diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java index c23a40232..593cde01c 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; @@ -50,7 +51,7 @@ public void toMessageObjectOrComposition_multipleMessages() { assertThat(asObject) .isInstanceOf(Map.class) - .isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2))); + .isEqualTo(ImmutableMap.of("oneOf", ImmutableList.of(message2, message1))); } @Test @@ -75,7 +76,7 @@ public void toMessageObjectOrComposition_multipleMessages_remove_duplicates() { // Message3 is not included as it is identical in terms of payload type (Message#name) to message 2 assertThat(asObject) .isInstanceOf(Map.class) - .isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2))); + .isEqualTo(ImmutableMap.of("oneOf", ImmutableList.of(message2, message1))); } @Test diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java index 77630f381..1d1410d82 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java @@ -16,7 +16,6 @@ public class ChannelMergerTest { - @Test public void shouldNotMergeDifferentChannelNames() { // given @@ -110,8 +109,7 @@ public void shouldMergeDifferentMessageForSameOperation() { // then expectedMessage only includes message1 and message2. // Message3 is not included as it is identical in terms of payload type (Message#name) to message 2 - // TODO Is it really expected that the first occurrence is used? (no test in Message Helper...) - Object expectedMessages = MessageHelper.toMessageObjectOrComposition(Sets.newHashSet(message1, message3)); + Object expectedMessages = MessageHelper.toMessageObjectOrComposition(Sets.newHashSet(message1, message2)); assertThat(mergedChannels).hasSize(1) .hasEntrySatisfying(channelName, it -> { assertThat(it.getPublish()).isEqualTo(Operation.builder().operationId("publisher1").message(expectedMessages).build()); From fe5a9b62513f3c06b78621fc8dec5c4734ccdf69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Tue, 10 Jan 2023 17:56:53 +0100 Subject: [PATCH 03/13] Add extension point to supply protocol specific message bindings in plugins --- .../AbstractOperationDataScanner.java | 1 + .../operationdata/ProcessedMessageBinding.java | 10 ++++++++++ .../annotation/AsyncAnnotationScannerUtil.java | 12 +++++++++++- .../AsyncListenerAnnotationScanner.java | 7 ++++++- .../AsyncPublisherAnnotationScanner.java | 7 ++++++- .../annotation/MessageBindingProcessor.java | 18 ++++++++++++++++++ .../asyncapi/types/ConsumerData.java | 10 ++++++++++ .../asyncapi/types/OperationData.java | 7 +++++++ .../asyncapi/types/ProducerData.java | 11 +++++++++++ .../channel/operation/message/Message.java | 5 +++++ .../DefaultAsyncApiSerializerServiceTest.java | 5 ++++- .../asyncapi/DefaultAsyncApiServiceTest.java | 8 +++++++- .../ConsumerOperationDataScannerTest.java | 7 +++++++ .../ProducerOperationDataScannerTest.java | 10 ++++++++-- .../AsyncListenerAnnotationScannerTest.java | 2 ++ .../AsyncPublisherAnnotationScannerTest.java | 2 ++ .../src/test/resources/asyncapi/asyncapi.json | 14 +++++++++++++- 17 files changed, 128 insertions(+), 8 deletions(-) create mode 100644 springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java create mode 100644 springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java index d3aac185e..da5981131 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java @@ -99,6 +99,7 @@ private Message buildMessage(OperationData operationData) { .description(operationData.getDescription()) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(operationData.getMessageBinding()) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java new file mode 100644 index 000000000..2414b9c20 --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java @@ -0,0 +1,10 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; + +import com.asyncapi.v2.binding.MessageBinding; +import lombok.Data; + +@Data +public class ProcessedMessageBinding { + private final String type; + private final MessageBinding binding; +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java index 4e9473f05..6160a6a58 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java @@ -1,6 +1,8 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedOperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -59,11 +61,19 @@ private static String getDescription(List value) .orElse(null); } - public static Map processBindingFromAnnotation(Method method, List operationBindingProcessors) { + public static Map processOperationBindingFromAnnotation(Method method, List operationBindingProcessors) { return operationBindingProcessors.stream() .map(operationBindingProcessor -> operationBindingProcessor.process(method)) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toMap(ProcessedOperationBinding::getType, ProcessedOperationBinding::getBinding)); } + + public static Map processMessageBindingFromAnnotation(Method method, List messageBindingProcessors) { + return messageBindingProcessors.stream() + .map(messageBindingProcessor -> messageBindingProcessor.process(method)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toMap(ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding)); + } } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java index 3e7b94a70..38548c812 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor; @@ -32,6 +33,8 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner private final List operationBindingProcessors; + private final List messageBindingProcessors; + @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { this.resolver = resolver; @@ -63,7 +66,8 @@ private Set getAnnotatedMethods(Class type) { private OperationData mapMethodToOperationData(Method method) { log.debug("Mapping method \"{}\" to channels", method.getName()); - Map operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors); + Map operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors); + Map messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors); Class annotationClass = AsyncListener.class; AsyncListener annotation = Optional.of(method.getAnnotation(annotationClass)) @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) { .headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op)) .payloadType(payloadType) .operationBinding(operationBindings) + .messageBinding(messageBindings) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java index cc6c2f358..1084bf8a3 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor; @@ -32,6 +33,8 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne private final List operationBindingProcessors; + private final List messageBindingProcessors; + @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { this.resolver = resolver; @@ -63,7 +66,8 @@ private Set getAnnotatedMethods(Class type) { private OperationData mapMethodToOperationData(Method method) { log.debug("Mapping method \"{}\" to channels", method.getName()); - Map operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors); + Map operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors); + Map messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors); Class annotationClass = AsyncPublisher.class; AsyncPublisher annotation = Optional.of(method.getAnnotation(annotationClass)) @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) { .headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op)) .payloadType(payloadType) .operationBinding(operationBindings) + .messageBinding(messageBindings) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java new file mode 100644 index 000000000..c8f11b1ae --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java @@ -0,0 +1,18 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; + +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding; + +import java.lang.reflect.Method; +import java.util.Optional; + +public interface MessageBindingProcessor { + + /** + * Process the methods annotated with {@link AsyncPublisher} and {@link AsyncSubscriber} + * for protocol specific messageBinding annotations, method parameters, etc + * + * @param method The method being annotated + * @return A message binding, if found + */ + Optional process(Method method); +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java index 5f26420ca..7a0b8006e 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; import lombok.AllArgsConstructor; @@ -61,4 +62,13 @@ public class ConsumerData implements OperationData { */ protected Map operationBinding; + /** + * The message binding of the consumer. + *
+ * For example: + * + * ImmutableMap.of("kafka", new KafkaMessageBinding()) + * + */ + protected Map messageBinding; } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java index 98a18ca7f..25c71c7bd 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -40,10 +41,16 @@ public interface OperationData { */ Map getOperationBinding(); + /** + * The message binding. + */ + Map getMessageBinding(); + enum OperationType { PUBLISH("publish"), SUBSCRIBE("subscribe"); public final String operationName; + OperationType(String operationName) { this.operationName = operationName; } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java index 8b3836fb5..780870479 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; import lombok.AllArgsConstructor; @@ -61,4 +62,14 @@ public class ProducerData implements OperationData { */ protected Map operationBinding; + /** + * The message binding of the producer. + *
+ * For example: + * + * ImmutableMap.of("kafka", new KafkaMessageBinding()) + * + */ + protected Map messageBinding; + } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java index 89fe72461..27b8054f6 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java @@ -1,8 +1,11 @@ package io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message; +import com.asyncapi.v2.binding.MessageBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference; import lombok.*; +import java.util.Map; + /** * Describes a message received on a given channel and operation. * @@ -34,4 +37,6 @@ public class Message { private PayloadReference payload; private HeaderReference headers; + + private Map bindings; } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java index eab9f0f18..8db3774d9 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi; import com.asyncapi.v2.binding.OperationBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -15,6 +16,7 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference; import io.swagger.v3.core.converter.ModelConverters; import io.swagger.v3.oas.models.media.Schema; +import io.swagger.v3.oas.models.media.StringSchema; import lombok.Data; import org.apache.commons.io.IOUtils; import org.json.JSONException; @@ -67,6 +69,7 @@ public void AsyncAPI_should_map_to_a_valid_asyncapi_json() throws IOException, J .name("io.github.stavshamir.springwolf.ExamplePayload") .title("Example Payload") .payload(PayloadReference.fromModelName("ExamplePayload")) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding(new StringSchema(), "binding-version-1"))) .build(); OperationBinding operationBinding = KafkaOperationBinding.builder().groupId("myGroupId").build(); @@ -105,4 +108,4 @@ static class ExamplePayload { String s; } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java index e7d964b7a..74cd6adef 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.info.Info; @@ -53,13 +54,16 @@ public AsyncApiDocket docket() { .description("producer-topic-description") .payloadType(String.class) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); ConsumerData kafkaConsumerData = ConsumerData.builder() .channelName("consumer-topic") .description("consumer-topic-description") .payloadType(String.class) - .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())).build(); + .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) + .build(); return AsyncApiDocket.builder() .info(info) @@ -106,6 +110,7 @@ public void getAsyncAPI_producers_should_be_correct() { assertThat(channel.getSubscribe()).isNotNull(); final Message message = (Message) channel.getSubscribe().getMessage(); assertThat(message.getDescription()).isEqualTo("producer-topic-description"); + assertThat(message.getBindings()).isEqualTo(ImmutableMap.of("kafka", new KafkaMessageBinding())); } @Test @@ -120,6 +125,7 @@ public void getAsyncAPI_consumers_should_be_correct() { assertThat(channel.getPublish()).isNotNull(); final Message message = (Message) channel.getPublish().getMessage(); assertThat(message.getDescription()).isEqualTo("consumer-topic-description"); + assertThat(message.getBindings()).isEqualTo(ImmutableMap.of("kafka", new KafkaMessageBinding())); } } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java index 95fe3d926..76e734152 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -51,6 +52,7 @@ public void allFieldsConsumerData() { .description(description) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -73,6 +75,7 @@ public void allFieldsConsumerData() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build()) .build(); @@ -114,6 +117,7 @@ public void multipleConsumersForSameTopic() { .description(description1) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -122,6 +126,7 @@ public void multipleConsumersForSameTopic() { .description(description2) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(AnotherExamplePayloadDto.class) .headers(AsyncHeaders.NOT_USED) .build(); @@ -143,6 +148,7 @@ public void multipleConsumersForSameTopic() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(), Message.builder() .name(AnotherExamplePayloadDto.class.getName()) @@ -150,6 +156,7 @@ public void multipleConsumersForSameTopic() { .title(AnotherExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_USED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build() ); diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java index 00f5a8333..419376311 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -51,6 +52,7 @@ public void allFieldsProducerData() { .description(description) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -73,6 +75,7 @@ public void allFieldsProducerData() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build()) .build(); @@ -114,6 +117,7 @@ public void multipleProducersForSameTopic() { .description(description1) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -122,6 +126,7 @@ public void multipleProducersForSameTopic() { .description(description2) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(AnotherExamplePayloadDto.class) .headers(AsyncHeaders.NOT_USED) .build(); @@ -143,6 +148,7 @@ public void multipleProducersForSameTopic() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(), Message.builder() .name(AnotherExamplePayloadDto.class.getName()) @@ -150,6 +156,7 @@ public void multipleProducersForSameTopic() { .title(AnotherExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_USED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build() ); @@ -165,8 +172,7 @@ public void multipleProducersForSameTopic() { .subscribe(operation) .build(); - assertThat(producerChannels.get(channelName)) - .isEqualTo(expectedChannel); + assertThat(producerChannels.get(channelName)).isEqualTo(expectedChannel); } private void mockProducers(Collection producers) { diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java index e6f0a5983..d35b84d8e 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java @@ -69,6 +69,7 @@ public void scan_componentHasListenerMethod() { .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -102,6 +103,7 @@ public void scan_componentHasListenerMethodWithAllAttributes() { .description("description") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("TestSchema")) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java index 21be92eec..10776cf66 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java @@ -69,6 +69,7 @@ public void scan_componentHasPublisherMethod() { .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -102,6 +103,7 @@ public void scan_componentHasPublisherMethodWithAllAttributes() { .description("description") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("TestSchema")) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() diff --git a/springwolf-core/src/test/resources/asyncapi/asyncapi.json b/springwolf-core/src/test/resources/asyncapi/asyncapi.json index 082ed7d65..8e3dafd4b 100644 --- a/springwolf-core/src/test/resources/asyncapi/asyncapi.json +++ b/springwolf-core/src/test/resources/asyncapi/asyncapi.json @@ -43,6 +43,18 @@ "title" : "Example Payload", "payload" : { "$ref" : "#/components/schemas/ExamplePayload" + }, + "bindings" : { + "kafka": { + "key": { + "type": "string", + "exampleSetFlag": false, + "types": [ + "string" + ] + }, + "bindingVersion": "binding-version-1" + } } } } @@ -61,4 +73,4 @@ } }, "tags" : [ ] -} \ No newline at end of file +} From df6b997a42a10c83dd0842d95bdabcb1370a17cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Tue, 10 Jan 2023 18:00:52 +0100 Subject: [PATCH 04/13] Add KafkaMessageBindingProcessor and KafkaAsyncMessageBinding annotation as part of KafkaAsyncOperationBinding to support documentation of kafka keys as message binding --- .../KafkaMessageBindingProcessor.java | 62 +++++++++++++++++++ .../KafkaAsyncOperationBinding.java | 17 +++++ 2 files changed, 79 insertions(+) create mode 100644 springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java new file mode 100644 index 000000000..3ea75a12d --- /dev/null +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java @@ -0,0 +1,62 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; + +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.MessageBindingProcessor; +import io.swagger.v3.oas.models.media.Schema; +import io.swagger.v3.oas.models.media.StringSchema; +import org.springframework.context.EmbeddedValueResolverAware; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.springframework.util.StringValueResolver; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Optional; + +@Component +public class KafkaMessageBindingProcessor implements MessageBindingProcessor, EmbeddedValueResolverAware { + private StringValueResolver resolver; + + @Override + public void setEmbeddedValueResolver(StringValueResolver resolver) { + this.resolver = resolver; + } + + @Override + public Optional process(Method method) { + return Arrays.stream(method.getAnnotations()) + .filter(annotation -> annotation instanceof KafkaAsyncOperationBinding) + .map(annotation -> (KafkaAsyncOperationBinding) annotation) + .findAny() + .map(this::mapToMessageBinding); + } + + private ProcessedMessageBinding mapToMessageBinding(KafkaAsyncOperationBinding bindingAnnotation) { + KafkaAsyncMessageBinding messageBinding = bindingAnnotation.messageBinding(); + KafkaMessageBinding kafkaMessageBinding = KafkaMessageBinding.builder() + .bindingVersion(resolveOrNull(messageBinding.bindingVersion())) + .key(resolveSchemaOrNull(messageBinding)) + .build(); + + return new ProcessedMessageBinding(bindingAnnotation.type(), kafkaMessageBinding); + } + + private String resolveOrNull(String stringValue) { + return StringUtils.isEmpty(stringValue) ? null : resolver.resolveStringValue(stringValue); + } + + private Schema resolveSchemaOrNull(KafkaAsyncMessageBinding messageBinding) { + Schema schemaDefinition = null; + switch (messageBinding.keyType()) { + case NO_KEY: + break; + case STRING_KEY: + schemaDefinition = new StringSchema() + .description(resolveOrNull(messageBinding.description())); + } + + return schemaDefinition; + } +} diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java index 699e5baba..950f18fa9 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java @@ -20,4 +20,21 @@ String clientId() default ""; String bindingVersion() default ""; + KafkaAsyncMessageBinding messageBinding() default @KafkaAsyncMessageBinding(); + + @Retention(RetentionPolicy.CLASS) + @Target({}) + @interface KafkaAsyncMessageBinding { + + KafkaKeyTypes keyType() default KafkaKeyTypes.NO_KEY; + + String description() default ""; + + String bindingVersion() default ""; + + enum KafkaKeyTypes { + NO_KEY, + STRING_KEY + } + } } From f758433153fa41639fff0ffef9b9cf62288db44a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Tue, 10 Jan 2023 18:03:17 +0100 Subject: [PATCH 05/13] Extend springwolf-kafka-example to include documentation of kafka keys as message binding by using annotations --- .../consumers/ExampleClassLevelKafkaListener.java | 9 ++++++++- .../example/producers/ExampleProducer.java | 9 ++++++++- .../src/test/resources/asyncapi.json | 13 ++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java index 1274f9f0a..6ec50c890 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java @@ -3,6 +3,7 @@ import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListener; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto; import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto; import lombok.extern.slf4j.Slf4j; @@ -12,6 +13,7 @@ import javax.money.MonetaryAmount; +import static io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding.KafkaKeyTypes.STRING_KEY; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; @@ -48,7 +50,12 @@ public void receiveAnotherPayload(AnotherPayloadDto payload) { @KafkaAsyncOperationBinding( bindingVersion = "1", clientId = "foo-clientId", - groupId = "#{'foo-groupId'}" + groupId = "#{'foo-groupId'}", + messageBinding = @KafkaAsyncMessageBinding( + keyType = STRING_KEY, + bindingVersion = "1", + description = "Kafka Consumer Message Key" + ) ) public void receiveMonetaryAmount(MonetaryAmount payload) { log.info("Received new message in multi-payload-topic: {}", payload.toString()); diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java index b872dacc0..667e970d2 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java @@ -3,11 +3,13 @@ import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisher; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import static io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding.KafkaKeyTypes.STRING_KEY; import static io.github.stavshamir.springwolf.example.configuration.KafkaConfiguration.PRODUCER_TOPIC; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; @@ -38,7 +40,12 @@ public class ExampleProducer { )) @KafkaAsyncOperationBinding( bindingVersion = "1", - clientId = "foo-clientId" + clientId = "foo-clientId", + messageBinding = @KafkaAsyncMessageBinding( + keyType = STRING_KEY, + bindingVersion = "1", + description = "Kafka Producer Message Key" + ) ) public void sendMessage(ExamplePayloadDto msg) { kafkaTemplate.send(PRODUCER_TOPIC, msg); diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json index 3355b72f0..116a7873a 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json @@ -167,6 +167,17 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-MonetaryAmount" + }, + "bindings" : { + "kafka" : { + "key" : { + "type" : "string", + "description" : "Kafka Consumer Message Key", + "exampleSetFlag" : false, + "types" : [ "string" ] + }, + "bindingVersion" : "1" + } } } ] } @@ -414,4 +425,4 @@ } }, "tags" : [ ] -} \ No newline at end of file +} From acb923e1b579b4854d0b713ac3364b16c89af8a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Wed, 11 Jan 2023 17:22:26 +0100 Subject: [PATCH 06/13] Extend abstract class and method level listener scanners to include an empty message binding so that there is always a message binding --- .../springwolf/asyncapi/MessageHelper.java | 3 --- .../AbstractClassLevelListenerScanner.java | 20 ++++++++++++++++--- .../AbstractMethodLevelListenerScanner.java | 18 +++++++++++++++-- .../channel/operation/message/Message.java | 2 -- .../TestMethodLevelListenerScanner.java | 10 ++++++++++ .../TestMethodLevelListenerScannerTest.java | 1 + 6 files changed, 44 insertions(+), 10 deletions(-) diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java index e48bd3ce0..9599f565a 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java @@ -21,9 +21,6 @@ public class MessageHelper { private static final Comparator byMessageName = Comparator.comparing(Message::getName); - // TODO Why do we need a SortedSet here? Using a comparator with only the message name will break deep equals on the Set - // Unfortunately there are Tests relying on deep equals - // see https://docs.oracle.com/javase/7/docs/api/java/util/TreeSet.html private static final Supplier> messageSupplier = () -> new TreeSet<>(byMessageName); public static Object toMessageObjectOrComposition(Set messages) { diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java index d62625d4c..38f314357 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -18,7 +19,13 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,12 +73,18 @@ public abstract class AbstractClassLevelListenerScanner buildChannelBinding(ClassAnnotation annotation); /** - * Can be overriden by implementations + * @param method The specific method. Can be used to extract message binding from an annotation. + * @return A map containing a message binding pointed to by the protocol binding name. + */ + protected abstract Map buildMessageBinding(Method method); + + /** + * Can be overridden by implementations * * @param method The specific method. Can be used to extract the payload type * @return The AsyncHeaders @@ -159,6 +172,7 @@ private Message buildMessage(Method method) { .title(modelName) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(buildMessageBinding(method)) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java index e323b8c91..e407d4fad 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -17,7 +18,11 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; @@ -62,6 +67,12 @@ public Map scan() { */ protected abstract Map buildOperationBinding(T annotation); + /** + * @param annotation An instance of a listener annotation. + * @return A map containing a message binding pointed to by the protocol binding name. + */ + protected abstract Map buildMessageBinding(T annotation); + /** * @param method The listener method. * @return The class object of the payload received by the listener. @@ -88,9 +99,10 @@ private Map.Entry mapMethodToChannel(Method method) { Map channelBinding = buildChannelBinding(annotation); Map operationBinding = buildOperationBinding(annotation); + Map messageBinding = buildMessageBinding(annotation); Class payload = getPayloadType(method); String operationId = channelName + "_publish_" + method.getName(); - ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, operationId); + ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, messageBinding, operationId); return Maps.immutableEntry(channelName, channel); } @@ -98,6 +110,7 @@ private Map.Entry mapMethodToChannel(Method method) { private ChannelItem buildChannel(Map channelBinding, Class payloadType, Map operationBinding, + Map messageBinding, String operationId) { String modelName = schemasService.register(payloadType); String headerModelName = schemasService.register(AsyncHeaders.NOT_DOCUMENTED); @@ -107,6 +120,7 @@ private ChannelItem buildChannel(Map channelBi .title(modelName) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(messageBinding) .build(); Operation operation = Operation.builder() diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java index 27b8054f6..73f632420 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java @@ -13,8 +13,6 @@ */ @Data @Builder -// TODO Why ignore other fields? -//@EqualsAndHashCode(of = {"name"}) @NoArgsConstructor @AllArgsConstructor public class Message { diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java index 2c8e7c83b..f1ae979bd 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.google.common.collect.ImmutableMap; import lombok.EqualsAndHashCode; @@ -30,6 +31,11 @@ protected String getChannelName(TestMethodLevelListenerScannerTest.TestChannelLi return ImmutableMap.of("test-operation-binding", new TestOperationBinding()); } + @Override + protected Map buildMessageBinding(TestMethodLevelListenerScannerTest.TestChannelListener annotation) { + return ImmutableMap.of("test-message-binding", new TestMessageBinding()); + } + @Override protected Class getPayloadType(Method method) { Class[] parameterTypes = method.getParameterTypes(); @@ -49,4 +55,8 @@ public static class TestChannelBinding extends ChannelBinding { public static class TestOperationBinding extends OperationBinding { } + @EqualsAndHashCode(callSuper = true) + public static class TestMessageBinding extends MessageBinding { + } + } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java index d6c041a03..e1692fab7 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java @@ -72,6 +72,7 @@ public void scan_componentHasListenerMethod() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("test-message-binding", new TestMethodLevelListenerScanner.TestMessageBinding())) .build(); Operation operation = Operation.builder() From 1dde1d90aaa3e9400fcb02a35fdb9b4d17459d70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Wed, 11 Jan 2023 17:27:26 +0100 Subject: [PATCH 07/13] Extend springwolf-kafka-plugin to implement the new methods in abstract class and method level listener scanners to include an empty message binding so that there is always a message binding --- .../annotation/ClassLevelKafkaListenerScanner.java | 8 ++++++++ .../scanners/channels/annotation/KafkaListenerUtil.java | 6 ++++++ .../annotation/MethodLevelKafkaListenerScanner.java | 8 ++++++++ .../annotation/ClassLevelKafkaListenerScannerTest.java | 7 +++++++ .../annotation/MethodLevelKafkaListenerScannerTest.java | 5 +++++ 5 files changed, 34 insertions(+) diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java index af641a220..85d98c423 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner; @@ -59,6 +60,13 @@ protected String getChannelName(KafkaListener annotation) { return KafkaListenerUtil.buildChannelBinding(); } + @Override + protected Map buildMessageBinding(Method method) { + // Currently there is no interesting data in the KafkaListener annotation, but we keep it for the sake of + // consistency in the code and in the serialized specification (always have at least an empty binding for kafka) + return KafkaListenerUtil.buildMessageBinding(); + } + @Override protected AsyncHeaders buildHeaders(Method method) { Class payloadType = getPayloadType(method); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java index 2349322fb..2a5120cf3 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java @@ -1,8 +1,10 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import lombok.extern.slf4j.Slf4j; @@ -44,4 +46,8 @@ public static String getChannelName(KafkaListener annotation, StringValueResolve binding.setGroupId(groupId); return ImmutableMap.of("kafka", binding); } + + public static Map buildMessageBinding() { + return ImmutableMap.of("kafka", new KafkaMessageBinding()); + } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java index 76cd29386..338c0c1f8 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner; @@ -49,6 +50,13 @@ protected String getChannelName(KafkaListener annotation) { return KafkaListenerUtil.buildOperationBinding(annotation, resolver); } + @Override + protected Map buildMessageBinding(KafkaListener annotation) { + // Currently there is no interesting data in the KafkaListener annotation, but we keep it for the sake of + // consistency in the code and in the serialized specification (always have at least an empty binding for kafka) + return KafkaListenerUtil.buildMessageBinding(); + } + @Override protected Class getPayloadType(Method method) { return SpringPayloadAnnotationTypeExtractor.getPayloadType(method); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java index 1222e595d..1edd8e5f3 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -77,6 +78,7 @@ public void scan_componentWithMultipleKafkaListenersAndHandlers() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Message barMessage = Message.builder() @@ -84,6 +86,7 @@ public void scan_componentWithMultipleKafkaListenersAndHandlers() { .title(SimpleBar.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleBar.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -139,6 +142,7 @@ public void scan_componentWithSingleKafkaHandlerMethod() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -171,6 +175,7 @@ public void scan_componentWithMultipleKafkaHandlerMethods() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Message barMessage = Message.builder() @@ -178,6 +183,7 @@ public void scan_componentWithMultipleKafkaHandlerMethods() { .title(SimpleBar.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleBar.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -211,6 +217,7 @@ public void scan_componentWithSingleKafkaHandlerMethod_batchPayload() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java index b2cff4b05..9a312c347 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java @@ -2,6 +2,7 @@ import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -80,6 +81,7 @@ public void scan_componentHasKafkaListenerMethods_hardCodedTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -112,6 +114,7 @@ public void scan_componentHasKafkaListenerMethods_embeddedValueTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -179,6 +182,7 @@ public void scan_componentHasKafkaListenerMethods_multipleParamsWithPayloadAnnot .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -211,6 +215,7 @@ public void scan_componentHasKafkaListenerMethods_batchPayload() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() From 1df13390f9351ae4eecadc93bde9ecff051d25f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Wed, 11 Jan 2023 17:28:02 +0100 Subject: [PATCH 08/13] Extend springwolf-amqp-plugin to implement the new methods in abstract class and method level listener scanners to include an empty message binding so that there is always a message binding --- .../annotation/MethodLevelRabbitListenerScanner.java | 8 ++++++++ .../annotation/MethodLevelRabbitListenerScannerTest.java | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java index c7d15c852..04aa54976 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java @@ -1,8 +1,10 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.amqp.AMQPChannelBinding; +import com.asyncapi.v2.binding.amqp.AMQPMessageBinding; import com.asyncapi.v2.binding.amqp.AMQPOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; @@ -107,6 +109,12 @@ private String getExchangeName(RabbitListener annotation) { return ImmutableMap.of("amqp", AMQPOperationBinding.builder().cc(getRoutingKeys(annotation)).build()); } + @Override + protected Map buildMessageBinding(RabbitListener annotation) { + // currently the feature to define amqp message binding is not implemented. + return ImmutableMap.of("amqp", new AMQPMessageBinding()); + } + private List getRoutingKeys(RabbitListener annotation) { /* The routing key is taken from the binding. As the key field in the @QueueBinding can be an empty array, diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java index 9797cc8ab..24277785e 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java @@ -2,6 +2,7 @@ import com.asyncapi.v2.binding.ChannelBinding; import com.asyncapi.v2.binding.amqp.AMQPChannelBinding; +import com.asyncapi.v2.binding.amqp.AMQPMessageBinding; import com.asyncapi.v2.binding.amqp.AMQPOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -99,6 +100,7 @@ public void scan_componentHasRabbitListenerMethods_hardCodedTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -140,6 +142,7 @@ public void scan_componentHasRabbitListenerMethods_embeddedValueTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -178,6 +181,7 @@ public void scan_componentHasRabbitListenerMethods_bindingsAnnotation() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -216,6 +220,7 @@ public void scan_componentHasRabbitListenerMethods_bindingBean() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -271,6 +276,7 @@ public void scan_componentHasRabbitListenerMethods_multipleParamsWithPayloadAnno .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() From 23098d1813369808bcd03325d3d5b4fc60a89eca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Wed, 11 Jan 2023 17:32:40 +0100 Subject: [PATCH 09/13] Add option to document an example for kafka key bindings, change structure of KafkaAsyncMessageBinding to contain a new sub-annotation KafkaAsyncKey to make it more obvious what is part of the key documentation and which is part of the kafka message binding, Add default values for message binding to KafkaConsumer- and KafkaProducerData --- .../KafkaMessageBindingProcessor.java | 7 ++++--- .../annotation/KafkaAsyncOperationBinding.java | 16 ++++++++++++---- .../asyncapi/types/KafkaConsumerData.java | 2 ++ .../asyncapi/types/KafkaProducerData.java | 2 ++ 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java index 3ea75a12d..2a2507051 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java @@ -49,12 +49,13 @@ private String resolveOrNull(String stringValue) { private Schema resolveSchemaOrNull(KafkaAsyncMessageBinding messageBinding) { Schema schemaDefinition = null; - switch (messageBinding.keyType()) { - case NO_KEY: + switch (messageBinding.key().type()) { + case UNDEFINED_KEY: break; case STRING_KEY: schemaDefinition = new StringSchema() - .description(resolveOrNull(messageBinding.description())); + .example(messageBinding.key().example()) + .description(resolveOrNull(messageBinding.key().description())); } return schemaDefinition; diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java index 950f18fa9..61ec5e319 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java @@ -26,14 +26,22 @@ @Target({}) @interface KafkaAsyncMessageBinding { - KafkaKeyTypes keyType() default KafkaKeyTypes.NO_KEY; - - String description() default ""; + KafkaAsyncKey key() default @KafkaAsyncKey(type = KafkaAsyncKey.KafkaKeyTypes.UNDEFINED_KEY); String bindingVersion() default ""; + } + @Retention(RetentionPolicy.CLASS) + @Target({}) + @interface KafkaAsyncKey { + + KafkaKeyTypes type() default KafkaKeyTypes.STRING_KEY; + + String example() default ""; + + String description() default ""; enum KafkaKeyTypes { - NO_KEY, + UNDEFINED_KEY, STRING_KEY } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java index f963a0fbf..60bc98adf 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -15,5 +16,6 @@ public KafkaConsumerData(String topicName, Class payloadType, String descript this.payloadType = payloadType; this.headers = headers != null ? headers : this.headers; this.operationBinding = ImmutableMap.of("kafka", new KafkaOperationBinding()); + this.messageBinding = ImmutableMap.of("kafka", new KafkaMessageBinding()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java index 21caf044b..a7df15ca7 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -16,6 +17,7 @@ public KafkaProducerData(String topicName, Class payloadType, String descript this.payloadType = payloadType; this.headers = headers != null ? headers : this.headers; this.operationBinding = ImmutableMap.of("kafka", new KafkaOperationBinding()); + this.messageBinding = ImmutableMap.of("kafka", new KafkaMessageBinding()); } } From a724f182adc67af08b3b93bb62203b9dcfd40bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Wed, 11 Jan 2023 17:35:02 +0100 Subject: [PATCH 10/13] Adapt springwolf-kafka-example to the new annotation structure, to contain an example for the key and to the change that a kafka message binding is always present --- .../ExampleClassLevelKafkaListener.java | 9 +++--- .../example/producers/ExampleProducer.java | 10 ++++--- .../src/test/resources/asyncapi.json | 29 ++++++++++++++++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java index 6ec50c890..57e1082b8 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java @@ -13,7 +13,6 @@ import javax.money.MonetaryAmount; -import static io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding.KafkaKeyTypes.STRING_KEY; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; @@ -52,9 +51,11 @@ public void receiveAnotherPayload(AnotherPayloadDto payload) { clientId = "foo-clientId", groupId = "#{'foo-groupId'}", messageBinding = @KafkaAsyncMessageBinding( - keyType = STRING_KEY, - bindingVersion = "1", - description = "Kafka Consumer Message Key" + key = @KafkaAsyncOperationBinding.KafkaAsyncKey( + description = "Kafka Consumer Message Key", + example = "example-key" + ), + bindingVersion = "1" ) ) public void receiveMonetaryAmount(MonetaryAmount payload) { diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java index 667e970d2..6d0e9bf28 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java @@ -3,13 +3,13 @@ import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisher; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncKey; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import static io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding.KafkaKeyTypes.STRING_KEY; import static io.github.stavshamir.springwolf.example.configuration.KafkaConfiguration.PRODUCER_TOPIC; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; @@ -42,9 +42,11 @@ public class ExampleProducer { bindingVersion = "1", clientId = "foo-clientId", messageBinding = @KafkaAsyncMessageBinding( - keyType = STRING_KEY, - bindingVersion = "1", - description = "Kafka Producer Message Key" + key = @KafkaAsyncKey( + description = "Kafka Producer Message Key", + example = "example-key" + ), + bindingVersion = "1" ) ) public void sendMessage(ExamplePayloadDto msg) { diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json index 116a7873a..f99e919df 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json @@ -63,6 +63,9 @@ }, "headers" : { "$ref" : "#/components/schemas/HeadersNotDocumented" + }, + "bindings" : { + "kafka" : { } } } }, @@ -87,6 +90,9 @@ }, "headers" : { "$ref" : "#/components/schemas/CloudEventHeadersForAnotherPayloadDtoEndpoint" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto", @@ -97,6 +103,17 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders" + }, + "bindings" : { + "kafka" : { + "key" : { + "type" : "string", + "description" : "Kafka Producer Message Key", + "example" : "example-key", + "exampleSetFlag" : true, + "types" : [ "string" ] + } + } } } ] } @@ -120,6 +137,9 @@ }, "headers" : { "$ref" : "#/components/schemas/HeadersNotDocumented" + }, + "bindings" : { + "kafka" : { } } } }, @@ -148,6 +168,9 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-AnotherPayloadDto" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto", @@ -157,6 +180,9 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "javax.money.MonetaryAmount", @@ -173,7 +199,8 @@ "key" : { "type" : "string", "description" : "Kafka Consumer Message Key", - "exampleSetFlag" : false, + "example" : "example-key", + "exampleSetFlag" : true, "types" : [ "string" ] }, "bindingVersion" : "1" From 5d0232f10b7162a3746ec4a78f14633261d7601a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Thu, 12 Jan 2023 18:12:26 +0100 Subject: [PATCH 11/13] Extend producer controllers of springwolf-kafka-plugin and springwolf-amqp-pluging to support message bindings which are sent by springwolf-ui --- .../asyncapi/controller/dtos/MessageDto.java | 8 ++--- .../asyncapi/SpringwolfAmqpController.java | 15 ++++++---- .../producer/SpringwolfAmqpProducer.java | 2 +- .../asyncapi/SpringwolfKafkaController.java | 18 ++++++++---- .../producer/SpringwolfKafkaProducer.java | 8 ++--- .../SpringwolfKafkaControllerTest.java | 29 +++++++++++++++---- .../producer/SpringwolfKafkaProducerTest.java | 4 +-- 7 files changed, 55 insertions(+), 29 deletions(-) rename springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java => springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java (62%) diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java similarity index 62% rename from springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java rename to springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java index 820f58b76..c7667cf1b 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java @@ -1,4 +1,4 @@ -package io.github.stavshamir.springwolf.asyncapi.dtos; +package io.github.stavshamir.springwolf.asyncapi.controller.dtos; import lombok.Builder; import lombok.Data; @@ -6,14 +6,14 @@ import java.util.Map; - @Data @Builder @Jacksonized -public class KafkaMessageDto { +public class MessageDto { + + private final Map bindings; private final Map headers; private final Map payload; - } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java index 1cdfb9b46..b6b604233 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java @@ -1,15 +1,18 @@ package io.github.stavshamir.springwolf.asyncapi; +import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; -import java.util.Map; - import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_CONFIG_PREFIX; import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_PLUGIN_PUBLISHING_ENABLED; @@ -23,14 +26,14 @@ public class SpringwolfAmqpController { private final SpringwolfAmqpProducer amqpProducer; @PostMapping("/publish") - public void publish(@RequestParam String topic, @RequestBody Map payload) { + public void publish(@RequestParam String topic, @RequestBody MessageDto message) { if (amqpProducer.isEnabled()) { log.warn("AMQP producer is not enabled - message will not be published"); throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled"); } - log.info("Publishing to amqp queue {}: {}", topic, payload); - amqpProducer.send(topic, payload); + log.info("Publishing to amqp queue {}: {}", topic, message.getPayload()); + amqpProducer.send(topic, message.getPayload()); } } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java index 4291e922f..e0ecca05d 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java @@ -33,7 +33,7 @@ public SpringwolfAmqpProducer(ChannelsService channelsService, List payload) { + public void send(String channelName, Map payload) { ChannelItem channelItem = channelsService.getChannels().get(channelName); String exchange = getExchangeName(channelItem); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java index 26a155f3e..6f9bd6543 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java @@ -1,12 +1,16 @@ package io.github.stavshamir.springwolf.asyncapi; -import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto; +import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import static io.github.stavshamir.springwolf.SpringWolfKafkaConfigConstants.SPRINGWOLF_KAFKA_CONFIG_PREFIX; @@ -22,8 +26,8 @@ public class SpringwolfKafkaController { private final SpringwolfKafkaProducer kafkaProducer; @PostMapping("/publish") - public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kafkaMessage) { - if(kafkaMessage.getPayload() == null) { + public void publish(@RequestParam String topic, @RequestBody MessageDto message) { + if(message.getPayload() == null) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Message payload is required"); } @@ -32,8 +36,10 @@ public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kaf throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled"); } - log.debug("Publishing to kafka topic {}: {}", topic, kafkaMessage); - kafkaProducer.send(topic, kafkaMessage.getHeaders(), kafkaMessage.getPayload()); + String kafkaKey = message.getBindings() != null ? message.getBindings().get("key") : null; + log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message); + + kafkaProducer.send(topic, kafkaKey, message.getHeaders(), message.getPayload()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java index abb14bd2a..feaf0331e 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java @@ -25,18 +25,18 @@ public boolean isEnabled() { return kafkaTemplate.isPresent(); } - public void send(String topic, Map headers, Map payload) { + public void send(String topic, String key, Map headers, Map payload) { if (kafkaTemplate.isPresent()) { - kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload)); + kafkaTemplate.get().send(buildProducerRecord(topic, key, headers, payload)); } else { log.warn("Kafka producer is not configured"); } } - private ProducerRecord> buildProducerRecord(String topic, Map headers, Map payload) { + private ProducerRecord> buildProducerRecord(String topic, String key, Map headers, Map payload) { List
recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList(); - return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders); + return new ProducerRecord<>(topic, null, null, key, payload, recordHeaders); } private List
buildHeaders(Map headers) { diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java index 97aafff1a..42f99b58a 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java @@ -44,7 +44,7 @@ public class SpringwolfKafkaControllerTest { @Test public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() { try { - String content = "{\"headers\": null, \"payload\": null }"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": null }"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -58,7 +58,7 @@ public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() { public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(false); - String content = "{\"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -69,14 +69,14 @@ public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() throw public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(true); - String content = "{\"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; mvc.perform(post("/springwolf/kafka/publish").param("topic", "test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) .andExpect(status().isOk()); - verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), payloadCaptor.capture()); + verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), isNull(), payloadCaptor.capture()); assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-key", "some-value")); } @@ -85,14 +85,31 @@ public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() throws Ex public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAreSend() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(true); - String content = "{\"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; + String content = "{\"bindings\": null, \"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) .andExpect(status().isOk()); - verify(springwolfKafkaProducer).send(eq("test-topic"), headerCaptor.capture(), payloadCaptor.capture()); + verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), headerCaptor.capture(), payloadCaptor.capture()); + + assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-payload-key", "some-payload-value")); + } + + @Test + public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAndBindingsAreSend() throws Exception{ + when(springwolfKafkaProducer.isEnabled()).thenReturn(true); + + String content = "{\"bindings\": {\"key\": \"kafka-key-value\"}, \"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; + + mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andExpect(status().isOk()); + + verify(springwolfKafkaProducer).send(eq("test-topic"), eq("kafka-key-value"), headerCaptor.capture(), payloadCaptor.capture()); assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-payload-key", "some-payload-value")); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java index 1f0a2a151..17a62c2a7 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java @@ -49,7 +49,7 @@ public void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateCon public void testSendingKafkaMessageWithoutHeaders() { Map payload = Collections.singletonMap("some", "field"); - springwolfKafkaProducer.send("test-topic", null, payload); + springwolfKafkaProducer.send("test-topic", null, null, payload); verify(kafkaTemplate).send(recordArgumentCaptor.capture()); @@ -67,7 +67,7 @@ public void testSendingKafkaMessageWithHeaders() { Map payload = Collections.singletonMap("some", "field"); Map headers = Collections.singletonMap("header-key", "header"); - springwolfKafkaProducer.send("test-topic", headers, payload); + springwolfKafkaProducer.send("test-topic", null, headers, payload); verify(kafkaTemplate).send(recordArgumentCaptor.capture()); From f60a340bed43b6dbcd79060f13b6d41fa84209d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Mon, 16 Jan 2023 10:14:39 +0100 Subject: [PATCH 12/13] Fix java doc of MessageBindingProcessor after rebase --- .../operationdata/annotation/MessageBindingProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java index c8f11b1ae..b481118dd 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java @@ -8,7 +8,7 @@ public interface MessageBindingProcessor { /** - * Process the methods annotated with {@link AsyncPublisher} and {@link AsyncSubscriber} + * Process the methods annotated with {@link AsyncPublisher} and {@link AsyncListener} * for protocol specific messageBinding annotations, method parameters, etc * * @param method The method being annotated From e6954febf5a290bf9a815304fbcd787d39fcb24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20M=C3=BCller?= Date: Mon, 6 Feb 2023 18:17:05 +0100 Subject: [PATCH 13/13] Move KafkaMessageDtoDeserializationTest to core and rename to MessageDtoDeserializationTest --- .../controller/dtos/MessageDtoDeserializationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java => springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java (78%) diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java similarity index 78% rename from springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java rename to springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java index 8999709fa..55462938d 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java @@ -1,4 +1,4 @@ -package io.github.stavshamir.springwolf.asyncapi.dtos; +package io.github.stavshamir.springwolf.asyncapi.controller.dtos; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -8,13 +8,13 @@ import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; -public class KafkaMessageDtoDeserializationTest { +public class MessageDtoDeserializationTest { @Test public void testCanBeSerialized() throws IOException { String content = "{\"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; - KafkaMessageDto value = new ObjectMapper().readValue(content, KafkaMessageDto.class); + MessageDto value = new ObjectMapper().readValue(content, MessageDto.class); assertThat(value).isNotNull(); assertThat(value.getHeaders()).isEqualTo(singletonMap("some-header-key", "some-header-value"));