Skip to content

Commit 632f9e9

Browse files
committed
Extend producer controllers of springwolf-kafka-plugin and springwolf-amqp-pluging to support message bindings which are sent by springwolf-ui
1 parent d7de977 commit 632f9e9

File tree

7 files changed

+62
-30
lines changed

7 files changed

+62
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
package io.github.stavshamir.springwolf.asyncapi.dtos;
1+
package io.github.stavshamir.springwolf.asyncapi.controller.dtos;
22

33
import lombok.Data;
44

55
import java.util.Map;
66

77
@Data
8-
public class KafkaMessageDto {
8+
public class MessageDto {
9+
10+
private final Map<String, String> bindings;
911

1012
private final Map<String, String> headers;
1113

1214
private final Map<String, ?> payload;
13-
1415
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

3+
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
34
import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer;
45
import lombok.RequiredArgsConstructor;
56
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
78
import org.springframework.http.HttpStatus;
8-
import org.springframework.web.bind.annotation.*;
9+
import org.springframework.web.bind.annotation.PostMapping;
10+
import org.springframework.web.bind.annotation.RequestBody;
11+
import org.springframework.web.bind.annotation.RequestMapping;
12+
import org.springframework.web.bind.annotation.RequestParam;
13+
import org.springframework.web.bind.annotation.RestController;
914
import org.springframework.web.server.ResponseStatusException;
1015

11-
import java.util.Map;
12-
1316
import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_CONFIG_PREFIX;
1417
import static io.github.stavshamir.springwolf.SpringWolfAmqpConfigConstants.SPRINGWOLF_AMQP_PLUGIN_PUBLISHING_ENABLED;
1518

@@ -23,14 +26,14 @@ public class SpringwolfAmqpController {
2326
private final SpringwolfAmqpProducer amqpProducer;
2427

2528
@PostMapping("/publish")
26-
public void publish(@RequestParam String topic, @RequestBody Map<String, Object> payload) {
29+
public void publish(@RequestParam String topic, @RequestBody MessageDto message) {
2730
if (amqpProducer.isEnabled()) {
2831
log.warn("AMQP producer is not enabled - message will not be published");
2932
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled");
3033
}
3134

32-
log.info("Publishing to amqp queue {}: {}", topic, payload);
33-
amqpProducer.send(topic, payload);
35+
log.info("Publishing to amqp queue {}: {}", topic, message.getPayload());
36+
amqpProducer.send(topic, message.getPayload());
3437
}
3538

3639
}

springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfAmqpProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public SpringwolfAmqpProducer(ChannelsService channelsService, List<RabbitTempla
3333
this.rabbitTemplate = rabbitTemplates.isEmpty() ? Optional.empty() : Optional.of(rabbitTemplates.get(0));
3434
}
3535

36-
public void send(String channelName, Map<String, Object> payload) {
36+
public void send(String channelName, Map<String, ?> payload) {
3737
ChannelItem channelItem = channelsService.getChannels().get(channelName);
3838

3939
String exchange = getExchangeName(channelItem);

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaController.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

3-
import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto;
3+
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
44
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
55
import lombok.RequiredArgsConstructor;
66
import lombok.extern.slf4j.Slf4j;
77
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
88
import org.springframework.http.HttpStatus;
9-
import org.springframework.web.bind.annotation.*;
9+
import org.springframework.web.bind.annotation.PostMapping;
10+
import org.springframework.web.bind.annotation.RequestBody;
11+
import org.springframework.web.bind.annotation.RequestMapping;
12+
import org.springframework.web.bind.annotation.RequestParam;
13+
import org.springframework.web.bind.annotation.RestController;
1014
import org.springframework.web.server.ResponseStatusException;
1115

1216
import static io.github.stavshamir.springwolf.SpringWolfKafkaConfigConstants.SPRINGWOLF_KAFKA_CONFIG_PREFIX;
@@ -22,8 +26,8 @@ public class SpringwolfKafkaController {
2226
private final SpringwolfKafkaProducer kafkaProducer;
2327

2428
@PostMapping("/publish")
25-
public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kafkaMessage) {
26-
if(kafkaMessage.getPayload() == null) {
29+
public void publish(@RequestParam String topic, @RequestBody MessageDto message) {
30+
if(message.getPayload() == null) {
2731
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Message payload is required");
2832
}
2933

@@ -32,8 +36,10 @@ public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kaf
3236
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled");
3337
}
3438

35-
log.debug("Publishing to kafka topic {}: {}", topic, kafkaMessage);
36-
kafkaProducer.send(topic, kafkaMessage.getHeaders(), kafkaMessage.getPayload());
39+
String kafkaKey = message.getBindings() != null ? message.getBindings().get("key") : null;
40+
log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message);
41+
42+
kafkaProducer.send(topic, kafkaKey, message.getHeaders(), message.getPayload());
3743
}
3844

3945
}

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ public boolean isEnabled() {
2525
return kafkaTemplate.isPresent();
2626
}
2727

28-
public void send(String topic, Map<String, String> headers, Map<String, ?> payload) {
28+
public void send(String topic, String key, Map<String, String> headers, Map<String, ?> payload) {
2929
if (kafkaTemplate.isPresent()) {
30-
kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload));
30+
kafkaTemplate.get().send(buildProducerRecord(topic, key, headers, payload));
3131
} else {
3232
log.warn("Kafka producer is not configured");
3333
}
3434
}
3535

36-
private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, Map<String, String> headers, Map<String, ?> payload) {
36+
private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, String key, Map<String, String> headers, Map<String, ?> payload) {
3737
List<Header> recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList();
3838

39-
return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders);
39+
return new ProducerRecord<>(topic, null, null, key, payload, recordHeaders);
4040
}
4141

4242
private List<Header> buildHeaders(Map<String, String> headers) {

springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerTest.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

3-
import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto;
3+
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
44
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
55
import org.junit.Test;
66
import org.junit.runner.RunWith;
@@ -17,7 +17,11 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
20-
import static org.mockito.Mockito.*;
20+
import static org.mockito.Mockito.eq;
21+
import static org.mockito.Mockito.isNull;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.verifyZeroInteractions;
24+
import static org.mockito.Mockito.when;
2125

2226
@RunWith(MockitoJUnitRunner.class)
2327
public class SpringwolfKafkaControllerTest {
@@ -37,7 +41,7 @@ public class SpringwolfKafkaControllerTest {
3741
@Test
3842
public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() {
3943
try {
40-
springwolfKafkaController.publish("test-topic", new KafkaMessageDto(null, null));
44+
springwolfKafkaController.publish("test-topic", new MessageDto(null,null, null));
4145
failBecauseExceptionWasNotThrown(ResponseStatusException.class);
4246
} catch (ResponseStatusException e) {
4347
assertThat(e.getStatus()).isEqualTo(HttpStatus.BAD_REQUEST);
@@ -50,7 +54,7 @@ public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() {
5054
when(springwolfKafkaProducer.isEnabled()).thenReturn(false);
5155

5256
Map<String, String> payload = Collections.singletonMap("some-key", "some-value");
53-
KafkaMessageDto messageToPublish = new KafkaMessageDto(null, payload);
57+
MessageDto messageToPublish = new MessageDto(null, null, payload);
5458

5559
try {
5660
springwolfKafkaController.publish("test-topic", messageToPublish);
@@ -65,11 +69,11 @@ public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() {
6569
when(springwolfKafkaProducer.isEnabled()).thenReturn(true);
6670

6771
Map<String, String> payload = Collections.singletonMap("some-key", "some-value");
68-
KafkaMessageDto messageToPublish = new KafkaMessageDto(null, payload);
72+
MessageDto messageToPublish = new MessageDto(null, null, payload);
6973

7074
springwolfKafkaController.publish("test-topic", messageToPublish);
7175

72-
verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), payloadCaptor.capture());
76+
verify(springwolfKafkaProducer).send(eq("test-topic"),isNull(), isNull(), payloadCaptor.capture());
7377

7478
assertThat(payloadCaptor.getValue()).isEqualTo(payload);
7579
}
@@ -81,11 +85,29 @@ public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAreSend() {
8185
Map<String, String> headers = Collections.singletonMap("some-header-key", "some-header-value");
8286
Map<String, String> payload = Collections.singletonMap("some-payload-key", "some-payload-value");
8387

84-
KafkaMessageDto messageToPublish = new KafkaMessageDto(headers, payload);
88+
MessageDto messageToPublish = new MessageDto(null, headers, payload);
8589

8690
springwolfKafkaController.publish("test-topic", messageToPublish);
8791

88-
verify(springwolfKafkaProducer).send(eq("test-topic"), headerCaptor.capture(), payloadCaptor.capture());
92+
verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), headerCaptor.capture(), payloadCaptor.capture());
93+
94+
assertThat(headerCaptor.getValue()).isEqualTo(headers);
95+
assertThat(payloadCaptor.getValue()).isEqualTo(payload);
96+
}
97+
98+
@Test
99+
public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAndBindingsAreSend() {
100+
when(springwolfKafkaProducer.isEnabled()).thenReturn(true);
101+
102+
Map<String, String> headers = Collections.singletonMap("some-header-key", "some-header-value");
103+
Map<String, String> payload = Collections.singletonMap("some-payload-key", "some-payload-value");
104+
Map<String, String> bindings = Collections.singletonMap("key", "kafka-key-value");
105+
106+
MessageDto messageToPublish = new MessageDto(bindings, headers, payload);
107+
108+
springwolfKafkaController.publish("test-topic", messageToPublish);
109+
110+
verify(springwolfKafkaProducer).send(eq("test-topic"), eq("kafka-key-value"), headerCaptor.capture(), payloadCaptor.capture());
89111

90112
assertThat(headerCaptor.getValue()).isEqualTo(headers);
91113
assertThat(payloadCaptor.getValue()).isEqualTo(payload);

springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateCon
4949
public void testSendingKafkaMessageWithoutHeaders() {
5050
Map<String, Object> payload = Collections.singletonMap("some", "field");
5151

52-
springwolfKafkaProducer.send("test-topic", null, payload);
52+
springwolfKafkaProducer.send("test-topic", null, null, payload);
5353

5454
verify(kafkaTemplate).send(recordArgumentCaptor.capture());
5555

@@ -67,7 +67,7 @@ public void testSendingKafkaMessageWithHeaders() {
6767
Map<String, Object> payload = Collections.singletonMap("some", "field");
6868
Map<String, String> headers = Collections.singletonMap("header-key", "header");
6969

70-
springwolfKafkaProducer.send("test-topic", headers, payload);
70+
springwolfKafkaProducer.send("test-topic", null, headers, payload);
7171

7272
verify(kafkaTemplate).send(recordArgumentCaptor.capture());
7373

0 commit comments

Comments
 (0)