Skip to content

Commit 2cb0db7

Browse files
ciberkleidodrotbohm
andcommitted
GH-806 - Add archive support for JDBC.
Co-authored-by: Oliver Drotbohm <oliver.drotbohm@broadcom.com>
1 parent 179e4ac commit 2cb0db7

17 files changed

+271
-66
lines changed

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/DatabaseSchemaInitializer.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.springframework.core.io.ResourceLoader;
2424
import org.springframework.jdbc.core.JdbcOperations;
2525
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
26-
import org.springframework.util.Assert;
2726

2827
/**
2928
* Initializes the DB schema used to store events
@@ -37,23 +36,15 @@ class DatabaseSchemaInitializer implements InitializingBean {
3736

3837
private final DataSource dataSource;
3938
private final ResourceLoader resourceLoader;
40-
private final DatabaseType databaseType;
4139
private final JdbcOperations jdbcOperations;
42-
private final JdbcConfigurationProperties properties;
40+
private final JdbcRepositorySettings settings;
4341

44-
DatabaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader, DatabaseType databaseType,
45-
JdbcOperations jdbcOperations, JdbcConfigurationProperties properties) {
46-
47-
Assert.isTrue(properties.getSchemaInitialization().isEnabled(),
48-
"Schema initialization disabled! Initializer should not have been registered!");
42+
DatabaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader, JdbcOperations jdbcOperations, JdbcRepositorySettings settings) {
4943

5044
this.dataSource = dataSource;
5145
this.resourceLoader = resourceLoader;
52-
this.databaseType = databaseType;
5346
this.jdbcOperations = jdbcOperations;
54-
this.properties = properties;
55-
56-
properties.verify(databaseType);
47+
this.settings = settings;
5748
}
5849

5950
/*
@@ -66,7 +57,8 @@ public void afterPropertiesSet() throws Exception {
6657
try (Connection connection = dataSource.getConnection()) {
6758

6859
var initialSchema = connection.getSchema();
69-
var schemaName = properties.getSchema();
60+
var schemaName = settings.getSchema();
61+
var databaseType = settings.getDatabaseType();
7062
var useSchema = schemaName != null && !schemaName.isEmpty();
7163

7264
if (useSchema) { // A schema name has been specified.
@@ -80,7 +72,10 @@ public void afterPropertiesSet() throws Exception {
8072
}
8173

8274
var locator = new DatabaseSchemaLocator(resourceLoader);
83-
new ResourceDatabasePopulator(locator.getSchemaResource(databaseType)).execute(dataSource);
75+
76+
var populator = new ResourceDatabasePopulator();
77+
locator.getSchemaResource(settings).forEach(populator::addScript);
78+
populator.execute(dataSource);
8479

8580
// Return to the initial schema.
8681
if (initialSchema != null && useSchema) {

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/DatabaseSchemaLocator.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import org.springframework.core.io.ResourceLoader;
2020
import org.springframework.util.Assert;
2121

22+
import java.util.Collection;
23+
import java.util.List;
24+
2225
/**
2326
* Simple wrapper around a {@link ResourceLoader} to load database specific schema files from the classpath.
2427
*
@@ -41,16 +44,22 @@ public class DatabaseSchemaLocator {
4144
}
4245

4346
/**
44-
* Loads the {@link Resource} containing the schema for the given {@link DatabaseType} from the classpath.
47+
* Loads the {@link Resource} containing the schema for the given {@link JdbcRepositorySettings} from the classpath.
4548
*
46-
* @param databaseType must not be {@literal null}.
49+
* @param settings must not be {@literal null}.
4750
* @return will never be {@literal null}.
4851
*/
49-
Resource getSchemaResource(DatabaseType databaseType) {
52+
Collection<Resource> getSchemaResource(JdbcRepositorySettings settings) {
5053

51-
Assert.notNull(databaseType, "DatabaseType must not be null!");
54+
Assert.notNull(settings, "JdbcRepositorySettings must not be null!");
5255

56+
var databaseType = settings.getDatabaseType();
5357
var schemaResourceFilename = databaseType.getSchemaResourceFilename();
54-
return resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + schemaResourceFilename);
58+
var schemaResource = resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + schemaResourceFilename);
59+
60+
return !settings.isArchiveCompletion()
61+
? List.of(schemaResource)
62+
: List.of(schemaResource, resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + databaseType.getArchiveSchemaResourceFilename()));
63+
5564
}
5665
}

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/DatabaseType.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* @author Björn Kieling
2626
* @author Oliver Drotbohm
2727
* @author Raed Ben Hamouda
28+
* @author Cora Iberkleid
2829
*/
2930
enum DatabaseType {
3031

@@ -106,7 +107,7 @@ boolean isSchemaSupported() {
106107
}
107108
};
108109

109-
static final String SCHEMA_NOT_SUPPORTED = "Setting the schema name not supported on MySQL!";
110+
static final String SCHEMA_NOT_SUPPORTED = "Setting the schema name is not supported!";
110111

111112
static DatabaseType from(String productName) {
112113

@@ -138,6 +139,8 @@ String getSchemaResourceFilename() {
138139
return "/schema-" + value + ".sql";
139140
}
140141

142+
String getArchiveSchemaResourceFilename() { return "/schema-" + value + "-archive.sql"; }
143+
141144
boolean isSchemaSupported() {
142145
return true;
143146
}

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationAutoConfiguration.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import javax.sql.DataSource;
2121

22+
import org.springframework.beans.factory.annotation.Autowired;
2223
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
2324
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2425
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -44,14 +45,16 @@
4445
@EnableConfigurationProperties(JdbcConfigurationProperties.class)
4546
class JdbcEventPublicationAutoConfiguration implements EventPublicationConfigurationExtension {
4647

48+
@Autowired Environment environment;
49+
4750
@Bean
4851
DatabaseType databaseType(DataSource dataSource) {
4952
return DatabaseType.from(fromDataSource(dataSource));
5053
}
5154

5255
@Bean
5356
JdbcRepositorySettings jdbcEventPublicationRepositorySettings(DatabaseType databaseType,
54-
JdbcConfigurationProperties properties, Environment environment) {
57+
JdbcConfigurationProperties properties) {
5558

5659
return new JdbcRepositorySettings(databaseType, CompletionMode.from(environment), properties.getSchema());
5760
}
@@ -65,9 +68,9 @@ JdbcEventPublicationRepository jdbcEventPublicationRepository(JdbcTemplate jdbcT
6568
@Bean
6669
@ConditionalOnProperty(name = "spring.modulith.events.jdbc.schema-initialization.enabled", havingValue = "true")
6770
DatabaseSchemaInitializer databaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader,
68-
DatabaseType databaseType, JdbcTemplate jdbcTemplate, JdbcConfigurationProperties properties) {
71+
DatabaseType databaseType, JdbcTemplate jdbcTemplate, JdbcRepositorySettings settings) {
6972

70-
return new DatabaseSchemaInitializer(dataSource, resourceLoader, databaseType, jdbcTemplate, properties);
73+
return new DatabaseSchemaInitializer(dataSource, resourceLoader, jdbcTemplate, settings);
7174
}
7275

7376
private static String fromDataSource(DataSource dataSource) {

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Björn Kieling
5151
* @author Oliver Drotbohm
5252
* @author Raed Ben Hamouda
53+
* @author Cora Iberkleid
5354
*/
5455
class JdbcEventPublicationRepository implements EventPublicationRepository, BeanClassLoaderAware {
5556

@@ -130,20 +131,39 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
130131
ID = ?
131132
""";
132133

133-
private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """
134+
private static final String SQL_STATEMENT_DELETE_COMPLETED = """
134135
DELETE
135136
FROM %s
136137
WHERE
137138
COMPLETION_DATE IS NOT NULL
138139
""";
139140

140-
private static final String SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE = """
141+
private static final String SQL_STATEMENT_DELETE_COMPLETED_BEFORE = """
141142
DELETE
142143
FROM %s
143144
WHERE
144145
COMPLETION_DATE < ?
145146
""";
146147

148+
private static final String SQL_STATEMENT_COPY_TO_ARCHIVE_BY_ID = """
149+
-- Only copy if no entry in target table
150+
INSERT INTO %s (ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, COMPLETION_DATE)
151+
SELECT ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, ?
152+
FROM %s
153+
WHERE ID = ?
154+
AND NOT EXISTS (SELECT 1 FROM %s WHERE ID = EVENT_PUBLICATION.ID)
155+
""";
156+
157+
private static final String SQL_STATEMENT_COPY_TO_ARCHIVE_BY_EVENT_AND_LISTENER_ID = """
158+
-- Only copy if no entry in target table
159+
INSERT INTO %s (ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, COMPLETION_DATE)
160+
SELECT ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, ?
161+
FROM %s
162+
WHERE LISTENER_ID = ?
163+
AND SERIALIZED_EVENT = ?
164+
AND NOT EXISTS (SELECT 1 FROM %s WHERE ID = EVENT_PUBLICATION.ID)
165+
""";
166+
147167
private static final int DELETE_BATCH_SIZE = 100;
148168

149169
private final JdbcOperations operations;
@@ -162,8 +182,10 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
162182
sqlStatementDelete,
163183
sqlStatementDeleteByEventAndListenerId,
164184
sqlStatementDeleteById,
165-
sqlStatementDeleteUncompleted,
166-
sqlStatementDeleteUncompletedBefore;
185+
sqlStatementDeleteCompleted,
186+
sqlStatementDeleteCompletedBefore,
187+
sqlStatementCopyToArchive,
188+
sqlStatementCopyToArchiveByEventAndListenerId;
167189

168190
/**
169191
* Creates a new {@link JdbcEventPublicationRepository} for the given {@link JdbcOperations}, {@link EventSerializer},
@@ -186,9 +208,10 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer
186208

187209
var schema = settings.getSchema();
188210
var table = ObjectUtils.isEmpty(schema) ? "EVENT_PUBLICATION" : schema + ".EVENT_PUBLICATION";
211+
var completedTable = settings.isArchiveCompletion() ? table + "_ARCHIVE" : table;
189212

190213
this.sqlStatementInsert = SQL_STATEMENT_INSERT.formatted(table);
191-
this.sqlStatementFindCompleted = SQL_STATEMENT_FIND_COMPLETED.formatted(table);
214+
this.sqlStatementFindCompleted = SQL_STATEMENT_FIND_COMPLETED.formatted(completedTable);
192215
this.sqlStatementFindUncompleted = SQL_STATEMENT_FIND_UNCOMPLETED.formatted(table);
193216
this.sqlStatementFindUncompletedBefore = SQL_STATEMENT_FIND_UNCOMPLETED_BEFORE.formatted(table);
194217
this.sqlStatementUpdateByEventAndListenerId = SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID.formatted(table);
@@ -197,8 +220,10 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer
197220
this.sqlStatementDelete = SQL_STATEMENT_DELETE.formatted(table);
198221
this.sqlStatementDeleteByEventAndListenerId = SQL_STATEMENT_DELETE_BY_EVENT_AND_LISTENER_ID.formatted(table);
199222
this.sqlStatementDeleteById = SQL_STATEMENT_DELETE_BY_ID.formatted(table);
200-
this.sqlStatementDeleteUncompleted = SQL_STATEMENT_DELETE_UNCOMPLETED.formatted(table);
201-
this.sqlStatementDeleteUncompletedBefore = SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE.formatted(table);
223+
this.sqlStatementDeleteCompleted = SQL_STATEMENT_DELETE_COMPLETED.formatted(completedTable);
224+
this.sqlStatementDeleteCompletedBefore = SQL_STATEMENT_DELETE_COMPLETED_BEFORE.formatted(completedTable);
225+
this.sqlStatementCopyToArchive = SQL_STATEMENT_COPY_TO_ARCHIVE_BY_ID.formatted(completedTable, table, completedTable);
226+
this.sqlStatementCopyToArchiveByEventAndListenerId = SQL_STATEMENT_COPY_TO_ARCHIVE_BY_EVENT_AND_LISTENER_ID.formatted(completedTable, table, completedTable);
202227
}
203228

204229
/*
@@ -246,6 +271,14 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
246271

247272
operations.update(sqlStatementDeleteByEventAndListenerId, targetIdentifier, serializedEvent);
248273

274+
} else if (settings.isArchiveCompletion()) {
275+
276+
operations.update(sqlStatementCopyToArchiveByEventAndListenerId, //
277+
Timestamp.from(completionDate), //
278+
targetIdentifier, //
279+
serializedEvent);
280+
operations.update(sqlStatementDeleteByEventAndListenerId, targetIdentifier, serializedEvent);
281+
249282
} else {
250283

251284
operations.update(sqlStatementUpdateByEventAndListenerId, //
@@ -263,10 +296,18 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
263296
@Transactional
264297
public void markCompleted(UUID identifier, Instant completionDate) {
265298

299+
var databaseId = uuidToDatabase(identifier);
300+
var timestamp = Timestamp.from(completionDate);
301+
266302
if (settings.isDeleteCompletion()) {
267-
operations.update(sqlStatementDeleteById, uuidToDatabase(identifier));
303+
operations.update(sqlStatementDeleteById, databaseId);
304+
305+
} else if (settings.isArchiveCompletion()) {
306+
operations.update(sqlStatementCopyToArchive, timestamp, databaseId);
307+
operations.update(sqlStatementDeleteById, databaseId);
308+
268309
} else {
269-
operations.update(sqlStatementUpdateById, Timestamp.from(completionDate), uuidToDatabase(identifier));
310+
operations.update(sqlStatementUpdateById, timestamp, databaseId);
270311
}
271312
}
272313

@@ -338,7 +379,7 @@ public void deletePublications(List<UUID> identifiers) {
338379
*/
339380
@Override
340381
public void deleteCompletedPublications() {
341-
operations.execute(sqlStatementDeleteUncompleted);
382+
operations.execute(sqlStatementDeleteCompleted);
342383
}
343384

344385
/*
@@ -350,7 +391,7 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
350391

351392
Assert.notNull(instant, "Instant must not be null!");
352393

353-
operations.update(sqlStatementDeleteUncompletedBefore, Timestamp.from(instant));
394+
operations.update(sqlStatementDeleteCompletedBefore, Timestamp.from(instant));
354395
}
355396

356397
private String serializeEvent(Object event) {
@@ -457,9 +498,7 @@ private static class JdbcEventPublication implements TargetEventPublication {
457498
* @param id must not be {@literal null}.
458499
* @param publicationDate must not be {@literal null}.
459500
* @param listenerId must not be {@literal null} or empty.
460-
* @param serializedEvent must not be {@literal null} or empty.
461-
* @param eventType must not be {@literal null}.
462-
* @param serializer must not be {@literal null}.
501+
* @param event must not be {@literal null}..
463502
* @param completionDate can be {@literal null}.
464503
*/
465504
public JdbcEventPublication(UUID id, Instant publicationDate, String listenerId, Supplier<Object> event,
@@ -468,6 +507,7 @@ public JdbcEventPublication(UUID id, Instant publicationDate, String listenerId,
468507
Assert.notNull(id, "Id must not be null!");
469508
Assert.notNull(publicationDate, "Publication date must not be null!");
470509
Assert.hasText(listenerId, "Listener id must not be null or empty!");
510+
Assert.notNull(event, "Event must not be null!");
471511

472512
this.id = id;
473513
this.publicationDate = publicationDate;

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcRepositorySettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public class JdbcRepositorySettings {
4747
this.databaseType = databaseType;
4848
this.schema = schema;
4949
this.completionMode = completionMode;
50+
51+
if (schema != null && !databaseType.isSchemaSupported()) {
52+
throw new IllegalStateException(DatabaseType.SCHEMA_NOT_SUPPORTED);
53+
}
5054
}
5155

5256
/**
@@ -74,4 +78,14 @@ public String getSchema() {
7478
public boolean isDeleteCompletion() {
7579
return completionMode == CompletionMode.DELETE;
7680
}
81+
82+
/**
83+
* Returns whether we use the archiving completion mode.
84+
*/
85+
public boolean isArchiveCompletion() { return completionMode == CompletionMode.ARCHIVE; }
86+
87+
/**
88+
* Returns whether we use the updating completion mode.
89+
*/
90+
public boolean isUpdateCompletion() { return completionMode == CompletionMode.UPDATE; }
7791
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
2+
(
3+
ID UUID NOT NULL,
4+
COMPLETION_DATE TIMESTAMP(9) WITH TIME ZONE,
5+
EVENT_TYPE VARCHAR(512) NOT NULL,
6+
LISTENER_ID VARCHAR(512) NOT NULL,
7+
PUBLICATION_DATE TIMESTAMP(9) WITH TIME ZONE NOT NULL,
8+
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
9+
PRIMARY KEY (ID)
10+
);
11+
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_LISTENER_ID_AND_SERIALIZED_EVENT_IDX ON EVENT_PUBLICATION_ARCHIVE (LISTENER_ID, SERIALIZED_EVENT);
12+
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX ON EVENT_PUBLICATION_ARCHIVE (COMPLETION_DATE);
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
2+
(
3+
ID UUID NOT NULL,
4+
COMPLETION_DATE TIMESTAMP(9),
5+
EVENT_TYPE VARCHAR(512) NOT NULL,
6+
LISTENER_ID VARCHAR(512) NOT NULL,
7+
PUBLICATION_DATE TIMESTAMP(9) NOT NULL,
8+
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
9+
PRIMARY KEY (ID)
10+
);
11+
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_LISTENER_ID_AND_SERIALIZED_EVENT_IDX ON EVENT_PUBLICATION_ARCHIVE (LISTENER_ID, SERIALIZED_EVENT);
12+
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX ON EVENT_PUBLICATION_ARCHIVE (COMPLETION_DATE);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
2+
(
3+
ID VARCHAR(36) NOT NULL,
4+
LISTENER_ID VARCHAR(512) NOT NULL,
5+
EVENT_TYPE VARCHAR(512) NOT NULL,
6+
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
7+
PUBLICATION_DATE TIMESTAMP(6) NOT NULL,
8+
COMPLETION_DATE TIMESTAMP(6) DEFAULT NULL NULL,
9+
PRIMARY KEY (ID),
10+
INDEX EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX (COMPLETION_DATE)
11+
);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
2+
(
3+
ID VARCHAR(36) NOT NULL,
4+
LISTENER_ID VARCHAR(512) NOT NULL,
5+
EVENT_TYPE VARCHAR(512) NOT NULL,
6+
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
7+
PUBLICATION_DATE TIMESTAMP(6) NOT NULL,
8+
COMPLETION_DATE TIMESTAMP(6) DEFAULT NULL NULL,
9+
PRIMARY KEY (ID),
10+
INDEX EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX (COMPLETION_DATE)
11+
);

0 commit comments

Comments
 (0)