Skip to content

Commit 68e783e

Browse files
committed
GH-796 - Revamped event publication registry and JDBC implementation.
1 parent dfcdee3 commit 68e783e

File tree

64 files changed

+2479
-71
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2479
-71
lines changed

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Optional;
2020
import java.util.UUID;
2121

22+
import org.jspecify.annotations.Nullable;
2223
import org.springframework.context.ApplicationEvent;
2324
import org.springframework.context.PayloadApplicationEvent;
2425

@@ -100,4 +101,24 @@ default boolean isCompleted() {
100101
default int compareTo(EventPublication that) {
101102
return this.getPublicationDate().compareTo(that.getPublicationDate());
102103
}
104+
105+
Status getStatus();
106+
107+
@Nullable
108+
Instant getLastResubmissionDate();
109+
110+
int getCompletionAttempts();
111+
112+
enum Status {
113+
114+
PUBLISHED,
115+
116+
PROCESSING,
117+
118+
COMPLETED,
119+
120+
FAILED,
121+
122+
RESUBMITTED;
123+
}
103124
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2023-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events;
17+
18+
/**
19+
* All uncompleted event publications.
20+
*
21+
* @author Oliver Drotbohm
22+
* @since 2.0
23+
*/
24+
public interface FailedEventPublications {
25+
26+
void resubmit(ResubmissionOptions options);
27+
}

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/IncompleteEventPublications.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ public interface IncompleteEventPublications {
4141
* @param duration must not be {@literal null}.
4242
*/
4343
void resubmitIncompletePublicationsOlderThan(Duration duration);
44+
45+
void resubmitIncompletePublications(ResubmissionOptions options);
4446
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events;
17+
18+
import java.time.Duration;
19+
import java.util.function.Predicate;
20+
21+
/**
22+
* @author Oliver Drotbohm
23+
*/
24+
public class ResubmissionOptions {
25+
26+
private int maxInFlight = Integer.MAX_VALUE;
27+
private int batchSize = 100;
28+
private Duration minAge = Duration.ZERO;
29+
private Predicate<EventPublication> filter = it -> true;
30+
31+
public int getMaxInFlight() {
32+
return maxInFlight;
33+
}
34+
35+
public int getBatchSize() {
36+
return batchSize;
37+
}
38+
39+
public Duration getMinAge() {
40+
return minAge;
41+
}
42+
43+
public Predicate<EventPublication> getFilter() {
44+
return filter;
45+
}
46+
}

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3434
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
3535
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown;
36+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3637
import org.springframework.context.Lifecycle;
3738
import org.springframework.context.annotation.Bean;
3839
import org.springframework.context.annotation.Import;
@@ -42,6 +43,7 @@
4243
import org.springframework.modulith.events.core.DefaultEventPublicationRegistry;
4344
import org.springframework.modulith.events.core.EventPublicationRegistry;
4445
import org.springframework.modulith.events.core.EventPublicationRepository;
46+
import org.springframework.modulith.events.core.EventPublicationProperties;
4547
import org.springframework.modulith.events.support.CompletionRegisteringAdvisor;
4648
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
4749
import org.springframework.scheduling.annotation.AbstractAsyncConfiguration;
@@ -56,16 +58,17 @@
5658
* @author Dmitry Belyaev
5759
*/
5860
@AutoConfiguration
59-
@Import(AsyncEnablingConfiguration.class)
61+
@Import({ AsyncEnablingConfiguration.class, StalenessMonitorConfiguration.class })
62+
@EnableConfigurationProperties(EventPublicationProperties.class)
6063
public class EventPublicationAutoConfiguration extends EventPublicationConfiguration {
6164

6265
@Override
6366
@Bean
6467
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
6568
@ConditionalOnBean(EventPublicationRepository.class)
6669
DefaultEventPublicationRegistry eventPublicationRegistry(EventPublicationRepository repository,
67-
ObjectProvider<Clock> clock) {
68-
return super.eventPublicationRegistry(repository, clock);
70+
ObjectProvider<Clock> clock, ObjectProvider<EventPublicationProperties> properties) {
71+
return super.eventPublicationRegistry(repository, clock, properties);
6972
}
7073

7174
@Bean

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationConfiguration.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.modulith.events.core.DefaultEventPublicationRegistry;
2828
import org.springframework.modulith.events.core.EventPublicationRegistry;
2929
import org.springframework.modulith.events.core.EventPublicationRepository;
30+
import org.springframework.modulith.events.core.EventPublicationProperties;
3031
import org.springframework.modulith.events.support.CompletionRegisteringAdvisor;
3132
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
3233

@@ -43,8 +44,10 @@ class EventPublicationConfiguration {
4344
@Bean
4445
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
4546
DefaultEventPublicationRegistry eventPublicationRegistry(EventPublicationRepository repository,
46-
ObjectProvider<Clock> clock) {
47-
return new DefaultEventPublicationRegistry(repository, clock.getIfAvailable(() -> Clock.systemUTC()));
47+
ObjectProvider<Clock> clock, ObjectProvider<EventPublicationProperties> properties) {
48+
49+
return new DefaultEventPublicationRegistry(repository, clock.getIfAvailable(() -> Clock.systemUTC()),
50+
properties.getIfAvailable(() -> EventPublicationProperties.DEFAULTS));
4851
}
4952

5053
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.config;
17+
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.boot.autoconfigure.AutoConfiguration;
21+
import org.springframework.modulith.events.core.EventPublicationProperties;
22+
import org.springframework.modulith.events.core.EventPublicationRegistry;
23+
import org.springframework.modulith.events.support.EventUtils;
24+
import org.springframework.scheduling.annotation.SchedulingConfigurer;
25+
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* Configures a fixed-delay task to mark stale in-flight event publications as failed according to the configuration in
30+
* {@link EventPublicationProperties}.
31+
*
32+
* @author Oliver Drotbohm
33+
* @since 2.0
34+
*/
35+
@AutoConfiguration
36+
class StalenessMonitorConfiguration implements SchedulingConfigurer {
37+
38+
private static final Logger LOGGER = LoggerFactory.getLogger(StalenessMonitorConfiguration.class);
39+
40+
private final EventPublicationRegistry registry;
41+
private final EventPublicationProperties properties;
42+
43+
/**
44+
* Creates a new {@link StalenessMonitorConfiguration} for the given {@link EventPublicationRegistry} and
45+
* {@link EventPublicationProperties}.
46+
*
47+
* @param registry must not be {@literal null}.
48+
* @param properties must not be {@literal null}.
49+
*/
50+
StalenessMonitorConfiguration(EventPublicationRegistry registry, EventPublicationProperties properties) {
51+
52+
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
53+
Assert.notNull(properties, "EventPublicationRepositoryProperties must not be null!");
54+
55+
this.registry = registry;
56+
this.properties = properties;
57+
}
58+
59+
/*
60+
* (non-Javadoc)
61+
* @see org.springframework.scheduling.annotation.SchedulingConfigurer#configureTasks(org.springframework.scheduling.config.ScheduledTaskRegistrar)
62+
*/
63+
@Override
64+
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
65+
66+
if (properties.monitorStaleness()) {
67+
68+
LOGGER.info("Checking for stale event publications every {}.",
69+
EventUtils.prettyPrint(properties.getStalenessCheckIntervall()));
70+
71+
taskRegistrar.addFixedDelayTask(registry::markStalePublicationsFailed,
72+
properties.getStalenessCheckIntervall());
73+
}
74+
}
75+
}

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublication.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ class DefaultEventPublication implements TargetEventPublication {
3434
private final Object event;
3535
private final PublicationTargetIdentifier targetIdentifier;
3636
private final Instant publicationDate;
37+
private final Instant lastResubmissionDate;
38+
private final int completionAttempts;
3739

40+
private Status status;
3841
private Optional<Instant> completionDate;
3942

4043
/**
@@ -55,6 +58,9 @@ class DefaultEventPublication implements TargetEventPublication {
5558
this.targetIdentifier = targetIdentifier;
5659
this.publicationDate = publicationDate;
5760
this.completionDate = Optional.empty();
61+
this.status = Status.PUBLISHED;
62+
this.lastResubmissionDate = publicationDate;
63+
this.completionAttempts = 1;
5864
}
5965

6066
/*
@@ -106,7 +112,36 @@ public Optional<Instant> getCompletionDate() {
106112
*/
107113
@Override
108114
public void markCompleted(Instant instant) {
115+
109116
this.completionDate = Optional.of(instant);
117+
this.status = Status.COMPLETED;
118+
}
119+
120+
/*
121+
* (non-Javadoc)
122+
* @see org.springframework.modulith.events.EventPublication#getStatus()
123+
*/
124+
@Override
125+
public Status getStatus() {
126+
return status;
127+
}
128+
129+
/*
130+
* (non-Javadoc)
131+
* @see org.springframework.modulith.events.EventPublication#getLastResubmissionDate()
132+
*/
133+
@Override
134+
public @Nullable Instant getLastResubmissionDate() {
135+
return lastResubmissionDate;
136+
}
137+
138+
/*
139+
* (non-Javadoc)
140+
* @see org.springframework.modulith.events.EventPublication#getCompletionAttempts()
141+
*/
142+
@Override
143+
public int getCompletionAttempts() {
144+
return completionAttempts;
110145
}
111146

112147
/*
@@ -117,7 +152,7 @@ public void markCompleted(Instant instant) {
117152
public String toString() {
118153

119154
return "DefaultEventPublication [event=" + event + ", targetIdentifier=" + targetIdentifier + ", publicationDate="
120-
+ publicationDate + ", completionDate=" + completionDate + "]";
155+
+ publicationDate + ", completionDate=" + completionDate + ", status=" + status + "]";
121156
}
122157

123158
/*
@@ -138,7 +173,8 @@ public boolean equals(@Nullable Object obj) {
138173
return Objects.equals(this.completionDate, that.completionDate) //
139174
&& Objects.equals(this.event, that.event) //
140175
&& Objects.equals(this.publicationDate, that.publicationDate) //
141-
&& Objects.equals(this.targetIdentifier, that.targetIdentifier);
176+
&& Objects.equals(this.targetIdentifier, that.targetIdentifier) //
177+
&& Objects.equals(this.status, that.status);
142178
}
143179

144180
/*
@@ -147,6 +183,6 @@ public boolean equals(@Nullable Object obj) {
147183
*/
148184
@Override
149185
public int hashCode() {
150-
return Objects.hash(completionDate, event, publicationDate, targetIdentifier);
186+
return Objects.hash(completionDate, event, publicationDate, targetIdentifier, status);
151187
}
152188
}

0 commit comments

Comments
 (0)