Skip to content

Commit 6afc00c

Browse files
committed
GH-452 - Introduce EventPublicationRepository.findCompletedPublications().
The default EventPublicationRegistry previously erroneously called find*In*completePublications(), i.e. returned incomplete publications when it was supposed to look up the completed ones. To be able to do the latter we need to extend EventPublicationRepository and introduce a query execution for all completed publications. As we'd normally only chance API in major versions, but the bugfix needing to go into a bugfix release, the new method is introduced as default method rejecting the execution in case a currently existing repository implementation does not implement it. This allows us to adapt the implementations we ship to support the bugfix, but will require other implementations to ship an adapted version. This means we can ship a release that's not breaking assuming one of the official store implementations is used.
1 parent e3dbd02 commit 6afc00c

File tree

10 files changed

+154
-4
lines changed

10 files changed

+154
-4
lines changed

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import org.springframework.beans.factory.DisposableBean;
27-
import org.springframework.context.ApplicationListener;
2827
import org.springframework.modulith.events.CompletedEventPublications;
2928
import org.springframework.modulith.events.EventPublication;
3029
import org.springframework.transaction.annotation.Propagation;
@@ -132,7 +131,7 @@ public void deleteCompletedPublicationsOlderThan(Duration duration) {
132131
*/
133132
@Override
134133
public Collection<? extends TargetEventPublication> findAll() {
135-
return findIncompletePublications();
134+
return events.findCompletedPublications();
136135
}
137136

138137
/*

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,17 @@ default void markCompleted(TargetEventPublication publication, Instant completio
9292
Optional<TargetEventPublication> findIncompletePublicationsByEventAndTargetIdentifier( //
9393
Object event, PublicationTargetIdentifier targetIdentifier);
9494

95+
/**
96+
* Returns all completed event publications currently found in the system.
97+
*
98+
* @return will never be {@literal null}.
99+
* @since 1.1.2
100+
*/
101+
default List<TargetEventPublication> findCompletedPublications() {
102+
throw new UnsupportedOperationException(
103+
"Your store implementation does not support looking up completed publications!");
104+
}
105+
95106
/**
96107
* Deletes all publications with the given identifiers.
97108
*

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232
import org.springframework.jdbc.core.JdbcOperations;
33-
import org.springframework.jdbc.core.ResultSetExtractor;
34-
import org.springframework.jdbc.core.RowMapper;
3533
import org.springframework.lang.Nullable;
3634
import org.springframework.modulith.events.core.EventPublicationRepository;
3735
import org.springframework.modulith.events.core.EventSerializer;
@@ -56,6 +54,13 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE
5654
VALUES (?, ?, ?, ?, ?)
5755
""";
5856

57+
private static final String SQL_STATEMENT_FIND_COMPLETED = """
58+
SELECT ID, COMPLETION_DATE, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SERIALIZED_EVENT
59+
FROM EVENT_PUBLICATION
60+
WHERE COMPLETION_DATE IS NOT NULL
61+
ORDER BY PUBLICATION_DATE ASC
62+
""";
63+
5964
private static final String SQL_STATEMENT_FIND_UNCOMPLETED = """
6065
SELECT ID, COMPLETION_DATE, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SERIALIZED_EVENT
6166
FROM EVENT_PUBLICATION
@@ -189,6 +194,18 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
189194
return result == null ? Optional.empty() : result.stream().findFirst();
190195
}
191196

197+
/*
198+
* (non-Javadoc)
199+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
200+
*/
201+
@Override
202+
public List<TargetEventPublication> findCompletedPublications() {
203+
204+
var result = operations.query(SQL_STATEMENT_FIND_COMPLETED, this::resultSetToPublications);
205+
206+
return result == null ? Collections.emptyList() : result;
207+
}
208+
192209
@Override
193210
@Transactional(readOnly = true)
194211
@SuppressWarnings("null")

spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,21 @@ void findsPublicationsOlderThanReference() throws Exception {
294294
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
295295
}
296296

297+
@Test // GH-452
298+
void findsCompletedPublications() {
299+
300+
var event = new TestEvent("first");
301+
var publication = createPublication(event);
302+
303+
repository.markCompleted(publication, Instant.now());
304+
305+
assertThat(repository.findCompletedPublications())
306+
.hasSize(1)
307+
.element(0)
308+
.extracting(TargetEventPublication::getEvent)
309+
.isEqualTo(event);
310+
}
311+
297312
private TargetEventPublication createPublication(Object event) {
298313

299314
var token = event.toString();

spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
5050
and p.completionDate is null
5151
""";
5252

53+
private static String COMPLETE = """
54+
select p
55+
from JpaEventPublication p
56+
where
57+
p.completionDate is not null
58+
order by
59+
p.publicationDate asc
60+
""";
61+
5362
private static String INCOMPLETE = """
5463
select p
5564
from JpaEventPublication p
@@ -186,6 +195,20 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
186195
.map(this::entityToDomain);
187196
}
188197

198+
/*
199+
* (non-Javadoc)
200+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
201+
*/
202+
@Override
203+
public List<TargetEventPublication> findCompletedPublications() {
204+
205+
return entityManager.createQuery(COMPLETE, JpaEventPublication.class)
206+
.getResultList()
207+
.stream()
208+
.map(this::entityToDomain)
209+
.toList();
210+
}
211+
189212
/*
190213
* (non-Javadoc)
191214
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)

spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,21 @@ void findsPublicationsOlderThanReference() throws Exception {
270270
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
271271
}
272272

273+
@Test // GH-452
274+
void findsCompletedPublications() {
275+
276+
var event = new TestEvent("first");
277+
var publication = createPublication(event);
278+
279+
repository.markCompleted(publication, Instant.now());
280+
281+
assertThat(repository.findCompletedPublications())
282+
.hasSize(1)
283+
.element(0)
284+
.extracting(TargetEventPublication::getEvent)
285+
.isEqualTo(event);
286+
}
287+
273288
private TargetEventPublication createPublication(Object event) {
274289

275290
var token = event.toString();

spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
127127
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
128128
}
129129

130+
/*
131+
* (non-Javadoc)
132+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
133+
*/
134+
@Override
135+
public List<TargetEventPublication> findCompletedPublications() {
136+
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)));
137+
}
138+
130139
/*
131140
* (non-Javadoc)
132141
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)

spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,21 @@ void findsPublicationsOlderThanReference() throws Exception {
129129
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
130130
}
131131

132+
@Test // GH-452
133+
void findsCompletedPublications() {
134+
135+
var event = new TestEvent("first");
136+
var publication = createPublication(event);
137+
138+
repository.markCompleted(publication, Instant.now());
139+
140+
assertThat(repository.findCompletedPublications())
141+
.hasSize(1)
142+
.element(0)
143+
.extracting(TargetEventPublication::getEvent)
144+
.isEqualTo(event);
145+
}
146+
132147
private TargetEventPublication createPublication(Object event) {
133148
return createPublication(event, TARGET_IDENTIFIER);
134149
}

spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.time.Instant;
1919
import java.time.ZoneOffset;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Objects;
@@ -45,6 +46,7 @@
4546
* A {@link Neo4jClient} based implementation of {@link EventPublicationRepository}.
4647
*
4748
* @author Gerrit Meier
49+
* @author Oliver Drotbohm
4850
* @since 1.1
4951
*/
5052
@Transactional
@@ -114,6 +116,12 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
114116
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
115117
.build();
116118

119+
private static final ResultStatement ALL_COMPLETED_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
120+
.where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNotNull())
121+
.returning(EVENT_PUBLICATION_NODE)
122+
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
123+
.build();
124+
117125
private final Neo4jClient neo4jClient;
118126
private final Renderer renderer;
119127
private final EventSerializer eventSerializer;
@@ -225,6 +233,19 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
225233
.one();
226234
}
227235

236+
/*
237+
* (non-Javadoc)
238+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
239+
*/
240+
@Override
241+
public List<TargetEventPublication> findCompletedPublications() {
242+
243+
return new ArrayList<>(neo4jClient.query(renderer.render(ALL_COMPLETED_STATEMENT))
244+
.fetchAs(TargetEventPublication.class)
245+
.mappedBy(this::mapRecordToPublication)
246+
.all());
247+
}
248+
228249
/*
229250
* (non-Javadoc)
230251
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)

spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,31 @@ void deleteCompletedPublicationsBefore() throws Exception {
220220
}
221221
}
222222

223+
@Test // GH-452
224+
void findsCompletedPublications() {
225+
226+
var event = new TestEvent("first");
227+
var publication = createPublication(event);
228+
229+
repository.markCompleted(publication, Instant.now());
230+
231+
assertThat(repository.findCompletedPublications())
232+
.hasSize(1)
233+
.element(0)
234+
.extracting(TargetEventPublication::getEvent)
235+
.isEqualTo(event);
236+
}
237+
238+
private TargetEventPublication createPublication(Object event) {
239+
240+
var token = event.toString();
241+
242+
doReturn(token).when(eventSerializer).serialize(event);
243+
doReturn(event).when(eventSerializer).deserialize(token, event.getClass());
244+
245+
return repository.create(TargetEventPublication.of(event, TARGET_IDENTIFIER));
246+
}
247+
223248
@Value
224249
static class TestEvent {
225250
String eventId;

0 commit comments

Comments
 (0)