Skip to content

Commit 0efbde7

Browse files
authored
Feature/add support for message binding (#130)
* Fix: Remove that EqualsAndHashCode of Message only uses the name, Change MessageHelper to not expose the TreeSet that is used for deduplication because a TreeSet breaks equals * Refactor code to merge messages in ChannelMerger to respect the priority without relying on equals based on message name * Add extension point to supply protocol specific message bindings in plugins * Add KafkaMessageBindingProcessor and KafkaAsyncMessageBinding annotation as part of KafkaAsyncOperationBinding to support documentation of kafka keys as message binding * Extend springwolf-kafka-example to include documentation of kafka keys as message binding by using annotations * Extend abstract class and method level listener scanners to include an empty message binding so that there is always a message binding * Extend springwolf-kafka-plugin to implement the new methods in abstract class and method level listener scanners to include an empty message binding so that there is always a message binding * Extend springwolf-amqp-plugin to implement the new methods in abstract class and method level listener scanners to include an empty message binding so that there is always a message binding * Add option to document an example for kafka key bindings, change structure of KafkaAsyncMessageBinding to contain a new sub-annotation KafkaAsyncKey to make it more obvious what is part of the key documentation and which is part of the kafka message binding, Add default values for message binding to KafkaConsumer- and KafkaProducerData * Adapt springwolf-kafka-example to the new annotation structure, to contain an example for the key and to the change that a kafka message binding is always present * Extend producer controllers of springwolf-kafka-plugin and springwolf-amqp-pluging to support message bindings which are sent by springwolf-ui * Fix java doc of MessageBindingProcessor after rebase * Move KafkaMessageDtoDeserializationTest to core and rename to MessageDtoDeserializationTest
1 parent 8b55094 commit 0efbde7

File tree

47 files changed

+525
-67
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+525
-67
lines changed

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/MessageHelper.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@
44
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
55
import lombok.extern.slf4j.Slf4j;
66

7-
import java.util.*;
7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.Comparator;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Set;
14+
import java.util.TreeSet;
815
import java.util.function.Supplier;
916
import java.util.stream.Collectors;
1017

@@ -13,6 +20,7 @@ public class MessageHelper {
1320
private static final String ONE_OF = "oneOf";
1421

1522
private static final Comparator<Message> byMessageName = Comparator.comparing(Message::getName);
23+
1624
private static final Supplier<Set<Message>> messageSupplier = () -> new TreeSet<>(byMessageName);
1725

1826
public static Object toMessageObjectOrComposition(Set<Message> messages) {
@@ -22,23 +30,22 @@ public static Object toMessageObjectOrComposition(Set<Message> messages) {
2230
case 1:
2331
return messages.toArray()[0];
2432
default:
25-
return ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier)));
33+
return ImmutableMap.of(ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier))));
2634
}
2735
}
2836

2937
@SuppressWarnings("unchecked")
3038
public static Set<Message> messageObjectToSet(Object messageObject) {
3139
if (messageObject instanceof Message) {
32-
return new HashSet<>(Arrays.asList((Message) messageObject));
40+
return new HashSet<>(Collections.singletonList((Message) messageObject));
3341
}
3442

3543
if (messageObject instanceof Map) {
36-
Set<Message> messages = ((Map<String, Set<Message>>) messageObject).get(ONE_OF);
44+
List<Message> messages = ((Map<String, List<Message>>) messageObject).get(ONE_OF);
3745
return new HashSet<>(messages);
3846
}
3947

4048
log.warn("Message object must contain either a Message or a Map<String, Set<Message>, but contained: {}", messageObject.getClass());
4149
return new HashSet<>();
4250
}
43-
4451
}
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
package io.github.stavshamir.springwolf.asyncapi.dtos;
1+
package io.github.stavshamir.springwolf.asyncapi.controller.dtos;
22

33
import lombok.Builder;
44
import lombok.Data;
55
import lombok.extern.jackson.Jacksonized;
66

77
import java.util.Map;
88

9-
109
@Data
1110
@Builder
1211
@Jacksonized
13-
public class KafkaMessageDto {
12+
public class MessageDto {
13+
14+
private final Map<String, String> bindings;
1415

1516
private final Map<String, String> headers;
1617

1718
private final Map<String, ?> payload;
18-
1919
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ChannelMerger.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@
55
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
66
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
77

8-
import java.util.*;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Optional;
12+
import java.util.Set;
13+
import java.util.TreeMap;
14+
import java.util.TreeSet;
15+
import java.util.function.Function;
16+
import java.util.stream.Collectors;
917

1018
import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
1119

@@ -42,17 +50,25 @@ public static Map<String, ChannelItem> merge(List<Map.Entry<String, ChannelItem>
4250
}
4351

4452
private static Operation mergeOperation(Operation operation, Operation otherOperation) {
45-
Set<Message> mergedMessages = getMessages(operation);
46-
Set<Message> currentEntryMessages = getMessages(otherOperation);
47-
mergedMessages.addAll(currentEntryMessages);
48-
4953
Operation mergedOperation = operation != null ? operation : otherOperation;
54+
55+
Set<Message> mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation));
5056
if (!mergedMessages.isEmpty()) {
5157
mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages));
5258
}
5359
return mergedOperation;
5460
}
5561

62+
private static Set<Message> mergeMessages(Set<Message> messages, Set<Message> otherMessages) {
63+
Map<String, Message> nameToMessage = messages.stream().collect(Collectors.toMap(Message::getName, Function.identity()));
64+
65+
for (Message otherMessage : otherMessages) {
66+
nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage);
67+
}
68+
69+
return new HashSet<>(nameToMessage.values());
70+
}
71+
5672
private static Set<Message> getMessages(Operation operation) {
5773
return Optional
5874
.ofNullable(operation)

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation;
22

33
import com.asyncapi.v2.binding.ChannelBinding;
4+
import com.asyncapi.v2.binding.MessageBinding;
45
import com.asyncapi.v2.binding.OperationBinding;
56
import com.asyncapi.v2.model.channel.ChannelItem;
67
import com.asyncapi.v2.model.channel.operation.Operation;
@@ -18,7 +19,13 @@
1819

1920
import java.lang.annotation.Annotation;
2021
import java.lang.reflect.Method;
21-
import java.util.*;
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.Comparator;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.Set;
28+
import java.util.TreeSet;
2229
import java.util.function.Supplier;
2330
import java.util.stream.Collectors;
2431

@@ -66,12 +73,18 @@ public abstract class AbstractClassLevelListenerScanner<ClassAnnotation extends
6673

6774
/**
6875
* @param annotation An instance of a listener annotation.
69-
* @return A map containing an channel binding pointed to by the protocol binding name.
76+
* @return A map containing a channel binding pointed to by the protocol binding name.
7077
*/
7178
protected abstract Map<String, ? extends ChannelBinding> buildChannelBinding(ClassAnnotation annotation);
7279

7380
/**
74-
* Can be overriden by implementations
81+
* @param method The specific method. Can be used to extract message binding from an annotation.
82+
* @return A map containing a message binding pointed to by the protocol binding name.
83+
*/
84+
protected abstract Map<String, ? extends MessageBinding> buildMessageBinding(Method method);
85+
86+
/**
87+
* Can be overridden by implementations
7588
*
7689
* @param method The specific method. Can be used to extract the payload type
7790
* @return The AsyncHeaders
@@ -159,6 +172,7 @@ private Message buildMessage(Method method) {
159172
.title(modelName)
160173
.payload(PayloadReference.fromModelName(modelName))
161174
.headers(HeaderReference.fromModelName(headerModelName))
175+
.bindings(buildMessageBinding(method))
162176
.build();
163177
}
164178

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractMethodLevelListenerScanner.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation;
22

33
import com.asyncapi.v2.binding.ChannelBinding;
4+
import com.asyncapi.v2.binding.MessageBinding;
45
import com.asyncapi.v2.binding.OperationBinding;
56
import com.asyncapi.v2.model.channel.ChannelItem;
67
import com.asyncapi.v2.model.channel.operation.Operation;
@@ -17,7 +18,11 @@
1718

1819
import java.lang.annotation.Annotation;
1920
import java.lang.reflect.Method;
20-
import java.util.*;
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.Set;
2126

2227
import static java.util.stream.Collectors.toMap;
2328
import static java.util.stream.Collectors.toSet;
@@ -62,6 +67,12 @@ public Map<String, ChannelItem> scan() {
6267
*/
6368
protected abstract Map<String, ? extends OperationBinding> buildOperationBinding(T annotation);
6469

70+
/**
71+
* @param annotation An instance of a listener annotation.
72+
* @return A map containing a message binding pointed to by the protocol binding name.
73+
*/
74+
protected abstract Map<String, ? extends MessageBinding> buildMessageBinding(T annotation);
75+
6576
/**
6677
* @param method The listener method.
6778
* @return The class object of the payload received by the listener.
@@ -88,16 +99,18 @@ private Map.Entry<String, ChannelItem> mapMethodToChannel(Method method) {
8899

89100
Map<String, ? extends ChannelBinding> channelBinding = buildChannelBinding(annotation);
90101
Map<String, ? extends OperationBinding> operationBinding = buildOperationBinding(annotation);
102+
Map<String, ? extends MessageBinding> messageBinding = buildMessageBinding(annotation);
91103
Class<?> payload = getPayloadType(method);
92104
String operationId = channelName + "_publish_" + method.getName();
93-
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, operationId);
105+
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding, messageBinding, operationId);
94106

95107
return Maps.immutableEntry(channelName, channel);
96108
}
97109

98110
private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBinding,
99111
Class<?> payloadType,
100112
Map<String, ? extends OperationBinding> operationBinding,
113+
Map<String, ? extends MessageBinding> messageBinding,
101114
String operationId) {
102115
String modelName = schemasService.register(payloadType);
103116
String headerModelName = schemasService.register(AsyncHeaders.NOT_DOCUMENTED);
@@ -107,6 +120,7 @@ private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBi
107120
.title(modelName)
108121
.payload(PayloadReference.fromModelName(modelName))
109122
.headers(HeaderReference.fromModelName(headerModelName))
123+
.bindings(messageBinding)
110124
.build();
111125

112126
Operation operation = Operation.builder()

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private Message buildMessage(OperationData operationData) {
9999
.description(operationData.getDescription())
100100
.payload(PayloadReference.fromModelName(modelName))
101101
.headers(HeaderReference.fromModelName(headerModelName))
102+
.bindings(operationData.getMessageBinding())
102103
.build();
103104
}
104105

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata;
2+
3+
import com.asyncapi.v2.binding.MessageBinding;
4+
import lombok.Data;
5+
6+
@Data
7+
public class ProcessedMessageBinding {
8+
private final String type;
9+
private final MessageBinding binding;
10+
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncAnnotationScannerUtil.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;
22

3+
import com.asyncapi.v2.binding.MessageBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
5+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding;
46
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedOperationBinding;
57
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema;
68
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
@@ -59,11 +61,19 @@ private static String getDescription(List<AsyncOperation.Headers.Header> value)
5961
.orElse(null);
6062
}
6163

62-
public static Map<String, OperationBinding> processBindingFromAnnotation(Method method, List<OperationBindingProcessor> operationBindingProcessors) {
64+
public static Map<String, OperationBinding> processOperationBindingFromAnnotation(Method method, List<OperationBindingProcessor> operationBindingProcessors) {
6365
return operationBindingProcessors.stream()
6466
.map(operationBindingProcessor -> operationBindingProcessor.process(method))
6567
.filter(Optional::isPresent)
6668
.map(Optional::get)
6769
.collect(Collectors.toMap(ProcessedOperationBinding::getType, ProcessedOperationBinding::getBinding));
6870
}
71+
72+
public static Map<String, MessageBinding> processMessageBindingFromAnnotation(Method method, List<MessageBindingProcessor> messageBindingProcessors) {
73+
return messageBindingProcessors.stream()
74+
.map(messageBindingProcessor -> messageBindingProcessor.process(method))
75+
.filter(Optional::isPresent)
76+
.map(Optional::get)
77+
.collect(Collectors.toMap(ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding));
78+
}
6979
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncListenerAnnotationScanner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;
22

3+
import com.asyncapi.v2.binding.MessageBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
56
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
@@ -32,6 +33,8 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner
3233

3334
private final List<OperationBindingProcessor> operationBindingProcessors;
3435

36+
private final List<MessageBindingProcessor> messageBindingProcessors;
37+
3538
@Override
3639
public void setEmbeddedValueResolver(StringValueResolver resolver) {
3740
this.resolver = resolver;
@@ -63,7 +66,8 @@ private Set<Method> getAnnotatedMethods(Class<?> type) {
6366
private OperationData mapMethodToOperationData(Method method) {
6467
log.debug("Mapping method \"{}\" to channels", method.getName());
6568

66-
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors);
69+
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
70+
Map<String, MessageBinding> messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);
6771

6872
Class<AsyncListener> annotationClass = AsyncListener.class;
6973
AsyncListener annotation = Optional.of(method.getAnnotation(annotationClass))
@@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) {
7882
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op))
7983
.payloadType(payloadType)
8084
.operationBinding(operationBindings)
85+
.messageBinding(messageBindings)
8186
.build();
8287
}
8388

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/AsyncPublisherAnnotationScanner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;
22

3+
import com.asyncapi.v2.binding.MessageBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
56
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
@@ -32,6 +33,8 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne
3233

3334
private final List<OperationBindingProcessor> operationBindingProcessors;
3435

36+
private final List<MessageBindingProcessor> messageBindingProcessors;
37+
3538
@Override
3639
public void setEmbeddedValueResolver(StringValueResolver resolver) {
3740
this.resolver = resolver;
@@ -63,7 +66,8 @@ private Set<Method> getAnnotatedMethods(Class<?> type) {
6366
private OperationData mapMethodToOperationData(Method method) {
6467
log.debug("Mapping method \"{}\" to channels", method.getName());
6568

66-
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processBindingFromAnnotation(method, operationBindingProcessors);
69+
Map<String, OperationBinding> operationBindings = AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
70+
Map<String, MessageBinding> messageBindings = AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);
6771

6872
Class<AsyncPublisher> annotationClass = AsyncPublisher.class;
6973
AsyncPublisher annotation = Optional.of(method.getAnnotation(annotationClass))
@@ -78,6 +82,7 @@ private OperationData mapMethodToOperationData(Method method) {
7882
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op))
7983
.payloadType(payloadType)
8084
.operationBinding(operationBindings)
85+
.messageBinding(messageBindings)
8186
.build();
8287
}
8388

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;
2+
3+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedMessageBinding;
4+
5+
import java.lang.reflect.Method;
6+
import java.util.Optional;
7+
8+
public interface MessageBindingProcessor {
9+
10+
/**
11+
* Process the methods annotated with {@link AsyncPublisher} and {@link AsyncListener}
12+
* for protocol specific messageBinding annotations, method parameters, etc
13+
*
14+
* @param method The method being annotated
15+
* @return A message binding, if found
16+
*/
17+
Optional<ProcessedMessageBinding> process(Method method);
18+
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ConsumerData.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.github.stavshamir.springwolf.asyncapi.types;
22

33
import com.asyncapi.v2.binding.ChannelBinding;
4+
import com.asyncapi.v2.binding.MessageBinding;
45
import com.asyncapi.v2.binding.OperationBinding;
56
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
67
import lombok.AllArgsConstructor;
@@ -61,4 +62,13 @@ public class ConsumerData implements OperationData {
6162
*/
6263
protected Map<String, ? extends OperationBinding> operationBinding;
6364

65+
/**
66+
* The message binding of the consumer.
67+
* <br>
68+
* For example:
69+
* <code>
70+
* ImmutableMap.of("kafka", new KafkaMessageBinding())
71+
* </code>
72+
*/
73+
protected Map<String, ? extends MessageBinding> messageBinding;
6474
}

0 commit comments

Comments
 (0)