15
15
*/
16
16
package org .springframework .modulith .events .mongodb ;
17
17
18
+ import static org .springframework .data .mongodb .core .aggregation .Aggregation .*;
18
19
import static org .springframework .data .mongodb .core .query .Criteria .*;
19
20
import static org .springframework .data .mongodb .core .query .Query .*;
20
21
24
25
import java .util .Optional ;
25
26
import java .util .UUID ;
26
27
28
+ import org .bson .Document ;
29
+ import org .springframework .data .annotation .Id ;
27
30
import org .springframework .data .domain .Sort ;
28
31
import org .springframework .data .mongodb .core .MongoTemplate ;
32
+ import org .springframework .data .mongodb .core .aggregation .Fields ;
33
+ import org .springframework .data .mongodb .core .aggregation .MergeOperation .WhenDocumentsMatch ;
29
34
import org .springframework .data .mongodb .core .query .Criteria ;
30
35
import org .springframework .data .mongodb .core .query .Query ;
31
36
import org .springframework .data .mongodb .core .query .Update ;
@@ -96,7 +101,8 @@ public TargetEventPublication create(TargetEventPublication publication) {
96
101
@ Override
97
102
public void markCompleted (Object event , PublicationTargetIdentifier identifier , Instant completionDate ) {
98
103
99
- var query = byEventAndListenerId (event , identifier );
104
+ var criteria = byEventAndListenerId (event , identifier );
105
+ var query = defaultQuery (criteria );
100
106
var update = Update .update (COMPLETION_DATE , completionDate );
101
107
102
108
if (completionMode == CompletionMode .DELETE ) {
@@ -105,9 +111,8 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
105
111
106
112
} else if (completionMode == CompletionMode .ARCHIVE ) {
107
113
108
- mongoTemplate .findAndModify (query , update , MongoDbEventPublication .class , collection );
109
- var completedEvent = mongoTemplate .findAndRemove (query , MongoDbEventPublication .class , collection );
110
- mongoTemplate .save (completedEvent , archiveCollection );
114
+ markCompleted (criteria , completionDate );
115
+
111
116
} else {
112
117
113
118
mongoTemplate .findAndModify (query , update , MongoDbEventPublication .class , collection );
@@ -121,21 +126,20 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
121
126
@ Override
122
127
public void markCompleted (UUID identifier , Instant completionDate ) {
123
128
124
- var criteria = query (where (ID ).is (identifier ));
129
+ var criteria = where (ID ).is (identifier ).and (COMPLETION_DATE ).isNull ();
130
+ var query = query (criteria );
125
131
var update = Update .update (COMPLETION_DATE , completionDate );
126
132
127
133
if (completionMode == CompletionMode .DELETE ) {
128
134
129
- mongoTemplate .remove (criteria , MongoDbEventPublication .class , collection );
135
+ mongoTemplate .remove (query , MongoDbEventPublication .class , collection );
130
136
131
137
} else if (completionMode == CompletionMode .ARCHIVE ) {
132
138
133
- mongoTemplate .findAndModify (criteria , update , MongoDbEventPublication .class , collection );
134
- var completedEvent = mongoTemplate .findAndRemove (criteria , MongoDbEventPublication .class , collection );
135
- mongoTemplate .save (completedEvent , archiveCollection );
139
+ markCompleted (criteria , completionDate );
136
140
137
141
} else {
138
- mongoTemplate .findAndModify (criteria , update , MongoDbEventPublication .class , collection );
142
+ mongoTemplate .findAndModify (query , update , MongoDbEventPublication .class , collection );
139
143
}
140
144
}
141
145
@@ -168,7 +172,7 @@ public List<TargetEventPublication> findIncompletePublicationsPublishedBefore(In
168
172
public Optional <TargetEventPublication > findIncompletePublicationsByEventAndTargetIdentifier (
169
173
Object event , PublicationTargetIdentifier targetIdentifier ) {
170
174
171
- var results = readMapped (byEventAndListenerId (event , targetIdentifier ));
175
+ var results = readMapped (defaultQuery ( byEventAndListenerId (event , targetIdentifier ) ));
172
176
173
177
// if there are several events with exactly the same payload we return the oldest one first
174
178
return results .isEmpty () ? Optional .empty () : Optional .of (results .get (0 ));
@@ -230,13 +234,13 @@ private List<TargetEventPublication> readMapped(Query query, String collection)
230
234
231
235
}
232
236
233
- private Query byEventAndListenerId (Object event , PublicationTargetIdentifier identifier ) {
237
+ private Criteria byEventAndListenerId (Object event , PublicationTargetIdentifier identifier ) {
234
238
235
239
var eventAsMongoType = mongoTemplate .getConverter ().convertToMongoType (event , TypeInformation .OBJECT );
236
240
237
- return defaultQuery ( where (EVENT ).is (eventAsMongoType ) //
241
+ return where (EVENT ).is (eventAsMongoType ) //
238
242
.and (LISTENER_ID ).is (identifier .getValue ())
239
- .and (COMPLETION_DATE ).isNull ()) ;
243
+ .and (COMPLETION_DATE ).isNull ();
240
244
}
241
245
242
246
private static MongoDbEventPublication domainToDocument (TargetEventPublication publication ) {
@@ -256,6 +260,28 @@ private static Query defaultQuery(Criteria criteria) {
256
260
return query (criteria ).with (DEFAULT_SORT );
257
261
}
258
262
263
+ private void markCompleted (Criteria lookup , Instant now ) {
264
+
265
+ var aggregation = newAggregation (MongoDbEventPublication .class ,
266
+
267
+ match (lookup ),
268
+
269
+ addFields ()
270
+ .addFieldWithValue (COMPLETION_DATE , now )
271
+ .build (),
272
+
273
+ merge ()
274
+ .intoCollection (archiveCollection )
275
+ .on (ID )
276
+ .whenMatched (WhenDocumentsMatch .keepExistingDocument ())
277
+ .build ());
278
+
279
+ mongoTemplate
280
+ .aggregate (aggregation , collection , Document .class )
281
+ .forEach (it -> mongoTemplate .remove (query (where (Fields .UNDERSCORE_ID ).is (it .get (Fields .UNDERSCORE_ID ))),
282
+ collection ));
283
+ }
284
+
259
285
private static class MongoDbEventPublicationAdapter implements TargetEventPublication {
260
286
261
287
private final MongoDbEventPublication publication ;
@@ -326,4 +352,6 @@ public int hashCode() {
326
352
return Objects .hash (publication );
327
353
}
328
354
}
355
+
356
+ record IdOnly (@ Id UUID id ) {}
329
357
}
0 commit comments