Skip to content

Commit 4132ad3

Browse files
ciberkleidodrotbohm
andcommitted
GH-806 - Add archive support for Neo4j.
Co-authored-by: Oliver Drotbohm <oliver.drotbohm@broadcom.com>
1 parent d302ad1 commit 4132ad3

File tree

2 files changed

+284
-207
lines changed

2 files changed

+284
-207
lines changed

spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java

Lines changed: 104 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.modulith.events.neo4j;
1717

18+
import static org.neo4j.cypherdsl.core.Cypher.*;
19+
1820
import java.time.Instant;
1921
import java.time.ZoneOffset;
2022
import java.util.ArrayList;
@@ -23,6 +25,7 @@
2325
import java.util.Objects;
2426
import java.util.Optional;
2527
import java.util.UUID;
28+
import java.util.function.Function;
2629

2730
import org.neo4j.cypherdsl.core.Cypher;
2831
import org.neo4j.cypherdsl.core.Node;
@@ -48,6 +51,7 @@
4851
*
4952
* @author Gerrit Meier
5053
* @author Oliver Drotbohm
54+
* @author Cora Iberkleid
5155
* @since 1.1
5256
*/
5357
@Transactional
@@ -61,85 +65,115 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
6165
private static final String PUBLICATION_DATE = "publicationDate";
6266
private static final String COMPLETION_DATE = "completionDate";
6367

64-
private static final Node EVENT_PUBLICATION_NODE = Cypher.node("Neo4jEventPublication")
68+
private static final Node EVENT_PUBLICATION_NODE = node("Neo4jEventPublication")
6569
.named("neo4jEventPublication");
6670

67-
private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = Cypher
68-
.match(EVENT_PUBLICATION_NODE)
69-
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(Cypher.parameter(EVENT_HASH)))
70-
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(Cypher.parameter(LISTENER_ID)))
71+
private static final Node EVENT_PUBLICATION_COMPLETED_NODE = node("Neo4jEventPublicationCompleted")
72+
.named("neo4jEventPublicationCompleted");
73+
74+
private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = match(EVENT_PUBLICATION_NODE)
75+
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
76+
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
7177
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())
7278
.returning(EVENT_PUBLICATION_NODE)
7379
.build();
7480

75-
private static final Statement DELETE_BY_EVENT_AND_LISTENER_ID = Cypher.match(EVENT_PUBLICATION_NODE)
76-
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(Cypher.parameter(EVENT_HASH)))
77-
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(Cypher.parameter(LISTENER_ID)))
81+
private static final Statement DELETE_BY_EVENT_AND_LISTENER_ID = match(EVENT_PUBLICATION_NODE)
82+
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
83+
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
7884
.delete(EVENT_PUBLICATION_NODE)
7985
.build();
8086

81-
private static final Statement DELETE_BY_ID_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
82-
.where(EVENT_PUBLICATION_NODE.property(ID).in(Cypher.parameter(ID)))
87+
private static final Statement DELETE_BY_ID_STATEMENT = match(EVENT_PUBLICATION_NODE)
88+
.where(EVENT_PUBLICATION_NODE.property(ID).in(parameter(ID)))
8389
.delete(EVENT_PUBLICATION_NODE)
8490
.build();
8591

86-
private static final Statement DELETE_COMPLETED_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
87-
.where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNotNull())
88-
.delete(EVENT_PUBLICATION_NODE)
92+
private static final Function<Node, Statement> DELETE_COMPLETED_STATEMENT = node -> match(node)
93+
.where(node.property(COMPLETION_DATE).isNotNull())
94+
.delete(node)
8995
.build();
9096

91-
private static final Statement DELETE_COMPLETED_BEFORE_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
92-
.where(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(Cypher.parameter(PUBLICATION_DATE)))
93-
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNotNull())
94-
.delete(EVENT_PUBLICATION_NODE)
97+
private static final Function<Node, Statement> DELETE_COMPLETED_BEFORE_STATEMENT = node -> match(node)
98+
.where(node.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE)))
99+
.and(node.property(COMPLETION_DATE).isNotNull())
100+
.delete(node)
95101
.build();
96102

97-
private static final Statement INCOMPLETE_PUBLISHED_BEFORE_STATEMENT = Cypher
98-
.match(EVENT_PUBLICATION_NODE)
99-
.where(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(Cypher.parameter(PUBLICATION_DATE)))
103+
private static final Statement INCOMPLETE_PUBLISHED_BEFORE_STATEMENT = match(EVENT_PUBLICATION_NODE)
104+
.where(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE)))
100105
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())
101106
.returning(EVENT_PUBLICATION_NODE)
102107
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
103108
.build();
104109

105110
private static final Statement CREATE_STATEMENT = Cypher.create(EVENT_PUBLICATION_NODE)
106-
.set(EVENT_PUBLICATION_NODE.property(ID).to(Cypher.parameter(ID)))
107-
.set(EVENT_PUBLICATION_NODE.property(EVENT_SERIALIZED).to(Cypher.parameter(EVENT_SERIALIZED)))
108-
.set(EVENT_PUBLICATION_NODE.property(EVENT_HASH).to(Cypher.parameter(EVENT_HASH)))
109-
.set(EVENT_PUBLICATION_NODE.property(EVENT_TYPE).to(Cypher.parameter(EVENT_TYPE)))
110-
.set(EVENT_PUBLICATION_NODE.property(LISTENER_ID).to(Cypher.parameter(LISTENER_ID)))
111-
.set(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).to(Cypher.parameter(PUBLICATION_DATE)))
111+
.set(EVENT_PUBLICATION_NODE.property(ID).to(parameter(ID)))
112+
.set(EVENT_PUBLICATION_NODE.property(EVENT_SERIALIZED).to(parameter(EVENT_SERIALIZED)))
113+
.set(EVENT_PUBLICATION_NODE.property(EVENT_HASH).to(parameter(EVENT_HASH)))
114+
.set(EVENT_PUBLICATION_NODE.property(EVENT_TYPE).to(parameter(EVENT_TYPE)))
115+
.set(EVENT_PUBLICATION_NODE.property(LISTENER_ID).to(parameter(LISTENER_ID)))
116+
.set(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).to(parameter(PUBLICATION_DATE)))
112117
.build();
113118

114-
private static final Statement COMPLETE_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
115-
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(Cypher.parameter(EVENT_HASH)))
116-
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(Cypher.parameter(LISTENER_ID)))
119+
private static final Statement COMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE)
120+
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
121+
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
117122
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())
118-
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(Cypher.parameter(COMPLETION_DATE)))
123+
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
124+
.build();
125+
126+
private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = match(EVENT_PUBLICATION_NODE)
127+
.where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
128+
.and(not(exists(match(EVENT_PUBLICATION_COMPLETED_NODE)
129+
.where(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).eq(parameter(ID)))
130+
.returning(literalTrue()).build())))
131+
.with(EVENT_PUBLICATION_NODE)
132+
.create(EVENT_PUBLICATION_COMPLETED_NODE)
133+
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).to(EVENT_PUBLICATION_NODE.property(ID)))
134+
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
135+
.build();
136+
137+
private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = match(EVENT_PUBLICATION_NODE)
138+
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
139+
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
140+
.and(not(exists(match(EVENT_PUBLICATION_COMPLETED_NODE)
141+
.where(EVENT_PUBLICATION_COMPLETED_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
142+
.and(EVENT_PUBLICATION_COMPLETED_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
143+
.returning(literalTrue()).build())))
144+
.with(EVENT_PUBLICATION_NODE)
145+
.create(EVENT_PUBLICATION_COMPLETED_NODE)
146+
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).to(EVENT_PUBLICATION_NODE.property(ID)))
147+
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
119148
.build();
120149

121-
private static final Statement COMPLETE_BY_ID_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
122-
.where(EVENT_PUBLICATION_NODE.property(ID).eq(Cypher.parameter(ID)))
123-
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(Cypher.parameter(COMPLETION_DATE)))
150+
private static final Function<Node, Statement> COMPLETE_BY_ID_STATEMENT = node -> match(node)
151+
.where(node.property(ID).eq(parameter(ID)))
152+
.set(node.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
124153
.build();
125154

126-
private static final ResultStatement INCOMPLETE_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
155+
private static final ResultStatement INCOMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE)
127156
.where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())
128157
.returning(EVENT_PUBLICATION_NODE)
129158
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
130159
.build();
131160

132-
private static final ResultStatement ALL_COMPLETED_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
133-
.where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNotNull())
134-
.returning(EVENT_PUBLICATION_NODE)
135-
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
161+
private static final Function<Node, ResultStatement> ALL_COMPLETED_STATEMENT = node -> match(node)
162+
.where(node.property(COMPLETION_DATE).isNotNull())
163+
.returning(node)
164+
.orderBy(node.property(PUBLICATION_DATE))
136165
.build();
137166

138167
private final Neo4jClient neo4jClient;
139168
private final Renderer renderer;
140169
private final EventSerializer eventSerializer;
141170
private final CompletionMode completionMode;
142171

172+
private final Statement deleteCompletedStatement;
173+
private final Statement deleteCompletedBeforeStatement;
174+
private final Statement completedByIdStatement;
175+
private final ResultStatement allCompletedStatement;
176+
143177
Neo4jEventPublicationRepository(Neo4jClient neo4jClient, Configuration cypherDslConfiguration,
144178
EventSerializer eventSerializer, CompletionMode completionMode) {
145179

@@ -152,6 +186,13 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
152186
this.renderer = Renderer.getRenderer(cypherDslConfiguration);
153187
this.eventSerializer = eventSerializer;
154188
this.completionMode = completionMode;
189+
190+
var archiveNode = completionMode == CompletionMode.ARCHIVE ? EVENT_PUBLICATION_COMPLETED_NODE : EVENT_PUBLICATION_NODE;
191+
192+
this.deleteCompletedStatement = DELETE_COMPLETED_STATEMENT.apply(archiveNode);
193+
this.deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT.apply(archiveNode);
194+
this.completedByIdStatement = COMPLETE_BY_ID_STATEMENT.apply(archiveNode);
195+
this.allCompletedStatement = ALL_COMPLETED_STATEMENT.apply(archiveNode);
155196
}
156197

157198
/*
@@ -201,6 +242,18 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
201242
.bind(identifier.getValue()).to(LISTENER_ID)
202243
.run();
203244

245+
} else if (completionMode == CompletionMode.ARCHIVE) {
246+
247+
neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT))
248+
.bind(eventHash).to(EVENT_HASH)
249+
.bind(identifier.getValue()).to(LISTENER_ID)
250+
.bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE)
251+
.run();
252+
neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID))
253+
.bind(eventHash).to(EVENT_HASH)
254+
.bind(identifier.getValue()).to(LISTENER_ID)
255+
.run();
256+
204257
} else {
205258

206259
neo4jClient.query(renderer.render(COMPLETE_STATEMENT))
@@ -223,13 +276,22 @@ public void markCompleted(UUID identifier, Instant completionDate) {
223276

224277
deletePublications(List.of(identifier));
225278

279+
} else if (completionMode == CompletionMode.ARCHIVE) {
280+
281+
neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT))
282+
.bind("").to(ID)
283+
.bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE)
284+
.run();
285+
deletePublications(List.of(identifier));
286+
226287
} else {
227288

228-
neo4jClient.query(renderer.render(COMPLETE_BY_ID_STATEMENT))
289+
neo4jClient.query(renderer.render(completedByIdStatement))
229290
.bind(Values.value(identifier.toString())).to(ID)
230291
.bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE)
231292
.run();
232293
}
294+
233295
}
234296

235297
/*
@@ -287,7 +349,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
287349
@Override
288350
public List<TargetEventPublication> findCompletedPublications() {
289351

290-
return new ArrayList<>(neo4jClient.query(renderer.render(ALL_COMPLETED_STATEMENT))
352+
return new ArrayList<>(neo4jClient.query(renderer.render(allCompletedStatement))
291353
.fetchAs(TargetEventPublication.class)
292354
.mappedBy(this::mapRecordToPublication)
293355
.all());
@@ -313,7 +375,7 @@ public void deletePublications(List<UUID> identifiers) {
313375
@Override
314376
@Transactional
315377
public void deleteCompletedPublications() {
316-
neo4jClient.query(renderer.render(DELETE_COMPLETED_STATEMENT)).run();
378+
neo4jClient.query(renderer.render(deleteCompletedStatement)).run();
317379
}
318380

319381
/*
@@ -324,7 +386,7 @@ public void deleteCompletedPublications() {
324386
@Transactional
325387
public void deleteCompletedPublicationsBefore(Instant instant) {
326388

327-
neo4jClient.query(renderer.render(DELETE_COMPLETED_BEFORE_STATEMENT))
389+
neo4jClient.query(renderer.render(deleteCompletedBeforeStatement))
328390
.bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE)
329391
.run();
330392
}

0 commit comments

Comments
 (0)