diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java index 77f13230b..9599f565a 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java @@ -4,7 +4,14 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; import lombok.extern.slf4j.Slf4j; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -13,6 +20,7 @@ public class MessageHelper { private static final String ONE_OF = "oneOf"; private static final Comparator byMessageName = Comparator.comparing(Message::getName); + private static final Supplier> messageSupplier = () -> new TreeSet<>(byMessageName); public static Object toMessageObjectOrComposition(Set messages) { @@ -22,23 +30,22 @@ public static Object toMessageObjectOrComposition(Set messages) { case 1: return messages.toArray()[0]; default: - return ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier))); + return ImmutableMap.of(ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))); } } @SuppressWarnings("unchecked") public static Set messageObjectToSet(Object messageObject) { if (messageObject instanceof Message) { - return new HashSet<>(Arrays.asList((Message) messageObject)); + return new HashSet<>(Collections.singletonList((Message) messageObject)); } if (messageObject instanceof Map) { - Set messages = ((Map>) messageObject).get(ONE_OF); + List messages = ((Map>) messageObject).get(ONE_OF); return new HashSet<>(messages); } log.warn("Message object must contain either a Message or a Map, but contained: {}", messageObject.getClass()); return new HashSet<>(); } - } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java similarity index 62% rename from springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java rename to springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java index 820f58b76..c7667cf1b 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDto.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDto.java @@ -1,4 +1,4 @@ -package io.github.stavshamir.springwolf.asyncapi.dtos; +package io.github.stavshamir.springwolf.asyncapi.controller.dtos; import lombok.Builder; import lombok.Data; @@ -6,14 +6,14 @@ import java.util.Map; - @Data @Builder @Jacksonized -public class KafkaMessageDto { +public class MessageDto { + + private final Map bindings; private final Map headers; private final Map payload; - } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java index 32fedb10c..03ddfc47f 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java @@ -5,7 +5,15 @@ import io.github.stavshamir.springwolf.asyncapi.MessageHelper; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Collectors; import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition; @@ -42,17 +50,25 @@ public static Map merge(List } private static Operation mergeOperation(Operation operation, Operation otherOperation) { - Set mergedMessages = getMessages(operation); - Set currentEntryMessages = getMessages(otherOperation); - mergedMessages.addAll(currentEntryMessages); - Operation mergedOperation = operation != null ? operation : otherOperation; + + Set mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation)); if (!mergedMessages.isEmpty()) { mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages)); } return mergedOperation; } + private static Set mergeMessages(Set messages, Set otherMessages) { + Map nameToMessage = messages.stream().collect(Collectors.toMap(Message::getName, Function.identity())); + + for (Message otherMessage : otherMessages) { + nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage); + } + + return new HashSet<>(nameToMessage.values()); + } + private static Set getMessages(Operation operation) { return Optional .ofNullable(operation) diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java index d62625d4c..38f314357 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -18,7 +19,13 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,12 +73,18 @@ public abstract class AbstractClassLevelListenerScanner buildChannelBinding(ClassAnnotation annotation); /** - * Can be overriden by implementations + * @param method The specific method. Can be used to extract message binding from an annotation. + * @return A map containing a message binding pointed to by the protocol binding name. + */ + protected abstract Map buildMessageBinding(Method method); + + /** + * Can be overridden by implementations * * @param method The specific method. Can be used to extract the payload type * @return The AsyncHeaders @@ -159,6 +172,7 @@ private Message buildMessage(Method method) { .title(modelName) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(buildMessageBinding(method)) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java index e323b8c91..e407d4fad 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -17,7 +18,11 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; @@ -62,6 +67,12 @@ public Map scan() { */ protected abstract Map buildOperationBinding(T annotation); + /** + * @param annotation An instance of a listener annotation. + * @return A map containing a message binding pointed to by the protocol binding name. + */ + protected abstract Map buildMessageBinding(T annotation); + /** * @param method The listener method. * @return The class object of the payload received by the listener. @@ -88,9 +99,10 @@ private Map.Entry mapMethodToChannel(Method method) { Map channelBinding = buildChannelBinding(annotation); Map operationBinding = buildOperationBinding(annotation); + Map messageBinding = buildMessageBinding(annotation); Class payload = getPayloadType(method); String operationId = channelName + "_publish_" + method.getName(); - ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, operationId); + ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, messageBinding, operationId); return Maps.immutableEntry(channelName, channel); } @@ -98,6 +110,7 @@ private Map.Entry mapMethodToChannel(Method method) { private ChannelItem buildChannel(Map channelBinding, Class payloadType, Map operationBinding, + Map messageBinding, String operationId) { String modelName = schemasService.register(payloadType); String headerModelName = schemasService.register(AsyncHeaders.NOT_DOCUMENTED); @@ -107,6 +120,7 @@ private ChannelItem buildChannel(Map channelBi .title(modelName) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(messageBinding) .build(); Operation operation = Operation.builder() diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java index d3aac185e..da5981131 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java @@ -99,6 +99,7 @@ private Message buildMessage(OperationData operationData) { .description(operationData.getDescription()) .payload(PayloadReference.fromModelName(modelName)) .headers(HeaderReference.fromModelName(headerModelName)) + .bindings(operationData.getMessageBinding()) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java new file mode 100644 index 000000000..2414b9c20 --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProcessedMessageBinding.java @@ -0,0 +1,10 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; + +import com.asyncapi.v2.binding.MessageBinding; +import lombok.Data; + +@Data +public class ProcessedMessageBinding { + private final String type; + private final MessageBinding binding; +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java index 4e9473f05..6160a6a58 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java @@ -1,6 +1,8 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedOperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -59,11 +61,19 @@ private static String getDescription(List value) .orElse(null); } - public static Map processBindingFromAnnotation(Method method, List operationBindingProcessors) { + public static Map processOperationBindingFromAnnotation(Method method, List operationBindingProcessors) { return operationBindingProcessors.stream() .map(operationBindingProcessor -> operationBindingProcessor.process(method)) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toMap(ProcessedOperationBinding::getType, ProcessedOperationBinding::getBinding)); } + + public static Map processMessageBindingFromAnnotation(Method method, List messageBindingProcessors) { + return messageBindingProcessors.stream() + .map(messageBindingProcessor -> messageBindingProcessor.process(method)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toMap(ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding)); + } } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java index 3e7b94a70..38548c812 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor; @@ -32,6 +33,8 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner private final List operationBindingProcessors; + private final List messageBindingProcessors; + @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { this.resolver = resolver; @@ -63,7 +66,8 @@ private Set getAnnotatedMethods(Class type) { private OperationData mapMethodToOperationData(Method method) { log.debug("Mapping method \"{}\" to channels", method.getName()); - Map operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors); + Map operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors); + Map messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors); Class annotationClass = AsyncListener.class; AsyncListener annotation = Optional.of(method.getAnnotation(annotationClass)) @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) { .headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op)) .payloadType(payloadType) .operationBinding(operationBindings) + .messageBinding(messageBindings) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java index cc6c2f358..1084bf8a3 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor; @@ -32,6 +33,8 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne private final List operationBindingProcessors; + private final List messageBindingProcessors; + @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { this.resolver = resolver; @@ -63,7 +66,8 @@ private Set getAnnotatedMethods(Class type) { private OperationData mapMethodToOperationData(Method method) { log.debug("Mapping method \"{}\" to channels", method.getName()); - Map operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors); + Map operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors); + Map messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors); Class annotationClass = AsyncPublisher.class; AsyncPublisher annotation = Optional.of(method.getAnnotation(annotationClass)) @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) { .headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op)) .payloadType(payloadType) .operationBinding(operationBindings) + .messageBinding(messageBindings) .build(); } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java new file mode 100644 index 000000000..b481118dd --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/MessageBindingProcessor.java @@ -0,0 +1,18 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation; + +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding; + +import java.lang.reflect.Method; +import java.util.Optional; + +public interface MessageBindingProcessor { + + /** + * Process the methods annotated with {@link AsyncPublisher} and {@link AsyncListener} + * for protocol specific messageBinding annotations, method parameters, etc + * + * @param method The method being annotated + * @return A message binding, if found + */ + Optional process(Method method); +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java index 5f26420ca..7a0b8006e 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; import lombok.AllArgsConstructor; @@ -61,4 +62,13 @@ public class ConsumerData implements OperationData { */ protected Map operationBinding; + /** + * The message binding of the consumer. + *
+ * For example: + * + * ImmutableMap.of("kafka", new KafkaMessageBinding()) + * + */ + protected Map messageBinding; } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java index 98a18ca7f..25c71c7bd 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/OperationData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -40,10 +41,16 @@ public interface OperationData { */ Map getOperationBinding(); + /** + * The message binding. + */ + Map getMessageBinding(); + enum OperationType { PUBLISH("publish"), SUBSCRIBE("subscribe"); public final String operationName; + OperationType(String operationName) { this.operationName = operationName; } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java index 8b3836fb5..780870479 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; import lombok.AllArgsConstructor; @@ -61,4 +62,14 @@ public class ProducerData implements OperationData { */ protected Map operationBinding; + /** + * The message binding of the producer. + *
+ * For example: + * + * ImmutableMap.of("kafka", new KafkaMessageBinding()) + * + */ + protected Map messageBinding; + } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java index 930ec1add..73f632420 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/channel/operation/message/Message.java @@ -1,8 +1,11 @@ package io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message; +import com.asyncapi.v2.binding.MessageBinding; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference; import lombok.*; +import java.util.Map; + /** * Describes a message received on a given channel and operation. * @@ -10,7 +13,6 @@ */ @Data @Builder -@EqualsAndHashCode(of = {"name"}) @NoArgsConstructor @AllArgsConstructor public class Message { @@ -33,4 +35,6 @@ public class Message { private PayloadReference payload; private HeaderReference headers; + + private Map bindings; } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java index eab9f0f18..8db3774d9 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerServiceTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi; import com.asyncapi.v2.binding.OperationBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -15,6 +16,7 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference; import io.swagger.v3.core.converter.ModelConverters; import io.swagger.v3.oas.models.media.Schema; +import io.swagger.v3.oas.models.media.StringSchema; import lombok.Data; import org.apache.commons.io.IOUtils; import org.json.JSONException; @@ -67,6 +69,7 @@ public void AsyncAPI_should_map_to_a_valid_asyncapi_json() throws IOException, J .name("io.github.stavshamir.springwolf.ExamplePayload") .title("Example Payload") .payload(PayloadReference.fromModelName("ExamplePayload")) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding(new StringSchema(), "binding-version-1"))) .build(); OperationBinding operationBinding = KafkaOperationBinding.builder().groupId("myGroupId").build(); @@ -105,4 +108,4 @@ static class ExamplePayload { String s; } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java index e7d964b7a..74cd6adef 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.info.Info; @@ -53,13 +54,16 @@ public AsyncApiDocket docket() { .description("producer-topic-description") .payloadType(String.class) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); ConsumerData kafkaConsumerData = ConsumerData.builder() .channelName("consumer-topic") .description("consumer-topic-description") .payloadType(String.class) - .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())).build(); + .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) + .build(); return AsyncApiDocket.builder() .info(info) @@ -106,6 +110,7 @@ public void getAsyncAPI_producers_should_be_correct() { assertThat(channel.getSubscribe()).isNotNull(); final Message message = (Message) channel.getSubscribe().getMessage(); assertThat(message.getDescription()).isEqualTo("producer-topic-description"); + assertThat(message.getBindings()).isEqualTo(ImmutableMap.of("kafka", new KafkaMessageBinding())); } @Test @@ -120,6 +125,7 @@ public void getAsyncAPI_consumers_should_be_correct() { assertThat(channel.getPublish()).isNotNull(); final Message message = (Message) channel.getPublish().getMessage(); assertThat(message.getDescription()).isEqualTo("consumer-topic-description"); + assertThat(message.getBindings()).isEqualTo(ImmutableMap.of("kafka", new KafkaMessageBinding())); } } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java index 79ac061e3..593cde01c 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/MessageHelperTest.java @@ -1,5 +1,6 @@ package io.github.stavshamir.springwolf.asyncapi; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message; @@ -50,7 +51,61 @@ public void toMessageObjectOrComposition_multipleMessages() { assertThat(asObject) .isInstanceOf(Map.class) - .isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2))); + .isEqualTo(ImmutableMap.of("oneOf", ImmutableList.of(message2, message1))); + } + + @Test + public void toMessageObjectOrComposition_multipleMessages_remove_duplicates() { + Message message1 = Message.builder() + .name("foo") + .description("This is message 1") + .build(); + + Message message2 = Message.builder() + .name("bar") + .description("This is message 2") + .build(); + + Message message3 = Message.builder() + .name("bar") + .description("This is message 3, but in essence the same payload type as message 2") + .build(); + + Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message1, message2, message3)); + + // Message3 is not included as it is identical in terms of payload type (Message#name) to message 2 + assertThat(asObject) + .isInstanceOf(Map.class) + .isEqualTo(ImmutableMap.of("oneOf", ImmutableList.of(message2, message1))); + } + + @Test + public void toMessageObjectOrComposition_multipleMessages_should_not_break_deep_equals() { + Message actualMessage1 = Message.builder() + .name("foo") + .description("This is actual message 1") + .build(); + + Message actualMessage2 = Message.builder() + .name("bar") + .description("This is actual message 2") + .build(); + + Object actualObject = toMessageObjectOrComposition(ImmutableSet.of(actualMessage1, actualMessage2)); + + Message expectedMessage1 = Message.builder() + .name("foo") + .description("This is expected message 1") + .build(); + + Message expectedMessage2 = Message.builder() + .name("bar") + .description("This is expected message 2") + .build(); + + Object expectedObject = toMessageObjectOrComposition(ImmutableSet.of(expectedMessage1, expectedMessage2)); + + assertThat(actualObject).isNotEqualTo(expectedObject); } @Test @@ -94,4 +149,4 @@ public void messageObjectToSet_SetOfMessage() { .containsExactlyInAnyOrder(message1, message2); } -} \ No newline at end of file +} diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java similarity index 78% rename from springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java rename to springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java index 8999709fa..55462938d 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/dtos/KafkaMessageDtoDeserializationTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/controller/dtos/MessageDtoDeserializationTest.java @@ -1,4 +1,4 @@ -package io.github.stavshamir.springwolf.asyncapi.dtos; +package io.github.stavshamir.springwolf.asyncapi.controller.dtos; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -8,13 +8,13 @@ import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; -public class KafkaMessageDtoDeserializationTest { +public class MessageDtoDeserializationTest { @Test public void testCanBeSerialized() throws IOException { String content = "{\"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; - KafkaMessageDto value = new ObjectMapper().readValue(content, KafkaMessageDto.class); + MessageDto value = new ObjectMapper().readValue(content, MessageDto.class); assertThat(value).isNotNull(); assertThat(value.getHeaders()).isEqualTo(singletonMap("some-header-key", "some-header-value")); diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java index b812b85ad..1d1410d82 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMergerTest.java @@ -16,7 +16,6 @@ public class ChannelMergerTest { - @Test public void shouldNotMergeDifferentChannelNames() { // given @@ -141,4 +140,4 @@ public void shouldUseOtherMessageIfFirstMessageIsMissing() { assertThat(it.getSubscribe()).isNull(); }); } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java index 2c8e7c83b..f1ae979bd 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.google.common.collect.ImmutableMap; import lombok.EqualsAndHashCode; @@ -30,6 +31,11 @@ protected String getChannelName(TestMethodLevelListenerScannerTest.TestChannelLi return ImmutableMap.of("test-operation-binding", new TestOperationBinding()); } + @Override + protected Map buildMessageBinding(TestMethodLevelListenerScannerTest.TestChannelListener annotation) { + return ImmutableMap.of("test-message-binding", new TestMessageBinding()); + } + @Override protected Class getPayloadType(Method method) { Class[] parameterTypes = method.getParameterTypes(); @@ -49,4 +55,8 @@ public static class TestChannelBinding extends ChannelBinding { public static class TestOperationBinding extends OperationBinding { } + @EqualsAndHashCode(callSuper = true) + public static class TestMessageBinding extends MessageBinding { + } + } diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java index d6c041a03..e1692fab7 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/TestMethodLevelListenerScannerTest.java @@ -72,6 +72,7 @@ public void scan_componentHasListenerMethod() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("test-message-binding", new TestMethodLevelListenerScanner.TestMessageBinding())) .build(); Operation operation = Operation.builder() diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java index 95fe3d926..76e734152 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ConsumerOperationDataScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -51,6 +52,7 @@ public void allFieldsConsumerData() { .description(description) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -73,6 +75,7 @@ public void allFieldsConsumerData() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build()) .build(); @@ -114,6 +117,7 @@ public void multipleConsumersForSameTopic() { .description(description1) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -122,6 +126,7 @@ public void multipleConsumersForSameTopic() { .description(description2) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(AnotherExamplePayloadDto.class) .headers(AsyncHeaders.NOT_USED) .build(); @@ -143,6 +148,7 @@ public void multipleConsumersForSameTopic() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(), Message.builder() .name(AnotherExamplePayloadDto.class.getName()) @@ -150,6 +156,7 @@ public void multipleConsumersForSameTopic() { .title(AnotherExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_USED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build() ); diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java index 00f5a8333..419376311 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/ProducerOperationDataScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -51,6 +52,7 @@ public void allFieldsProducerData() { .description(description) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -73,6 +75,7 @@ public void allFieldsProducerData() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build()) .build(); @@ -114,6 +117,7 @@ public void multipleProducersForSameTopic() { .description(description1) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(ExamplePayloadDto.class) .build(); @@ -122,6 +126,7 @@ public void multipleProducersForSameTopic() { .description(description2) .channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding())) .operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())) + .messageBinding(ImmutableMap.of("kafka", new KafkaMessageBinding())) .payloadType(AnotherExamplePayloadDto.class) .headers(AsyncHeaders.NOT_USED) .build(); @@ -143,6 +148,7 @@ public void multipleProducersForSameTopic() { .title(ExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(), Message.builder() .name(AnotherExamplePayloadDto.class.getName()) @@ -150,6 +156,7 @@ public void multipleProducersForSameTopic() { .title(AnotherExamplePayloadDto.class.getSimpleName()) .payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_USED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build() ); @@ -165,8 +172,7 @@ public void multipleProducersForSameTopic() { .subscribe(operation) .build(); - assertThat(producerChannels.get(channelName)) - .isEqualTo(expectedChannel); + assertThat(producerChannels.get(channelName)).isEqualTo(expectedChannel); } private void mockProducers(Collection producers) { diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java index f123a663c..d35b84d8e 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScannerTest.java @@ -66,9 +66,10 @@ public void scan_componentHasListenerMethod() { Message message = Message.builder() .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) - .description(null) + .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -102,6 +103,7 @@ public void scan_componentHasListenerMethodWithAllAttributes() { .description("description") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("TestSchema")) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -164,4 +166,4 @@ private static class SimpleFoo { private String s; private boolean b; } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java index 2c1b6b23f..10776cf66 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScannerTest.java @@ -66,9 +66,10 @@ public void scan_componentHasPublisherMethod() { Message message = Message.builder() .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) - .description(null) + .description("") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -102,6 +103,7 @@ public void scan_componentHasPublisherMethodWithAllAttributes() { .description("description") .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("TestSchema")) + .bindings(EMPTY_MAP) .build(); Operation operation = Operation.builder() @@ -164,4 +166,4 @@ private static class SimpleFoo { private String s; private boolean b; } -} \ No newline at end of file +} diff --git a/springwolf-core/src/test/resources/asyncapi/asyncapi.json b/springwolf-core/src/test/resources/asyncapi/asyncapi.json index 082ed7d65..8e3dafd4b 100644 --- a/springwolf-core/src/test/resources/asyncapi/asyncapi.json +++ b/springwolf-core/src/test/resources/asyncapi/asyncapi.json @@ -43,6 +43,18 @@ "title" : "Example Payload", "payload" : { "$ref" : "#/components/schemas/ExamplePayload" + }, + "bindings" : { + "kafka": { + "key": { + "type": "string", + "exampleSetFlag": false, + "types": [ + "string" + ] + }, + "bindingVersion": "binding-version-1" + } } } } @@ -61,4 +73,4 @@ } }, "tags" : [ ] -} \ No newline at end of file +} diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java index 1274f9f0a..57e1082b8 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/consumers/ExampleClassLevelKafkaListener.java @@ -3,6 +3,7 @@ import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListener; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto; import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto; import lombok.extern.slf4j.Slf4j; @@ -48,7 +49,14 @@ public void receiveAnotherPayload(AnotherPayloadDto payload) { @KafkaAsyncOperationBinding( bindingVersion = "1", clientId = "foo-clientId", - groupId = "#{'foo-groupId'}" + groupId = "#{'foo-groupId'}", + messageBinding = @KafkaAsyncMessageBinding( + key = @KafkaAsyncOperationBinding.KafkaAsyncKey( + description = "Kafka Consumer Message Key", + example = "example-key" + ), + bindingVersion = "1" + ) ) public void receiveMonetaryAmount(MonetaryAmount payload) { log.info("Received new message in multi-payload-topic: {}", payload.toString()); diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java index b872dacc0..6d0e9bf28 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/producers/ExampleProducer.java @@ -3,6 +3,8 @@ import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisher; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncKey; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; @@ -38,7 +40,14 @@ public class ExampleProducer { )) @KafkaAsyncOperationBinding( bindingVersion = "1", - clientId = "foo-clientId" + clientId = "foo-clientId", + messageBinding = @KafkaAsyncMessageBinding( + key = @KafkaAsyncKey( + description = "Kafka Producer Message Key", + example = "example-key" + ), + bindingVersion = "1" + ) ) public void sendMessage(ExamplePayloadDto msg) { kafkaTemplate.send(PRODUCER_TOPIC, msg); diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json index 3355b72f0..f99e919df 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json @@ -63,6 +63,9 @@ }, "headers" : { "$ref" : "#/components/schemas/HeadersNotDocumented" + }, + "bindings" : { + "kafka" : { } } } }, @@ -87,6 +90,9 @@ }, "headers" : { "$ref" : "#/components/schemas/CloudEventHeadersForAnotherPayloadDtoEndpoint" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto", @@ -97,6 +103,17 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders" + }, + "bindings" : { + "kafka" : { + "key" : { + "type" : "string", + "description" : "Kafka Producer Message Key", + "example" : "example-key", + "exampleSetFlag" : true, + "types" : [ "string" ] + } + } } } ] } @@ -120,6 +137,9 @@ }, "headers" : { "$ref" : "#/components/schemas/HeadersNotDocumented" + }, + "bindings" : { + "kafka" : { } } } }, @@ -148,6 +168,9 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-AnotherPayloadDto" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto", @@ -157,6 +180,9 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto" + }, + "bindings" : { + "kafka" : { } } }, { "name" : "javax.money.MonetaryAmount", @@ -167,6 +193,18 @@ }, "headers" : { "$ref" : "#/components/schemas/SpringKafkaDefaultHeaders-MonetaryAmount" + }, + "bindings" : { + "kafka" : { + "key" : { + "type" : "string", + "description" : "Kafka Consumer Message Key", + "example" : "example-key", + "exampleSetFlag" : true, + "types" : [ "string" ] + }, + "bindingVersion" : "1" + } } } ] } @@ -414,4 +452,4 @@ } }, "tags" : [ ] -} \ No newline at end of file +} diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java index 1cdfb9b46..b6b604233 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfAmqpController.java @@ -1,15 +1,18 @@ package io.github.stavshamir.springwolf.asyncapi; +import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; -import java.util.Map; - import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_CONFIG_PREFIX; import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_PLUGIN_PUBLISHING_ENABLED; @@ -23,14 +26,14 @@ public class SpringwolfAmqpController { private final SpringwolfAmqpProducer amqpProducer; @PostMapping("/publish") - public void publish(@RequestParam String topic, @RequestBody Map payload) { + public void publish(@RequestParam String topic, @RequestBody MessageDto message) { if (amqpProducer.isEnabled()) { log.warn("AMQP producer is not enabled - message will not be published"); throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled"); } - log.info("Publishing to amqp queue {}: {}", topic, payload); - amqpProducer.send(topic, payload); + log.info("Publishing to amqp queue {}: {}", topic, message.getPayload()); + amqpProducer.send(topic, message.getPayload()); } } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java index c7d15c852..04aa54976 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScanner.java @@ -1,8 +1,10 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.amqp.AMQPChannelBinding; +import com.asyncapi.v2.binding.amqp.AMQPMessageBinding; import com.asyncapi.v2.binding.amqp.AMQPOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; @@ -107,6 +109,12 @@ private String getExchangeName(RabbitListener annotation) { return ImmutableMap.of("amqp", AMQPOperationBinding.builder().cc(getRoutingKeys(annotation)).build()); } + @Override + protected Map buildMessageBinding(RabbitListener annotation) { + // currently the feature to define amqp message binding is not implemented. + return ImmutableMap.of("amqp", new AMQPMessageBinding()); + } + private List getRoutingKeys(RabbitListener annotation) { /* The routing key is taken from the binding. As the key field in the @QueueBinding can be an empty array, diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java index 4291e922f..e0ecca05d 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java @@ -33,7 +33,7 @@ public SpringwolfAmqpProducer(ChannelsService channelsService, List payload) { + public void send(String channelName, Map payload) { ChannelItem channelItem = channelsService.getChannels().get(channelName); String exchange = getExchangeName(channelItem); diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java index 9797cc8ab..24277785e 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelRabbitListenerScannerTest.java @@ -2,6 +2,7 @@ import com.asyncapi.v2.binding.ChannelBinding; import com.asyncapi.v2.binding.amqp.AMQPChannelBinding; +import com.asyncapi.v2.binding.amqp.AMQPMessageBinding; import com.asyncapi.v2.binding.amqp.AMQPOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -99,6 +100,7 @@ public void scan_componentHasRabbitListenerMethods_hardCodedTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -140,6 +142,7 @@ public void scan_componentHasRabbitListenerMethods_embeddedValueTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -178,6 +181,7 @@ public void scan_componentHasRabbitListenerMethods_bindingsAnnotation() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -216,6 +220,7 @@ public void scan_componentHasRabbitListenerMethods_bindingBean() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() @@ -271,6 +276,7 @@ public void scan_componentHasRabbitListenerMethods_multipleParamsWithPayloadAnno .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("amqp", new AMQPMessageBinding())) .build(); Operation operation = Operation.builder() diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java index 26a155f3e..6f9bd6543 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java @@ -1,12 +1,16 @@ package io.github.stavshamir.springwolf.asyncapi; -import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto; +import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import static io.github.stavshamir.springwolf.SpringWolfKafkaConfigConstants.SPRINGWOLF_KAFKA_CONFIG_PREFIX; @@ -22,8 +26,8 @@ public class SpringwolfKafkaController { private final SpringwolfKafkaProducer kafkaProducer; @PostMapping("/publish") - public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kafkaMessage) { - if(kafkaMessage.getPayload() == null) { + public void publish(@RequestParam String topic, @RequestBody MessageDto message) { + if(message.getPayload() == null) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Message payload is required"); } @@ -32,8 +36,10 @@ public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kaf throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled"); } - log.debug("Publishing to kafka topic {}: {}", topic, kafkaMessage); - kafkaProducer.send(topic, kafkaMessage.getHeaders(), kafkaMessage.getPayload()); + String kafkaKey = message.getBindings() != null ? message.getBindings().get("key") : null; + log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message); + + kafkaProducer.send(topic, kafkaKey, message.getHeaders(), message.getPayload()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java index af641a220..85d98c423 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner; @@ -59,6 +60,13 @@ protected String getChannelName(KafkaListener annotation) { return KafkaListenerUtil.buildChannelBinding(); } + @Override + protected Map buildMessageBinding(Method method) { + // Currently there is no interesting data in the KafkaListener annotation, but we keep it for the sake of + // consistency in the code and in the serialized specification (always have at least an empty binding for kafka) + return KafkaListenerUtil.buildMessageBinding(); + } + @Override protected AsyncHeaders buildHeaders(Method method) { Class payloadType = getPayloadType(method); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java index 2349322fb..2a5120cf3 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/KafkaListenerUtil.java @@ -1,8 +1,10 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import lombok.extern.slf4j.Slf4j; @@ -44,4 +46,8 @@ public static String getChannelName(KafkaListener annotation, StringValueResolve binding.setGroupId(groupId); return ImmutableMap.of("kafka", binding); } + + public static Map buildMessageBinding() { + return ImmutableMap.of("kafka", new KafkaMessageBinding()); + } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java index 76cd29386..338c0c1f8 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScanner.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.ChannelBinding; +import com.asyncapi.v2.binding.MessageBinding; import com.asyncapi.v2.binding.OperationBinding; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority; import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner; @@ -49,6 +50,13 @@ protected String getChannelName(KafkaListener annotation) { return KafkaListenerUtil.buildOperationBinding(annotation, resolver); } + @Override + protected Map buildMessageBinding(KafkaListener annotation) { + // Currently there is no interesting data in the KafkaListener annotation, but we keep it for the sake of + // consistency in the code and in the serialized specification (always have at least an empty binding for kafka) + return KafkaListenerUtil.buildMessageBinding(); + } + @Override protected Class getPayloadType(Method method) { return SpringPayloadAnnotationTypeExtractor.getPayloadType(method); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java new file mode 100644 index 000000000..2a2507051 --- /dev/null +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/KafkaMessageBindingProcessor.java @@ -0,0 +1,63 @@ +package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata; + +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.MessageBindingProcessor; +import io.swagger.v3.oas.models.media.Schema; +import io.swagger.v3.oas.models.media.StringSchema; +import org.springframework.context.EmbeddedValueResolverAware; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.springframework.util.StringValueResolver; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Optional; + +@Component +public class KafkaMessageBindingProcessor implements MessageBindingProcessor, EmbeddedValueResolverAware { + private StringValueResolver resolver; + + @Override + public void setEmbeddedValueResolver(StringValueResolver resolver) { + this.resolver = resolver; + } + + @Override + public Optional process(Method method) { + return Arrays.stream(method.getAnnotations()) + .filter(annotation -> annotation instanceof KafkaAsyncOperationBinding) + .map(annotation -> (KafkaAsyncOperationBinding) annotation) + .findAny() + .map(this::mapToMessageBinding); + } + + private ProcessedMessageBinding mapToMessageBinding(KafkaAsyncOperationBinding bindingAnnotation) { + KafkaAsyncMessageBinding messageBinding = bindingAnnotation.messageBinding(); + KafkaMessageBinding kafkaMessageBinding = KafkaMessageBinding.builder() + .bindingVersion(resolveOrNull(messageBinding.bindingVersion())) + .key(resolveSchemaOrNull(messageBinding)) + .build(); + + return new ProcessedMessageBinding(bindingAnnotation.type(), kafkaMessageBinding); + } + + private String resolveOrNull(String stringValue) { + return StringUtils.isEmpty(stringValue) ? null : resolver.resolveStringValue(stringValue); + } + + private Schema resolveSchemaOrNull(KafkaAsyncMessageBinding messageBinding) { + Schema schemaDefinition = null; + switch (messageBinding.key().type()) { + case UNDEFINED_KEY: + break; + case STRING_KEY: + schemaDefinition = new StringSchema() + .example(messageBinding.key().example()) + .description(resolveOrNull(messageBinding.key().description())); + } + + return schemaDefinition; + } +} diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java index 699e5baba..61ec5e319 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java @@ -20,4 +20,29 @@ String clientId() default ""; String bindingVersion() default ""; + KafkaAsyncMessageBinding messageBinding() default @KafkaAsyncMessageBinding(); + + @Retention(RetentionPolicy.CLASS) + @Target({}) + @interface KafkaAsyncMessageBinding { + + KafkaAsyncKey key() default @KafkaAsyncKey(type = KafkaAsyncKey.KafkaKeyTypes.UNDEFINED_KEY); + + String bindingVersion() default ""; + } + @Retention(RetentionPolicy.CLASS) + @Target({}) + @interface KafkaAsyncKey { + + KafkaKeyTypes type() default KafkaKeyTypes.STRING_KEY; + + String example() default ""; + + String description() default ""; + + enum KafkaKeyTypes { + UNDEFINED_KEY, + STRING_KEY + } + } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java index f963a0fbf..60bc98adf 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaConsumerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -15,5 +16,6 @@ public KafkaConsumerData(String topicName, Class payloadType, String descript this.payloadType = payloadType; this.headers = headers != null ? headers : this.headers; this.operationBinding = ImmutableMap.of("kafka", new KafkaOperationBinding()); + this.messageBinding = ImmutableMap.of("kafka", new KafkaMessageBinding()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java index 21caf044b..a7df15ca7 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/KafkaProducerData.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.types; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.google.common.collect.ImmutableMap; import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; @@ -16,6 +17,7 @@ public KafkaProducerData(String topicName, Class payloadType, String descript this.payloadType = payloadType; this.headers = headers != null ? headers : this.headers; this.operationBinding = ImmutableMap.of("kafka", new KafkaOperationBinding()); + this.messageBinding = ImmutableMap.of("kafka", new KafkaMessageBinding()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java index abb14bd2a..feaf0331e 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java @@ -25,18 +25,18 @@ public boolean isEnabled() { return kafkaTemplate.isPresent(); } - public void send(String topic, Map headers, Map payload) { + public void send(String topic, String key, Map headers, Map payload) { if (kafkaTemplate.isPresent()) { - kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload)); + kafkaTemplate.get().send(buildProducerRecord(topic, key, headers, payload)); } else { log.warn("Kafka producer is not configured"); } } - private ProducerRecord> buildProducerRecord(String topic, Map headers, Map payload) { + private ProducerRecord> buildProducerRecord(String topic, String key, Map headers, Map payload) { List
recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList(); - return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders); + return new ProducerRecord<>(topic, null, null, key, payload, recordHeaders); } private List
buildHeaders(Map headers) { diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java index 97aafff1a..42f99b58a 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java @@ -44,7 +44,7 @@ public class SpringwolfKafkaControllerTest { @Test public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() { try { - String content = "{\"headers\": null, \"payload\": null }"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": null }"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -58,7 +58,7 @@ public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() { public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(false); - String content = "{\"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -69,14 +69,14 @@ public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() throw public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(true); - String content = "{\"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; + String content = "{\"bindings\": null, \"headers\": null, \"payload\": { \"some-key\" : \"some-value\" }}"; mvc.perform(post("/springwolf/kafka/publish").param("topic", "test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) .andExpect(status().isOk()); - verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), payloadCaptor.capture()); + verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), isNull(), payloadCaptor.capture()); assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-key", "some-value")); } @@ -85,14 +85,31 @@ public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() throws Ex public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAreSend() throws Exception { when(springwolfKafkaProducer.isEnabled()).thenReturn(true); - String content = "{\"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; + String content = "{\"bindings\": null, \"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") .contentType(MediaType.APPLICATION_JSON) .content(content)) .andExpect(status().isOk()); - verify(springwolfKafkaProducer).send(eq("test-topic"), headerCaptor.capture(), payloadCaptor.capture()); + verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), headerCaptor.capture(), payloadCaptor.capture()); + + assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-payload-key", "some-payload-value")); + } + + @Test + public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAndBindingsAreSend() throws Exception{ + when(springwolfKafkaProducer.isEnabled()).thenReturn(true); + + String content = "{\"bindings\": {\"key\": \"kafka-key-value\"}, \"headers\": { \"some-header-key\" : \"some-header-value\" }, \"payload\": { \"some-payload-key\" : \"some-payload-value\" }}"; + + mvc.perform(post("/springwolf/kafka/publish?topic=test-topic") + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andExpect(status().isOk()); + + verify(springwolfKafkaProducer).send(eq("test-topic"), eq("kafka-key-value"), headerCaptor.capture(), payloadCaptor.capture()); assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); assertThat(payloadCaptor.getValue()).isEqualTo(singletonMap("some-payload-key", "some-payload-value")); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java index badcbc16e..1edd8e5f3 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/ClassLevelKafkaListenerScannerTest.java @@ -1,6 +1,7 @@ package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -76,12 +77,16 @@ public void scan_componentWithMultipleKafkaListenersAndHandlers() { .name(SimpleFoo.class.getName()) .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) + .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Message barMessage = Message.builder() .name(SimpleBar.class.getName()) .title(SimpleBar.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName())) + .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleBar.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -137,6 +142,7 @@ public void scan_componentWithSingleKafkaHandlerMethod() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -169,6 +175,7 @@ public void scan_componentWithMultipleKafkaHandlerMethods() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Message barMessage = Message.builder() @@ -176,6 +183,7 @@ public void scan_componentWithMultipleKafkaHandlerMethods() { .title(SimpleBar.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleBar.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -209,6 +217,7 @@ public void scan_componentWithSingleKafkaHandlerMethod_batchPayload() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName("SpringKafkaDefaultHeaders-" + SimpleFoo.class.getSimpleName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java index b2cff4b05..9a312c347 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/MethodLevelKafkaListenerScannerTest.java @@ -2,6 +2,7 @@ import com.asyncapi.v2.binding.OperationBinding; import com.asyncapi.v2.binding.kafka.KafkaChannelBinding; +import com.asyncapi.v2.binding.kafka.KafkaMessageBinding; import com.asyncapi.v2.binding.kafka.KafkaOperationBinding; import com.asyncapi.v2.model.channel.ChannelItem; import com.asyncapi.v2.model.channel.operation.Operation; @@ -80,6 +81,7 @@ public void scan_componentHasKafkaListenerMethods_hardCodedTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -112,6 +114,7 @@ public void scan_componentHasKafkaListenerMethods_embeddedValueTopic() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -179,6 +182,7 @@ public void scan_componentHasKafkaListenerMethods_multipleParamsWithPayloadAnnot .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() @@ -211,6 +215,7 @@ public void scan_componentHasKafkaListenerMethods_batchPayload() { .title(SimpleFoo.class.getSimpleName()) .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName())) .headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName())) + .bindings(ImmutableMap.of("kafka", new KafkaMessageBinding())) .build(); Operation operation = Operation.builder() diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java index 1f0a2a151..17a62c2a7 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java @@ -49,7 +49,7 @@ public void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateCon public void testSendingKafkaMessageWithoutHeaders() { Map payload = Collections.singletonMap("some", "field"); - springwolfKafkaProducer.send("test-topic", null, payload); + springwolfKafkaProducer.send("test-topic", null, null, payload); verify(kafkaTemplate).send(recordArgumentCaptor.capture()); @@ -67,7 +67,7 @@ public void testSendingKafkaMessageWithHeaders() { Map payload = Collections.singletonMap("some", "field"); Map headers = Collections.singletonMap("header-key", "header"); - springwolfKafkaProducer.send("test-topic", headers, payload); + springwolfKafkaProducer.send("test-topic", null, headers, payload); verify(kafkaTemplate).send(recordArgumentCaptor.capture());