20
20
import java .time .Instant ;
21
21
import java .time .ZoneOffset ;
22
22
import java .util .ArrayList ;
23
+ import java .util .Collection ;
23
24
import java .util .List ;
24
25
import java .util .Map ;
25
26
import java .util .Objects ;
26
27
import java .util .Optional ;
27
28
import java .util .UUID ;
29
+ import java .util .function .BiFunction ;
28
30
import java .util .function .Function ;
29
31
30
32
import org .neo4j .cypherdsl .core .Cypher ;
31
33
import org .neo4j .cypherdsl .core .Node ;
32
34
import org .neo4j .cypherdsl .core .ResultStatement ;
33
35
import org .neo4j .cypherdsl .core .Statement ;
36
+ import org .neo4j .cypherdsl .core .StatementBuilder .OrderableOngoingReadingAndWithWithoutWhere ;
34
37
import org .neo4j .cypherdsl .core .renderer .Configuration ;
35
38
import org .neo4j .cypherdsl .core .renderer .Renderer ;
36
39
import org .neo4j .driver .Values ;
@@ -65,11 +68,14 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
65
68
private static final String PUBLICATION_DATE = "publicationDate" ;
66
69
private static final String COMPLETION_DATE = "completionDate" ;
67
70
71
+ private static final Collection <String > ALL_PROPERTIES = List .of (ID , EVENT_SERIALIZED , EVENT_HASH , EVENT_TYPE ,
72
+ LISTENER_ID , PUBLICATION_DATE , COMPLETION_DATE );
73
+
68
74
private static final Node EVENT_PUBLICATION_NODE = node ("Neo4jEventPublication" )
69
75
.named ("neo4jEventPublication" );
70
76
71
- private static final Node EVENT_PUBLICATION_COMPLETED_NODE = node ("Neo4jEventPublicationCompleted " )
72
- .named ("neo4jEventPublicationCompleted " );
77
+ private static final Node EVENT_PUBLICATION_ARCHIVE_NODE = node ("Neo4jEventPublicationArchive " )
78
+ .named ("neo4jEventPublicationArchive " );
73
79
74
80
private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = match (EVENT_PUBLICATION_NODE )
75
81
.where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
@@ -114,6 +120,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
114
120
.set (EVENT_PUBLICATION_NODE .property (EVENT_TYPE ).to (parameter (EVENT_TYPE )))
115
121
.set (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).to (parameter (LISTENER_ID )))
116
122
.set (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).to (parameter (PUBLICATION_DATE )))
123
+ .set (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
117
124
.build ();
118
125
119
126
private static final Statement COMPLETE_STATEMENT = match (EVENT_PUBLICATION_NODE )
@@ -123,29 +130,34 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
123
130
.set (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
124
131
.build ();
125
132
126
- private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = match (EVENT_PUBLICATION_NODE )
133
+ private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = applyProperties ( match (EVENT_PUBLICATION_NODE )
127
134
.where (EVENT_PUBLICATION_NODE .property (ID ).eq (parameter (ID )))
128
- .and (not (exists (match (EVENT_PUBLICATION_COMPLETED_NODE )
129
- .where (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).eq (parameter (ID )))
130
- .returning (literalTrue ()).build ())))
131
- .with (EVENT_PUBLICATION_NODE )
132
- .create (EVENT_PUBLICATION_COMPLETED_NODE )
133
- .set (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).to (EVENT_PUBLICATION_NODE .property (ID )))
134
- .set (EVENT_PUBLICATION_COMPLETED_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
135
- .build ();
136
-
137
- private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = match (EVENT_PUBLICATION_NODE )
138
- .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
139
- .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
140
- .and (not (exists (match (EVENT_PUBLICATION_COMPLETED_NODE )
141
- .where (EVENT_PUBLICATION_COMPLETED_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
142
- .and (EVENT_PUBLICATION_COMPLETED_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
135
+ .and (not (exists (match (EVENT_PUBLICATION_ARCHIVE_NODE )
136
+ .where (EVENT_PUBLICATION_ARCHIVE_NODE .property (ID ).eq (parameter (ID )))
143
137
.returning (literalTrue ()).build ())))
144
- .with (EVENT_PUBLICATION_NODE )
145
- .create (EVENT_PUBLICATION_COMPLETED_NODE )
146
- .set (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).to (EVENT_PUBLICATION_NODE .property (ID )))
147
- .set (EVENT_PUBLICATION_COMPLETED_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
148
- .build ();
138
+ .with (EVENT_PUBLICATION_NODE ));
139
+
140
+ private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = applyProperties (
141
+ match (EVENT_PUBLICATION_NODE )
142
+ .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
143
+ .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
144
+ .and (not (exists (match (EVENT_PUBLICATION_ARCHIVE_NODE )
145
+ .where (EVENT_PUBLICATION_ARCHIVE_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
146
+ .and (EVENT_PUBLICATION_ARCHIVE_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
147
+ .returning (literalTrue ()).build ())))
148
+ .with (EVENT_PUBLICATION_NODE ));
149
+
150
+ private static Statement applyProperties (OrderableOngoingReadingAndWithWithoutWhere source ) {
151
+
152
+ var operations = ALL_PROPERTIES .stream ()
153
+ .map (it -> EVENT_PUBLICATION_ARCHIVE_NODE .property (it ).to (EVENT_PUBLICATION_NODE .property (it )))
154
+ .toList ();
155
+
156
+ return source .create (EVENT_PUBLICATION_ARCHIVE_NODE )
157
+ .set (operations )
158
+ .set (EVENT_PUBLICATION_ARCHIVE_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
159
+ .build ();
160
+ }
149
161
150
162
private static final Function <Node , Statement > COMPLETE_BY_ID_STATEMENT = node -> match (node )
151
163
.where (node .property (ID ).eq (parameter (ID )))
@@ -168,6 +180,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
168
180
private final Renderer renderer ;
169
181
private final EventSerializer eventSerializer ;
170
182
private final CompletionMode completionMode ;
183
+ private final Node completedNode ;
171
184
172
185
private final Statement deleteCompletedStatement ;
173
186
private final Statement deleteCompletedBeforeStatement ;
@@ -187,12 +200,14 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
187
200
this .eventSerializer = eventSerializer ;
188
201
this .completionMode = completionMode ;
189
202
190
- var archiveNode = completionMode == CompletionMode .ARCHIVE ? EVENT_PUBLICATION_COMPLETED_NODE : EVENT_PUBLICATION_NODE ;
203
+ this .completedNode = completionMode == CompletionMode .ARCHIVE
204
+ ? EVENT_PUBLICATION_ARCHIVE_NODE
205
+ : EVENT_PUBLICATION_NODE ;
191
206
192
- this .deleteCompletedStatement = DELETE_COMPLETED_STATEMENT .apply (archiveNode );
193
- this .deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT .apply (archiveNode );
194
- this .completedByIdStatement = COMPLETE_BY_ID_STATEMENT .apply (archiveNode );
195
- this .allCompletedStatement = ALL_COMPLETED_STATEMENT .apply (archiveNode );
207
+ this .deleteCompletedStatement = DELETE_COMPLETED_STATEMENT .apply (completedNode );
208
+ this .deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT .apply (completedNode );
209
+ this .completedByIdStatement = COMPLETE_BY_ID_STATEMENT .apply (completedNode );
210
+ this .allCompletedStatement = ALL_COMPLETED_STATEMENT .apply (completedNode );
196
211
}
197
212
198
213
/*
@@ -219,7 +234,8 @@ public TargetEventPublication create(TargetEventPublication publication) {
219
234
EVENT_HASH , eventHash ,
220
235
EVENT_TYPE , eventType ,
221
236
LISTENER_ID , listenerId ,
222
- PUBLICATION_DATE , Values .value (publicationDate .atOffset (ZoneOffset .UTC ))))
237
+ PUBLICATION_DATE , Values .value (publicationDate .atOffset (ZoneOffset .UTC )),
238
+ COMPLETION_DATE , Values .NULL ))
223
239
.run ();
224
240
225
241
return publication ;
@@ -249,6 +265,7 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
249
265
.bind (identifier .getValue ()).to (LISTENER_ID )
250
266
.bind (Values .value (completionDate .atOffset (ZoneOffset .UTC ))).to (COMPLETION_DATE )
251
267
.run ();
268
+
252
269
neo4jClient .query (renderer .render (DELETE_BY_EVENT_AND_LISTENER_ID ))
253
270
.bind (eventHash ).to (EVENT_HASH )
254
271
.bind (identifier .getValue ()).to (LISTENER_ID )
@@ -279,9 +296,10 @@ public void markCompleted(UUID identifier, Instant completionDate) {
279
296
} else if (completionMode == CompletionMode .ARCHIVE ) {
280
297
281
298
neo4jClient .query (renderer .render (COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT ))
282
- .bind ("" ).to (ID )
299
+ .bind (Values . value ( identifier . toString ()) ).to (ID )
283
300
.bind (Values .value (completionDate .atOffset (ZoneOffset .UTC ))).to (COMPLETION_DATE )
284
301
.run ();
302
+
285
303
deletePublications (List .of (identifier ));
286
304
287
305
} else {
@@ -304,7 +322,7 @@ public List<TargetEventPublication> findIncompletePublications() {
304
322
305
323
return List .copyOf (neo4jClient .query (renderer .render (INCOMPLETE_STATEMENT ))
306
324
.fetchAs (TargetEventPublication .class )
307
- .mappedBy (this :: mapRecordToPublication )
325
+ .mappedBy (incompleteMapping () )
308
326
.all ());
309
327
}
310
328
@@ -319,7 +337,7 @@ public List<TargetEventPublication> findIncompletePublicationsPublishedBefore(In
319
337
return List .copyOf (neo4jClient .query (renderer .render (INCOMPLETE_PUBLISHED_BEFORE_STATEMENT ))
320
338
.bind (Values .value (instant .atOffset (ZoneOffset .UTC ))).to (PUBLICATION_DATE )
321
339
.fetchAs (TargetEventPublication .class )
322
- .mappedBy (this :: mapRecordToPublication )
340
+ .mappedBy (incompleteMapping () )
323
341
.all ());
324
342
}
325
343
@@ -338,7 +356,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
338
356
return neo4jClient .query (renderer .render (INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT ))
339
357
.bindAll (Map .of (EVENT_HASH , eventHash , LISTENER_ID , listenerId ))
340
358
.fetchAs (TargetEventPublication .class )
341
- .mappedBy (this :: mapRecordToPublication )
359
+ .mappedBy (incompleteMapping () )
342
360
.one ();
343
361
}
344
362
@@ -351,7 +369,7 @@ public List<TargetEventPublication> findCompletedPublications() {
351
369
352
370
return new ArrayList <>(neo4jClient .query (renderer .render (allCompletedStatement ))
353
371
.fetchAs (TargetEventPublication .class )
354
- .mappedBy (this :: mapRecordToPublication )
372
+ .mappedBy (completeMapping () )
355
373
.all ());
356
374
}
357
375
@@ -391,21 +409,31 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
391
409
.run ();
392
410
}
393
411
394
- private Neo4jEventPublicationAdapter mapRecordToPublication (TypeSystem typeSystem , org .neo4j .driver .Record record ) {
412
+ private BiFunction <TypeSystem , org .neo4j .driver .Record , TargetEventPublication > incompleteMapping () {
413
+ return (typeSystem , driverRecord ) -> mapRecordToPublication (typeSystem , driverRecord , EVENT_PUBLICATION_NODE );
414
+ }
415
+
416
+ private BiFunction <TypeSystem , org .neo4j .driver .Record , TargetEventPublication > completeMapping () {
417
+ return (typeSystem , driverRecord ) -> mapRecordToPublication (typeSystem , driverRecord , completedNode );
418
+ }
419
+
420
+ private Neo4jEventPublicationAdapter mapRecordToPublication (TypeSystem typeSystem , org .neo4j .driver .Record record ,
421
+ Node node ) {
395
422
396
- var publicationNode = record .get (EVENT_PUBLICATION_NODE .getRequiredSymbolicName ().getValue ()).asNode ();
423
+ var publicationNode = record .get (node .getRequiredSymbolicName ().getValue ()).asNode ();
397
424
var identifier = UUID .fromString (publicationNode .get (ID ).asString ());
398
425
var publicationDate = publicationNode .get (PUBLICATION_DATE ).asZonedDateTime ().toInstant ();
399
426
var listenerId = publicationNode .get (LISTENER_ID ).asString ();
400
427
var eventSerialized = publicationNode .get (EVENT_SERIALIZED ).asString ();
401
428
var eventHash = publicationNode .get (EVENT_HASH ).asString ();
402
429
var eventType = publicationNode .get (EVENT_TYPE ).asString ();
430
+ var completionDate = publicationNode .get (COMPLETION_DATE );
403
431
404
432
try {
405
433
406
434
var event = eventSerializer .deserialize (eventSerialized , Class .forName (eventType ));
407
435
var publication = new Neo4jEventPublication (identifier , publicationDate , listenerId , event ,
408
- eventHash );
436
+ eventHash , completionDate . isNull () ? null : completionDate . asZonedDateTime (). toInstant () );
409
437
410
438
return new Neo4jEventPublicationAdapter (publication );
411
439
0 commit comments