Skip to content

Commit df6b997

Browse files
committed
Add KafkaMessageBindingProcessor and KafkaAsyncMessageBinding annotation as part of KafkaAsyncOperationBinding to support documentation of kafka keys as message binding
1 parent fe5a9b6 commit df6b997

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata;
2+
3+
import com.asyncapi.v2.binding.kafka.KafkaMessageBinding;
4+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding;
5+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding.KafkaAsyncMessageBinding;
6+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.MessageBindingProcessor;
7+
import io.swagger.v3.oas.models.media.Schema;
8+
import io.swagger.v3.oas.models.media.StringSchema;
9+
import org.springframework.context.EmbeddedValueResolverAware;
10+
import org.springframework.stereotype.Component;
11+
import org.springframework.util.StringUtils;
12+
import org.springframework.util.StringValueResolver;
13+
14+
import java.lang.reflect.Method;
15+
import java.util.Arrays;
16+
import java.util.Optional;
17+
18+
@Component
19+
public class KafkaMessageBindingProcessor implements MessageBindingProcessor, EmbeddedValueResolverAware {
20+
private StringValueResolver resolver;
21+
22+
@Override
23+
public void setEmbeddedValueResolver(StringValueResolver resolver) {
24+
this.resolver = resolver;
25+
}
26+
27+
@Override
28+
public Optional<ProcessedMessageBinding> process(Method method) {
29+
return Arrays.stream(method.getAnnotations())
30+
.filter(annotation -> annotation instanceof KafkaAsyncOperationBinding)
31+
.map(annotation -> (KafkaAsyncOperationBinding) annotation)
32+
.findAny()
33+
.map(this::mapToMessageBinding);
34+
}
35+
36+
private ProcessedMessageBinding mapToMessageBinding(KafkaAsyncOperationBinding bindingAnnotation) {
37+
KafkaAsyncMessageBinding messageBinding = bindingAnnotation.messageBinding();
38+
KafkaMessageBinding kafkaMessageBinding = KafkaMessageBinding.builder()
39+
.bindingVersion(resolveOrNull(messageBinding.bindingVersion()))
40+
.key(resolveSchemaOrNull(messageBinding))
41+
.build();
42+
43+
return new ProcessedMessageBinding(bindingAnnotation.type(), kafkaMessageBinding);
44+
}
45+
46+
private String resolveOrNull(String stringValue) {
47+
return StringUtils.isEmpty(stringValue) ? null : resolver.resolveStringValue(stringValue);
48+
}
49+
50+
private Schema<?> resolveSchemaOrNull(KafkaAsyncMessageBinding messageBinding) {
51+
Schema<?> schemaDefinition = null;
52+
switch (messageBinding.keyType()) {
53+
case NO_KEY:
54+
break;
55+
case STRING_KEY:
56+
schemaDefinition = new StringSchema()
57+
.description(resolveOrNull(messageBinding.description()));
58+
}
59+
60+
return schemaDefinition;
61+
}
62+
}

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/operationdata/annotation/KafkaAsyncOperationBinding.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,21 @@
2020
String clientId() default "";
2121
String bindingVersion() default "";
2222

23+
KafkaAsyncMessageBinding messageBinding() default @KafkaAsyncMessageBinding();
24+
25+
@Retention(RetentionPolicy.CLASS)
26+
@Target({})
27+
@interface KafkaAsyncMessageBinding {
28+
29+
KafkaKeyTypes keyType() default KafkaKeyTypes.NO_KEY;
30+
31+
String description() default "";
32+
33+
String bindingVersion() default "";
34+
35+
enum KafkaKeyTypes {
36+
NO_KEY,
37+
STRING_KEY
38+
}
39+
}
2340
}

0 commit comments

Comments
 (0)