Skip to content

Feature/add support for message binding #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -13,6 +20,7 @@ public class MessageHelper {
private static final String ONE_OF = "oneOf";

private static final Comparator<Message> byMessageName = Comparator.comparing(Message::getName);

private static final Supplier<Set<Message>> messageSupplier = () -> new TreeSet<>(byMessageName);

public static Object toMessageObjectOrComposition(Set<Message> messages) {
Expand All @@ -22,23 +30,22 @@ public static Object toMessageObjectOrComposition(Set<Message> messages) {
case 1:
return messages.toArray()[0];
default:
return ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier)));
return ImmutableMap.of(ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier))));
}
}

@SuppressWarnings("unchecked")
public static Set<Message> messageObjectToSet(Object messageObject) {
if (messageObject instanceof Message) {
return new HashSet<>(Arrays.asList((Message) messageObject));
return new HashSet<>(Collections.singletonList((Message) messageObject));
}

if (messageObject instanceof Map) {
Set<Message> messages = ((Map<String, Set<Message>>) messageObject).get(ONE_OF);
List<Message> messages = ((Map<String, List<Message>>) messageObject).get(ONE_OF);
return new HashSet<>(messages);
}

log.warn("Message object must contain either a Message or a Map<String, Set<Message>, but contained: {}", messageObject.getClass());
return new HashSet<>();
}

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package io.github.stavshamir.springwolf.asyncapi.dtos;
package io.github.stavshamir.springwolf.asyncapi.controller.dtos;

import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

import java.util.Map;


@Data
@Builder
@Jacksonized
public class KafkaMessageDto {
public class MessageDto {

private final Map<String, String> bindings;

private final Map<String, String> headers;

private final Map<String, ?> payload;

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;

import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;

Expand Down Expand Up @@ -42,17 +50,25 @@ public static Map<String, ChannelItem> merge(List<Map.Entry<String, ChannelItem>
}

private static Operation mergeOperation(Operation operation, Operation otherOperation) {
Set<Message> mergedMessages = getMessages(operation);
Set<Message> currentEntryMessages = getMessages(otherOperation);
mergedMessages.addAll(currentEntryMessages);

Operation mergedOperation = operation != null ? operation : otherOperation;

Set<Message> mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation));
if (!mergedMessages.isEmpty()) {
mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages));
}
return mergedOperation;
}

private static Set<Message> mergeMessages(Set<Message> messages, Set<Message> otherMessages) {
Map<String, Message> nameToMessage = messages.stream().collect(Collectors.toMap(Message::getName, Function.identity()));

for (Message otherMessage : otherMessages) {
nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage);
}

return new HashSet<>(nameToMessage.values());
}

private static Set<Message> getMessages(Operation operation) {
return Optional
.ofNullable(operation)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
Expand All @@ -18,7 +19,13 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -66,12 +73,18 @@ public abstract class AbstractClassLevelListenerScanner<ClassAnnotation extends

/**
* @param annotation An instance of a listener annotation.
* @return A map containing an channel binding pointed to by the protocol binding name.
* @return A map containing a channel binding pointed to by the protocol binding name.
*/
protected abstract Map<String, ? extends ChannelBinding> buildChannelBinding(ClassAnnotation annotation);

/**
* Can be overriden by implementations
* @param method The specific method. Can be used to extract message binding from an annotation.
* @return A map containing a message binding pointed to by the protocol binding name.
*/
protected abstract Map<String, ? extends MessageBinding> buildMessageBinding(Method method);

/**
* Can be overridden by implementations
*
* @param method The specific method. Can be used to extract the payload type
* @return The AsyncHeaders
Expand Down Expand Up @@ -159,6 +172,7 @@ private Message buildMessage(Method method) {
.title(modelName)
.payload(PayloadReference.fromModelName(modelName))
.headers(HeaderReference.fromModelName(headerModelName))
.bindings(buildMessageBinding(method))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
Expand All @@ -17,7 +18,11 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
Expand Down Expand Up @@ -62,6 +67,12 @@ public Map<String, ChannelItem> scan() {
*/
protected abstract Map<String, ? extends OperationBinding> buildOperationBinding(T annotation);

/**
* @param annotation An instance of a listener annotation.
* @return A map containing a message binding pointed to by the protocol binding name.
*/
protected abstract Map<String, ? extends MessageBinding> buildMessageBinding(T annotation);

/**
* @param method The listener method.
* @return The class object of the payload received by the listener.
Expand All @@ -88,16 +99,18 @@ private Map.Entry<String, ChannelItem> mapMethodToChannel(Method method) {

Map<String, ? extends ChannelBinding> channelBinding = buildChannelBinding(annotation);
Map<String, ? extends OperationBinding> operationBinding = buildOperationBinding(annotation);
Map<String, ? extends MessageBinding> messageBinding = buildMessageBinding(annotation);
Class<?> payload = getPayloadType(method);
String operationId = channelName + "_publish_" + method.getName();
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, operationId);
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, messageBinding, operationId);

return Maps.immutableEntry(channelName, channel);
}

private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBinding,
Class<?> payloadType,
Map<String, ? extends OperationBinding> operationBinding,
Map<String, ? extends MessageBinding> messageBinding,
String operationId) {
String modelName = schemasService.register(payloadType);
String headerModelName = schemasService.register(AsyncHeaders.NOT_DOCUMENTED);
Expand All @@ -107,6 +120,7 @@ private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBi
.title(modelName)
.payload(PayloadReference.fromModelName(modelName))
.headers(HeaderReference.fromModelName(headerModelName))
.bindings(messageBinding)
.build();

Operation operation = Operation.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private Message buildMessage(OperationData operationData) {
.description(operationData.getDescription())
.payload(PayloadReference.fromModelName(modelName))
.headers(HeaderReference.fromModelName(headerModelName))
.bindings(operationData.getMessageBinding())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata;

import com.asyncapi.v2.binding.MessageBinding;
import lombok.Data;

@Data
public class ProcessedMessageBinding {
private final String type;
private final MessageBinding binding;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;

import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedOperationBinding;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
Expand Down Expand Up @@ -59,11 +61,19 @@ private static String getDescription(List<AsyncOperation.Headers.Header> value)
.orElse(null);
}

public static Map<String, OperationBinding> processBindingFromAnnotation(Method method, List<OperationBindingProcessor> operationBindingProcessors) {
public static Map<String, OperationBinding> processOperationBindingFromAnnotation(Method method, List<OperationBindingProcessor> operationBindingProcessors) {
return operationBindingProcessors.stream()
.map(operationBindingProcessor -> operationBindingProcessor.process(method))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ProcessedOperationBinding::getType, ProcessedOperationBinding::getBinding));
}

public static Map<String, MessageBinding> processMessageBindingFromAnnotation(Method method, List<MessageBindingProcessor> messageBindingProcessors) {
return messageBindingProcessors.stream()
.map(messageBindingProcessor -> messageBindingProcessor.process(method))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;

import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
Expand Down Expand Up @@ -32,6 +33,8 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner

private final List<OperationBindingProcessor> operationBindingProcessors;

private final List<MessageBindingProcessor> messageBindingProcessors;

@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
this.resolver = resolver;
Expand Down Expand Up @@ -63,7 +66,8 @@ private Set<Method> getAnnotatedMethods(Class<?> type) {
private OperationData mapMethodToOperationData(Method method) {
log.debug("Mapping method \"{}\" to channels", method.getName());

Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors);
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
Map<String, MessageBinding> messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);

Class<AsyncListener> annotationClass = AsyncListener.class;
AsyncListener annotation = Optional.of(method.getAnnotation(annotationClass))
Expand All @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) {
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op))
.payloadType(payloadType)
.operationBinding(operationBindings)
.messageBinding(messageBindings)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;

import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
Expand Down Expand Up @@ -32,6 +33,8 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne

private final List<OperationBindingProcessor> operationBindingProcessors;

private final List<MessageBindingProcessor> messageBindingProcessors;

@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
this.resolver = resolver;
Expand Down Expand Up @@ -63,7 +66,8 @@ private Set<Method> getAnnotatedMethods(Class<?> type) {
private OperationData mapMethodToOperationData(Method method) {
log.debug("Mapping method \"{}\" to channels", method.getName());

Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors);
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
Map<String, MessageBinding> messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);

Class<AsyncPublisher> annotationClass = AsyncPublisher.class;
AsyncPublisher annotation = Optional.of(method.getAnnotation(annotationClass))
Expand All @@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) {
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op))
.payloadType(payloadType)
.operationBinding(operationBindings)
.messageBinding(messageBindings)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;

import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding;

import java.lang.reflect.Method;
import java.util.Optional;

public interface MessageBindingProcessor {

/**
* Process the methods annotated with {@link AsyncPublisher} and {@link AsyncListener}
* for protocol specific messageBinding annotations, method parameters, etc
*
* @param method The method being annotated
* @return A message binding, if found
*/
Optional<ProcessedMessageBinding> process(Method method);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.stavshamir.springwolf.asyncapi.types;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.MessageBinding;
import com.asyncapi.v2.binding.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -61,4 +62,13 @@ public class ConsumerData implements OperationData {
*/
protected Map<String, ? extends OperationBinding> operationBinding;

/**
* The message binding of the consumer.
* <br>
* For example:
* <code>
* ImmutableMap.of("kafka", new KafkaMessageBinding())
* </code>
*/
protected Map<String, ? extends MessageBinding> messageBinding;
}
Loading