Skip to content

Commit a7e4a79

Browse files
ciberkleidodrotbohm
andcommitted
GH-806 - Add archive support for MongoDB.
Co-authored-by: Oliver Drotbohm <oliver.drotbohm@broadcom.com>
1 parent 2cb0db7 commit a7e4a79

File tree

2 files changed

+151
-122
lines changed

2 files changed

+151
-122
lines changed

spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
5252
private static final String ID = "id";
5353
private static final String LISTENER_ID = "listenerId";
5454
private static final String PUBLICATION_DATE = "publicationDate";
55-
5655
private static final Sort DEFAULT_SORT = Sort.by(PUBLICATION_DATE).ascending();
5756

57+
static final String ARCHIVE_COLLECTION = "event_publication_archive";
58+
5859
private final MongoTemplate mongoTemplate;
5960
private final CompletionMode completionMode;
61+
private final String collection, archiveCollection;
6062

6163
/**
6264
* Creates a new {@link MongoDbEventPublicationRepository} for the given {@link MongoTemplate}.
@@ -71,6 +73,8 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion
7173

7274
this.mongoTemplate = mongoTemplate;
7375
this.completionMode = completionMode;
76+
this.collection = "event_publication";
77+
this.archiveCollection = completionMode == CompletionMode.ARCHIVE ? ARCHIVE_COLLECTION : collection;
7478
}
7579

7680
/*
@@ -80,7 +84,7 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion
8084
@Override
8185
public TargetEventPublication create(TargetEventPublication publication) {
8286

83-
mongoTemplate.save(domainToDocument(publication));
87+
mongoTemplate.save(domainToDocument(publication), collection);
8488

8589
return publication;
8690
}
@@ -93,16 +97,20 @@ public TargetEventPublication create(TargetEventPublication publication) {
9397
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {
9498

9599
var query = byEventAndListenerId(event, identifier);
100+
var update = Update.update(COMPLETION_DATE, completionDate);
96101

97102
if (completionMode == CompletionMode.DELETE) {
98103

99-
mongoTemplate.remove(query, MongoDbEventPublication.class);
104+
mongoTemplate.remove(query, MongoDbEventPublication.class, collection);
100105

101-
} else {
106+
} else if (completionMode == CompletionMode.ARCHIVE) {
102107

103-
var update = Update.update(COMPLETION_DATE, completionDate);
108+
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
109+
var completedEvent = mongoTemplate.findAndRemove(query, MongoDbEventPublication.class, collection);
110+
mongoTemplate.save(completedEvent, archiveCollection);
111+
} else {
104112

105-
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class);
113+
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
106114
}
107115
}
108116

@@ -113,17 +121,21 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
113121
@Override
114122
public void markCompleted(UUID identifier, Instant completionDate) {
115123

116-
var criateria = query(where(ID).is(identifier));
124+
var criteria = query(where(ID).is(identifier));
125+
var update = Update.update(COMPLETION_DATE, completionDate);
117126

118127
if (completionMode == CompletionMode.DELETE) {
119128

120-
mongoTemplate.remove(criateria, MongoDbEventPublication.class);
129+
mongoTemplate.remove(criteria, MongoDbEventPublication.class, collection);
121130

122-
} else {
131+
} else if (completionMode == CompletionMode.ARCHIVE) {
123132

124-
var update = Update.update(COMPLETION_DATE, completionDate);
133+
mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
134+
var completedEvent = mongoTemplate.findAndRemove(criteria, MongoDbEventPublication.class, collection);
135+
mongoTemplate.save(completedEvent, archiveCollection);
125136

126-
mongoTemplate.findAndModify(criateria, update, MongoDbEventPublication.class);
137+
} else {
138+
mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
127139
}
128140
}
129141

@@ -168,7 +180,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
168180
*/
169181
@Override
170182
public List<TargetEventPublication> findCompletedPublications() {
171-
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)));
183+
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)), archiveCollection);
172184
}
173185

174186
/*
@@ -177,7 +189,9 @@ public List<TargetEventPublication> findCompletedPublications() {
177189
*/
178190
@Override
179191
public void deletePublications(List<UUID> identifiers) {
180-
mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class);
192+
193+
mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, collection);
194+
mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, archiveCollection);
181195
}
182196

183197
/*
@@ -186,7 +200,7 @@ public void deletePublications(List<UUID> identifiers) {
186200
*/
187201
@Override
188202
public void deleteCompletedPublications() {
189-
mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class);
203+
mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class, archiveCollection);
190204
}
191205

192206
/*
@@ -198,16 +212,22 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
198212

199213
Assert.notNull(instant, "Instant must not be null!");
200214

201-
mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class);
215+
mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class, archiveCollection);
202216
}
203217

204218
private List<TargetEventPublication> readMapped(Query query) {
219+
return readMapped(query, collection);
220+
}
221+
222+
private List<TargetEventPublication> readMapped(Query query, String collection) {
205223

206224
return mongoTemplate.query(MongoDbEventPublication.class)
225+
.inCollection(collection)
207226
.matching(query)
208227
.stream()
209228
.map(MongoDbEventPublicationRepository::documentToDomain)
210229
.toList();
230+
211231
}
212232

213233
private Query byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) {

0 commit comments

Comments
 (0)