Skip to content

Commit 6dbd839

Browse files
authored
Annotations for publisher and subscriber documentation (#118)
1 parent 80cc469 commit 6dbd839

File tree

43 files changed

+1419
-125
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1419
-125
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
springwolf-core/build/
22
springwolf-ui/build/
33
springwolf-examples/springwolf-amqp-example/build/
4+
springwolf-examples/springwolf-cloud-stream-example/build/
45
springwolf-examples/springwolf-kafka-example/build/
56
springwolf-plugins/springwolf-amqp-plugin/build/
7+
springwolf-plugins/springwolf-cloud-stream-plugin/build/
68
springwolf-plugins/springwolf-kafka-plugin/build/
79
springwolf-add-ons/springwolf-common-model-converters/build/
810

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

3+
import com.asyncapi.v2.binding.amqp.AMQPOperationBinding;
34
import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
45
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
56
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -11,6 +12,7 @@
1112
import com.fasterxml.jackson.databind.module.SimpleModule;
1213
import io.github.stavshamir.springwolf.asyncapi.serializers.EmptyChannelBindingSerializer;
1314
import io.github.stavshamir.springwolf.asyncapi.serializers.EmptyOperationBindingSerializer;
15+
import io.github.stavshamir.springwolf.asyncapi.serializers.AmqpOperationBindingSerializer;
1416
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaChannelBindingSerializer;
1517
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaOperationBindingSerializer;
1618
import io.github.stavshamir.springwolf.asyncapi.types.AsyncAPI;
@@ -36,6 +38,7 @@ private void registerKafkaOperationBindingSerializer() {
3638
SimpleModule module = new SimpleModule();
3739
module.addSerializer(EmptyChannelBinding.class, new EmptyChannelBindingSerializer());
3840
module.addSerializer(EmptyOperationBinding.class, new EmptyOperationBindingSerializer());
41+
module.addSerializer(AMQPOperationBinding.class, new AmqpOperationBindingSerializer());
3942
module.addSerializer(KafkaChannelBinding.class, new KafkaChannelBindingSerializer());
4043
module.addSerializer(KafkaOperationBinding.class, new KafkaOperationBindingSerializer());
4144
jsonMapper.registerModule(module);

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/DefaultChannelsService.java

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,40 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

33
import com.asyncapi.v2.model.channel.ChannelItem;
4-
import com.asyncapi.v2.model.channel.operation.Operation;
4+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
55
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
6-
import java.util.Map;
7-
import java.util.Set;
8-
import java.util.TreeMap;
9-
106
import lombok.RequiredArgsConstructor;
117
import lombok.extern.slf4j.Slf4j;
128
import org.springframework.stereotype.Service;
139

1410
import javax.annotation.PostConstruct;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.TreeMap;
1515

1616
@Slf4j
1717
@Service
1818
@RequiredArgsConstructor
1919
public class DefaultChannelsService implements ChannelsService {
2020

21-
private final Set<? extends ChannelsScanner> channelsScanners;
21+
private final List<? extends ChannelsScanner> channelsScanners;
2222
private final Map<String, ChannelItem> channels = new TreeMap<>();
2323

2424
@PostConstruct
2525
void findChannels() {
26+
List<Map.Entry<String, ChannelItem>> foundChannelItems = new ArrayList<>();
2627

2728
for (ChannelsScanner scanner : channelsScanners) {
2829
try {
2930
Map<String, ChannelItem> channels = scanner.scan();
30-
processFoundChannels(channels);
31+
foundChannelItems.addAll(channels.entrySet());
3132
} catch (Exception e) {
3233
log.error("An error was encountered during channel scanning with {}: {}", scanner, e.getMessage());
3334
}
3435
}
35-
}
3636

37-
private void processFoundChannels(Map<String, ChannelItem> foundChannels) {
38-
for (Map.Entry<String, ChannelItem> foundChannel: foundChannels.entrySet()) {
39-
if (!this.channels.containsKey(foundChannel.getKey())) {
40-
this.channels.put(foundChannel.getKey(), foundChannel.getValue());
41-
} else {
42-
ChannelItem existingChannel = this.channels.get(foundChannel.getKey());
43-
44-
Operation subscribeOperation = foundChannel.getValue().getSubscribe();
45-
if(subscribeOperation != null) {
46-
existingChannel.setSubscribe(subscribeOperation);
47-
}
48-
49-
Operation publishOperation = foundChannel.getValue().getPublish();
50-
if(publishOperation != null) {
51-
existingChannel.setPublish(publishOperation);
52-
}
53-
}
54-
}
37+
this.channels.putAll(ChannelMerger.merge(foundChannelItems));
5538
}
5639

5740
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
2+
3+
import com.asyncapi.v2.model.channel.ChannelItem;
4+
import com.asyncapi.v2.model.channel.operation.Operation;
5+
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
6+
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
7+
8+
import java.util.*;
9+
10+
import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
11+
12+
13+
/**
14+
* Util to merge multiple {@link ChannelItem}s
15+
*/
16+
public class ChannelMerger {
17+
18+
/**
19+
* Merges multiple channelItems by channel name
20+
* <p>
21+
* Given two channelItems for the same channel name, the first seen ChannelItem is used
22+
* If an operation is null, the next non-null operation is used
23+
* Messages within operations are merged
24+
*
25+
* @param channelEntries Ordered pairs of channel name to ChannelItem
26+
* @return A map of channelName to a single ChannelItem
27+
*/
28+
public static Map<String, ChannelItem> merge(List<Map.Entry<String, ChannelItem>> channelEntries) {
29+
Map<String, ChannelItem> mergedChannels = new TreeMap<>();
30+
31+
for (Map.Entry<String, ChannelItem> entry : channelEntries) {
32+
if (!mergedChannels.containsKey(entry.getKey())) {
33+
mergedChannels.put(entry.getKey(), entry.getValue());
34+
} else {
35+
ChannelItem channelItem = mergedChannels.get(entry.getKey());
36+
channelItem.setPublish(mergeOperation(channelItem.getPublish(), entry.getValue().getPublish()));
37+
channelItem.setSubscribe(mergeOperation(channelItem.getSubscribe(), entry.getValue().getSubscribe()));
38+
}
39+
}
40+
41+
return mergedChannels;
42+
}
43+
44+
private static Operation mergeOperation(Operation operation, Operation otherOperation) {
45+
Set<Message> mergedMessages = getMessages(operation);
46+
Set<Message> currentEntryMessages = getMessages(otherOperation);
47+
mergedMessages.addAll(currentEntryMessages);
48+
49+
Operation mergedOperation = operation != null ? operation : otherOperation;
50+
if (!mergedMessages.isEmpty()) {
51+
mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages));
52+
}
53+
return mergedOperation;
54+
}
55+
56+
private static Set<Message> getMessages(Operation operation) {
57+
return Optional
58+
.ofNullable(operation)
59+
.map(Operation::getMessage)
60+
.map(MessageHelper::messageObjectToSet)
61+
.orElseGet(TreeSet::new);
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
2+
3+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisher;
4+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncSubscriber;
5+
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
6+
7+
public class ChannelPriority {
8+
/**
9+
* Manual defined channels have the highest priority
10+
* <p>
11+
* Example: Definition via {@link AsyncApiDocket}
12+
*/
13+
public static final int MANUAL_DEFINED = 1;
14+
15+
/**
16+
* Definition via custom annotations
17+
* <p>
18+
* Example: {@link AsyncPublisher}, {@link AsyncSubscriber}
19+
*/
20+
public static final int ASYNC_ANNOTATION = 2;
21+
22+
/**
23+
* Definitions found via spring listener annotations are used last.
24+
* <p>
25+
* Examples: Plugins like KafkaListener, etc
26+
*/
27+
public static final int AUTO_DISCOVERED = 3;
28+
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AbstractClassLevelListenerScanner.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.asyncapi.v2.model.channel.ChannelItem;
66
import com.asyncapi.v2.model.channel.operation.Operation;
77
import com.google.common.collect.Maps;
8-
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
8+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
99
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
1010
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
1111
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
@@ -84,7 +84,7 @@ protected AsyncHeaders buildHeaders(Method method) {
8484
public Map<String, ChannelItem> scan() {
8585
Set<Class<?>> components = componentClassScanner.scan();
8686
Set<Map.Entry<String, ChannelItem>> channels = mapToChannels(components);
87-
return mergeChannels(channels);
87+
return ChannelMerger.merge(new ArrayList<>(channels));
8888
}
8989

9090
private Set<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> components) {
@@ -99,32 +99,6 @@ private boolean isClassAnnotated(Class<?> component) {
9999
return component.isAnnotationPresent(getListenerAnnotationClass());
100100
}
101101

102-
private Map<String, ChannelItem> mergeChannels(Set<Map.Entry<String, ChannelItem>> channelEntries) {
103-
Map<String, ChannelItem> mergedChannels = new TreeMap<>();
104-
105-
for (Map.Entry<String, ChannelItem> entry : channelEntries) {
106-
if (!mergedChannels.containsKey(entry.getKey())) {
107-
mergedChannels.put(entry.getKey(), entry.getValue());
108-
} else {
109-
ChannelItem channelItem = mergedChannels.get(entry.getKey());
110-
Set<Message> mergedMessages = getChannelMessages(channelItem);
111-
Set<Message> currentEntryMessages = getChannelMessages(entry.getValue());
112-
mergedMessages.addAll(currentEntryMessages);
113-
channelItem.getPublish().setMessage(toMessageObjectOrComposition(mergedMessages));
114-
}
115-
}
116-
117-
return mergedChannels;
118-
}
119-
120-
private Set<Message> getChannelMessages(ChannelItem channelItem) {
121-
return Optional
122-
.ofNullable(channelItem.getPublish())
123-
.map(Operation::getMessage)
124-
.map(MessageHelper::messageObjectToSet)
125-
.orElseGet(TreeSet::new);
126-
}
127-
128102
private Optional<Map.Entry<String, ChannelItem>> mapClassToChannel(Class<?> component) {
129103
log.debug("Mapping class \"{}\" to channel", component.getName());
130104

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/AbstractOperationDataScanner.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import com.asyncapi.v2.binding.OperationBinding;
55
import com.asyncapi.v2.model.channel.ChannelItem;
66
import com.asyncapi.v2.model.channel.operation.Operation;
7+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
78
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
89
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
910
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
1011
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
1112
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference;
1213
import io.github.stavshamir.springwolf.schemas.SchemasService;
1314
import lombok.extern.slf4j.Slf4j;
15+
import org.springframework.core.annotation.Order;
1416

1517
import java.util.List;
1618
import java.util.Map;
@@ -20,10 +22,13 @@
2022
import static java.util.stream.Collectors.*;
2123

2224
@Slf4j
25+
@Order(value = ChannelPriority.MANUAL_DEFINED)
2326
public abstract class AbstractOperationDataScanner implements ChannelsScanner {
2427

2528
protected abstract SchemasService getSchemaService();
29+
2630
protected abstract List<OperationData> getOperationData();
31+
2732
protected abstract OperationData.OperationType getOperationType();
2833

2934
@Override
@@ -64,9 +69,9 @@ private ChannelItem buildChannel(List<OperationData> operationDataList) {
6469

6570
ChannelItem.ChannelItemBuilder channelBuilder = ChannelItem.builder()
6671
.bindings(channelBinding);
67-
switch(getOperationType()) {
72+
switch (getOperationType()) {
6873
case PUBLISH:
69-
channelBuilder = channelBuilder.publish(operation);
74+
channelBuilder = channelBuilder.publish(operation);
7075
break;
7176
case SUBSCRIBE:
7277
channelBuilder = channelBuilder.subscribe(operation);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata;
2+
3+
import com.asyncapi.v2.binding.OperationBinding;
4+
import lombok.Data;
5+
6+
@Data
7+
public class ProcessedOperationBinding {
8+
private final String type;
9+
private final OperationBinding binding;
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation;
2+
3+
import com.asyncapi.v2.binding.OperationBinding;
4+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProcessedOperationBinding;
5+
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema;
6+
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
7+
import org.apache.commons.lang3.StringUtils;
8+
9+
import java.lang.reflect.Method;
10+
import java.util.Arrays;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.stream.Collectors;
15+
16+
import static java.util.stream.Collectors.groupingBy;
17+
18+
class AsyncAnnotationScannerUtil {
19+
public static AsyncHeaders getAsyncHeaders(AsyncOperation op) {
20+
if (op.headers().values().length == 0) {
21+
return AsyncHeaders.NOT_DOCUMENTED;
22+
}
23+
24+
AsyncHeaders asyncHeaders = new AsyncHeaders(op.headers().schemaName());
25+
Arrays.stream(op.headers().values())
26+
.collect(groupingBy(AsyncOperation.Headers.Header::name))
27+
.forEach((headerName, headers) -> {
28+
List<String> values = getHeaderValues(headers);
29+
String exampleValue = values.stream().findFirst().orElse(null);
30+
asyncHeaders.addHeader(
31+
AsyncHeaderSchema
32+
.headerBuilder()
33+
.headerName(headerName)
34+
.description(getDescription(headers))
35+
.enumValue(values)
36+
.example(exampleValue)
37+
.build()
38+
);
39+
});
40+
41+
return asyncHeaders;
42+
}
43+
44+
private static List<String> getHeaderValues(List<AsyncOperation.Headers.Header> value) {
45+
return value
46+
.stream()
47+
.map(AsyncOperation.Headers.Header::value)
48+
.sorted()
49+
.collect(Collectors.toList());
50+
}
51+
52+
private static String getDescription(List<AsyncOperation.Headers.Header> value) {
53+
return value
54+
.stream()
55+
.map(AsyncOperation.Headers.Header::description)
56+
.filter(StringUtils::isNotBlank)
57+
.sorted()
58+
.findFirst()
59+
.orElse(null);
60+
}
61+
62+
public static Map<String, OperationBinding> processBindingFromAnnotation(Method method, List<OperationBindingProcessor> operationBindingProcessors) {
63+
return operationBindingProcessors.stream()
64+
.map(operationBindingProcessor -> operationBindingProcessor.process(method))
65+
.filter(Optional::isPresent)
66+
.map(Optional::get)
67+
.collect(Collectors.toMap(ProcessedOperationBinding::getType, ProcessedOperationBinding::getBinding));
68+
}
69+
}

0 commit comments

Comments
 (0)