Skip to content

Commit 41aad24

Browse files
joshlongodrotbohm
authored andcommitted
GH-869 - Support for externalizing events into a Spring Messaging MessageChannel.
1 parent 413e2f5 commit 41aad24

File tree

9 files changed

+297
-0
lines changed

9 files changed

+297
-0
lines changed

spring-modulith-events/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<module>spring-modulith-events-kafka</module>
2727
<module>spring-modulith-events-mongodb</module>
2828
<module>spring-modulith-events-neo4j</module>
29+
<module>spring-modulith-events-messaging</module>
2930
</modules>
3031

3132
<profiles>
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>org.springframework.modulith</groupId>
9+
<artifactId>spring-modulith-events</artifactId>
10+
<version>1.3.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<name>Spring Modulith - Events - Spring Messaging support</name>
14+
<artifactId>spring-modulith-events-messaging</artifactId>
15+
16+
<properties>
17+
<module.name>org.springframework.modulith.events.messaging</module.name>
18+
</properties>
19+
20+
<dependencies>
21+
22+
<dependency>
23+
<groupId>org.springframework.modulith</groupId>
24+
<artifactId>spring-modulith-api</artifactId>
25+
<version>${project.version}</version>
26+
</dependency>
27+
28+
<dependency>
29+
<groupId>org.springframework.modulith</groupId>
30+
<artifactId>spring-modulith-events-core</artifactId>
31+
<version>${project.version}</version>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>org.springframework</groupId>
36+
<artifactId>spring-messaging</artifactId>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.springframework.integration</groupId>
41+
<artifactId>spring-integration-core</artifactId>
42+
<scope>test</scope>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>com.fasterxml.jackson.core</groupId>
47+
<artifactId>jackson-databind</artifactId>
48+
<optional>true</optional>
49+
</dependency>
50+
51+
<!-- Test dependencies -->
52+
53+
<dependency>
54+
<groupId>org.springframework.modulith</groupId>
55+
<artifactId>spring-modulith-starter-jdbc</artifactId>
56+
<version>${project.version}</version>
57+
<scope>test</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>com.h2database</groupId>
62+
<artifactId>h2</artifactId>
63+
<scope>test</scope>
64+
</dependency>
65+
66+
<dependency>
67+
<groupId>org.springframework.boot</groupId>
68+
<artifactId>spring-boot-starter-json</artifactId>
69+
<scope>test</scope>
70+
</dependency>
71+
72+
<dependency>
73+
<groupId>org.springframework.boot</groupId>
74+
<artifactId>spring-boot-starter-test</artifactId>
75+
<scope>test</scope>
76+
</dependency>
77+
78+
</dependencies>
79+
80+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2023-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.messaging;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.springframework.beans.factory.BeanFactory;
23+
import org.springframework.boot.autoconfigure.AutoConfiguration;
24+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
26+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.expression.BeanFactoryResolver;
29+
import org.springframework.expression.spel.support.StandardEvaluationContext;
30+
import org.springframework.messaging.MessageChannel;
31+
import org.springframework.messaging.support.MessageBuilder;
32+
import org.springframework.modulith.events.EventExternalizationConfiguration;
33+
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
34+
import org.springframework.modulith.events.support.BrokerRouting;
35+
import org.springframework.modulith.events.support.DelegatingEventExternalizer;
36+
37+
/**
38+
* Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to a Spring Messaging
39+
* {@link MessageChannel message channel}.
40+
*
41+
* @author Josh Long
42+
*/
43+
@AutoConfiguration
44+
@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)
45+
@ConditionalOnClass(MessageChannel.class)
46+
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled",
47+
havingValue = "true",
48+
matchIfMissing = true)
49+
class SpringMessagingEventExternalizerConfiguration {
50+
51+
private static final Logger logger = LoggerFactory.getLogger(SpringMessagingEventExternalizerConfiguration.class);
52+
53+
public static final String MODULITH_ROUTING_HEADER = "modulithRouting";
54+
55+
@Bean
56+
DelegatingEventExternalizer springMessagingEventExternalizer(
57+
EventExternalizationConfiguration configuration,
58+
BeanFactory factory) {
59+
60+
logger.debug("Registering domain event externalization for Spring Messaging…");
61+
62+
var context = new StandardEvaluationContext();
63+
context.setBeanResolver(new BeanFactoryResolver(factory));
64+
65+
return new DelegatingEventExternalizer(configuration, (target, payload) -> {
66+
var routing = BrokerRouting.of(target, context);
67+
var message = MessageBuilder
68+
.withPayload(payload)
69+
.setHeader(MODULITH_ROUTING_HEADER, routing)
70+
.build();
71+
if (logger.isDebugEnabled()) {
72+
logger.info("trying to find a {} with name {}", MessageChannel.class.getName(), routing.getTarget());
73+
}
74+
var bean = factory.getBean(routing.getTarget(), MessageChannel.class);
75+
bean.send(message);
76+
return CompletableFuture.completedFuture(null);
77+
});
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Messaging event externalization support.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
package org.springframework.modulith.events.messaging;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.springframework.modulith.events.messaging.SpringMessagingEventExternalizerConfiguration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2023-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.messaging;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
20+
21+
import lombok.RequiredArgsConstructor;
22+
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.jupiter.api.Test;
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.beans.factory.annotation.Qualifier;
28+
import org.springframework.boot.autoconfigure.SpringBootApplication;
29+
import org.springframework.boot.test.context.SpringBootTest;
30+
import org.springframework.context.ApplicationEventPublisher;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.integration.core.GenericHandler;
33+
import org.springframework.integration.dsl.DirectChannelSpec;
34+
import org.springframework.integration.dsl.IntegrationFlow;
35+
import org.springframework.integration.dsl.MessageChannels;
36+
import org.springframework.messaging.MessageChannel;
37+
import org.springframework.modulith.events.CompletedEventPublications;
38+
import org.springframework.modulith.events.Externalized;
39+
import org.springframework.transaction.annotation.Transactional;
40+
41+
/**
42+
* Integration tests for Spring Messaging-based event publication.
43+
*
44+
* @author Josh Long
45+
*/
46+
@SpringBootTest
47+
class SpringMessagingEventPublicationIntegrationTests {
48+
49+
private static final String CHANNEL_NAME = "target";
50+
51+
private static final AtomicInteger COUNTER = new AtomicInteger();
52+
53+
@Autowired TestPublisher publisher;
54+
@Autowired CompletedEventPublications completed;
55+
56+
@SpringBootApplication
57+
static class TestConfiguration {
58+
59+
@Bean
60+
TestPublisher testPublisher(ApplicationEventPublisher publisher) {
61+
return new TestPublisher(publisher);
62+
}
63+
64+
@Bean
65+
IntegrationFlow inboundIntegrationFlow(
66+
@Qualifier(CHANNEL_NAME) MessageChannel inbound) {
67+
68+
return IntegrationFlow
69+
.from(inbound)
70+
.handle((GenericHandler<TestEvent>) (payload, headers) -> {
71+
COUNTER.incrementAndGet();
72+
return null;
73+
})
74+
.get();
75+
}
76+
77+
@Bean(value = CHANNEL_NAME)
78+
DirectChannelSpec target() {
79+
return MessageChannels.direct();
80+
}
81+
82+
}
83+
84+
@Test
85+
void publishesEventToSpringMessaging() throws Exception {
86+
var publishes = 2;
87+
for (var i = 0; i < publishes; i++) {
88+
publisher.publishEvent();
89+
}
90+
Thread.sleep(200);
91+
assertThat(COUNTER.get()).isEqualTo(publishes);
92+
assertThat(completed.findAll()).hasSize(publishes);
93+
}
94+
95+
@Externalized(CHANNEL_NAME)
96+
static class TestEvent {}
97+
98+
@RequiredArgsConstructor
99+
static class TestPublisher {
100+
101+
private final ApplicationEventPublisher events;
102+
103+
@Transactional
104+
void publishEvent() {
105+
events.publishEvent(new TestEvent());
106+
}
107+
}
108+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
spring.modulith.events.jdbc.schema-initialization.enabled=true
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<configuration>
3+
4+
<property name="CONSOLE_LOG_PATTERN" value="%d{HH:mm:ss.SSS} %1.-1level - %8.8t : %m%n%wEx" />
5+
6+
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
7+
<include resource="org/springframework/boot/logging/logback/console-appender.xml" />
8+
9+
<root level="INFO">
10+
<appender-ref ref="CONSOLE" />
11+
</root>
12+
13+
<logger name="org.springframework.modulith" level="INFO" />
14+
<logger name="org.springframework.messaging" level="INFO" />
15+
16+
</configuration>

src/docs/antora/modules/ROOT/pages/events.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,12 @@ When routing key is set, requires SQS queue to be configured as a FIFO queue.
368368
|Uses Spring Cloud AWS SNS support.
369369
The logical routing key will be used as SNS message group id.
370370
When routing key is set, requires SNS to be configured as a FIFO topic with content based deduplication enabled.
371+
372+
|Spring Messaging
373+
|`spring-modulith-events-messaging`
374+
|Uses Spring's core `Message` and `MessageChannel` support.
375+
Resolves the target `MessageChannel` by its bean name given the `target` in the `Externalized` annotation. Forwards routing information as a header - called `modulithRouting` - to be processed in whatever way by downstream components, typically in a Spring Integration `IntegrationFlow`.
376+
371377
|===
372378
373379
[[externalization.fundamentals]]

0 commit comments

Comments
 (0)