Skip to content

Commit 6440e4b

Browse files
committed
fix: ObsoleteFileMaintainer is closed during catalog replacement
When catalog is replaced with another catalog contents, its ObsoleteFileMaintainer is closed which effectivelly stops old files purging. The replaced catalog continues to be used (and functional) while the purging logic is non functional. Refs: #989 Signed-off-by: Jan Novotný <novotnaci@gmail.com>
1 parent a713530 commit 6440e4b

File tree

9 files changed

+217
-141
lines changed

9 files changed

+217
-141
lines changed

evita_common/src/main/java/io/evitadb/utils/FolderLock.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.evitadb.exception.FolderAlreadyUsedException;
2828
import io.evitadb.exception.GenericEvitaInternalError;
2929
import io.evitadb.exception.UnexpectedIOException;
30+
import io.evitadb.utils.IOUtils.ExceptionThrowingRunnable;
3031
import lombok.extern.slf4j.Slf4j;
3132

3233
import javax.annotation.Nonnull;
@@ -115,8 +116,8 @@ public void close() throws IOException {
115116
"Failed to release and close the lock file " + this.lockFilePath + ".",
116117
"Failed to release and close the lock file."
117118
),
118-
(IOUtils.IOExceptionThrowingRunnable) this.lockFileLock::release,
119-
(IOUtils.IOExceptionThrowingRunnable) this.lockFileChannel::close
119+
(ExceptionThrowingRunnable) this.lockFileLock::release,
120+
(ExceptionThrowingRunnable) this.lockFileChannel::close
120121
);
121122
FileUtils.deleteFileIfExists(this.lockFilePath);
122123
ACQUIRED_LOCKS.remove(this.lockFilePath.toAbsolutePath().toString());

evita_common/src/main/java/io/evitadb/utils/IOUtils.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,26 @@ public static long copy(
111111
}
112112

113113
/**
114-
* Executes lambda logic encapsulated in {@link IOExceptionThrowingRunnable} instances, suppressing
114+
* Executes lambda logic encapsulated in {@link ExceptionThrowingRunnable} instances, suppressing
115115
* and aggregating any {@link IOException} that occurs during execution. If any exceptions are thrown, they
116116
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
117117
*
118118
* @param <T> the type of exception that will be thrown if any {@link IOException} occurs
119119
* @param exceptionFactory a supplier that provides an exception of type {@code T}, used to wrap any
120120
* {@link IOException} thrown during the execution of the provided runnables
121-
* @param consumer varargs of {@link IOExceptionThrowingRunnable} instances which encapsulate
121+
* @param consumer varargs of {@link ExceptionThrowingRunnable} instances which encapsulate
122122
* the resources/actions to be closed or executed
123123
* @throws T the consolidated exception containing any {@link IOException}s that were
124124
* thrown by the provided runnables
125125
*/
126126
public static <T extends RuntimeException> void executeSafely(
127127
@Nonnull Supplier<T> exceptionFactory,
128-
@Nonnull IOExceptionThrowingRunnable consumer
128+
@Nonnull ExceptionThrowingRunnable consumer
129129
) throws T {
130130
T exception = null;
131131
try {
132132
consumer.run();
133-
} catch (IOException e) {
133+
} catch (Exception e) {
134134
exception = exceptionFactory.get();
135135
exception.addSuppressed(e);
136136
}
@@ -179,7 +179,7 @@ public static <S, T extends RuntimeException> void executeSafely(
179179
* {@link IOException} thrown during the execution of the provided runnables
180180
* @param consumer varargs of {@link IOExceptionThrowingConsumer} instances which encapsulate
181181
* the resources/actions to be closed or executed
182-
* @throws T the consolidated exception containing any {@link IOException}s that were
182+
* @throws U the consolidated exception containing any {@link Exception}s that were
183183
* thrown by the provided runnables
184184
*/
185185
public static <S, T, U extends RuntimeException> void executeSafely(
@@ -234,24 +234,24 @@ public static <S, T extends RuntimeException> S executeSafely(
234234
}
235235

236236
/**
237-
* Closes multiple resources encapsulated in {@link IOExceptionThrowingRunnable} instances, suppressing
237+
* Closes multiple resources encapsulated in {@link ExceptionThrowingRunnable} instances, suppressing
238238
* and aggregating any {@link IOException} that occurs during execution. If any exceptions are thrown, they
239239
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
240240
*
241241
* @param <T> the type of exception that will be thrown if any {@link IOException} occurs
242242
* @param exceptionFactory a supplier that provides an exception of type {@code T}, used to wrap any
243243
* {@link IOException} thrown during the execution of the provided runnables
244-
* @param runnable varargs of {@link IOExceptionThrowingRunnable} instances which encapsulate
244+
* @param runnable varargs of {@link ExceptionThrowingRunnable} instances which encapsulate
245245
* the resources/actions to be closed or executed
246246
* @throws T the consolidated exception containing any {@link IOException}s that were
247247
* thrown by the provided runnables
248248
*/
249249
public static <T extends RuntimeException> void close(
250250
@Nonnull Supplier<T> exceptionFactory,
251-
@Nonnull IOExceptionThrowingRunnable... runnable
251+
@Nonnull ExceptionThrowingRunnable... runnable
252252
) throws T {
253253
T exception = null;
254-
for (IOExceptionThrowingRunnable lambda : runnable) {
254+
for (ExceptionThrowingRunnable lambda : runnable) {
255255
try {
256256
lambda.run();
257257
} catch (Exception e) {
@@ -269,7 +269,7 @@ public static <T extends RuntimeException> void close(
269269
* and aggregating any {@link Throwable} that occurs during execution. If any exceptions are thrown, they
270270
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
271271
*
272-
* Unlike {@link #close(Supplier, IOExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
272+
* Unlike {@link #close(Supplier, ExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
273273
* rather than just {@link Exception}, providing a more robust safety net for resource cleanup.
274274
*
275275
* @param <T> the type of exception that will be thrown if any exception occurs
@@ -299,7 +299,7 @@ public static <T extends RuntimeException> void closeSafely(
299299
}
300300

301301
/**
302-
* Executes the provided {@link IOExceptionThrowingRunnable} instances, ensuring that exceptions thrown
302+
* Executes the provided {@link ExceptionThrowingRunnable} instances, ensuring that exceptions thrown
303303
* during their execution are logged but not propagated. This method is typically used for safely closing
304304
* resources without allowing individual close failures to disrupt the overall process.
305305
*
@@ -310,9 +310,9 @@ public static <T extends RuntimeException> void closeSafely(
310310
* @throws T if a runtime exception specific to the implementation needs propagation
311311
*/
312312
public static <T extends RuntimeException> void closeQuietly(
313-
@Nonnull IOExceptionThrowingRunnable... runnable
313+
@Nonnull ExceptionThrowingRunnable... runnable
314314
) throws T {
315-
for (IOExceptionThrowingRunnable lambda : runnable) {
315+
for (ExceptionThrowingRunnable lambda : runnable) {
316316
try {
317317
lambda.run();
318318
} catch (Exception e) {
@@ -327,7 +327,7 @@ public static <T extends RuntimeException> void closeQuietly(
327327
* during their execution are logged but not propagated. This method is typically used for safely closing
328328
* resources without allowing individual close failures to disrupt the overall process.
329329
*
330-
* Unlike {@link #closeQuietly(IOExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
330+
* Unlike {@link #closeQuietly(ExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
331331
* rather than just {@link Exception}, providing a more robust safety net for resource cleanup.
332332
*
333333
* @param runnable the runnable instances, which may throw any exception during execution
@@ -351,25 +351,25 @@ public static <T extends RuntimeException> void closeSafely(
351351

352352
/**
353353
* Represents a functional interface that can be used to encapsulate a block of code
354-
* that may throw an {@link IOException}. This interface is effectively a specialized
354+
* that may throw an {@link Exception}. This interface is effectively a specialized
355355
* form of {@link Runnable} for operations where checked I/O exceptions need to be handled.
356356
*
357357
* Implementations of this interface enable the execution of operations with the awareness
358-
* and explicit handling of {@link IOException}. This is especially useful in scenarios
358+
* and explicit handling of {@link Exception}. This is especially useful in scenarios
359359
* where multiple such operations need to be executed or wrapped with exception aggregation,
360360
* for example, in utility methods like resource handling or cleanup.
361361
*
362362
* Method {@code run} is similar to the {@link Runnable#run()} method but allows an
363-
* {@link IOException} to be thrown.
363+
* {@link Exception} to be thrown.
364364
*
365365
* Functional-style programming can make use of this interface to define inline behaviors
366-
* for operations expected to throw {@link IOException}. It can also be integrated with
366+
* for operations expected to throw {@link Exception}. It can also be integrated with
367367
* various utility methods that leverage this interface for exception handling and resource management.
368368
*/
369369
@FunctionalInterface
370-
public interface IOExceptionThrowingRunnable {
370+
public interface ExceptionThrowingRunnable {
371371

372-
void run() throws IOException;
372+
void run() throws Exception;
373373

374374
}
375375

evita_engine/src/main/java/io/evitadb/core/executor/DelayedAsyncTask.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import io.evitadb.core.metric.event.system.BackgroundTaskFinishedEvent;
2727
import io.evitadb.core.metric.event.system.BackgroundTaskStartedEvent;
28+
import io.evitadb.exception.GenericEvitaInternalError;
2829
import io.evitadb.utils.Assert;
2930
import lombok.Getter;
3031
import lombok.extern.slf4j.Slf4j;
@@ -147,6 +148,7 @@ public DelayedAsyncTask(
147148
* The task is scheduled using the scheduler's schedule method.
148149
*/
149150
public void scheduleImmediately() {
151+
assertNotClosed();
150152
final OffsetDateTime now = OffsetDateTime.now();
151153
if (this.nextPlannedExecution.compareAndExchange(OffsetDateTime.MIN, now) == OffsetDateTime.MIN) {
152154
scheduleTask(computeMinimalSchedulingGap(now.toInstant().toEpochMilli()));
@@ -162,6 +164,8 @@ public void scheduleImmediately() {
162164
* The task is scheduled using the scheduler's schedule method.
163165
*/
164166
public void schedule() {
167+
assertNotClosed();
168+
165169
if (this.delay == Long.MAX_VALUE) {
166170
// the task is manual task and will never be scheduled
167171
return;
@@ -191,6 +195,20 @@ public void close() throws IOException {
191195
}
192196
// release the lambda to allow garbage collection
193197
this.lambda.set(null);
198+
this.nextPlannedExecution.set(OffsetDateTime.MIN);
199+
}
200+
}
201+
202+
/**
203+
* Ensures that the task is not closed before proceeding with its execution or scheduling.
204+
* If the task is already marked as closed, this method throws a GenericEvitaInternalError
205+
* to indicate that the operation cannot be performed on a closed task.
206+
*
207+
* @throws GenericEvitaInternalError if the task is marked as closed
208+
*/
209+
private void assertNotClosed() {
210+
if (this.closed.get()) {
211+
throw new GenericEvitaInternalError("Cannot schedule task `" + this.taskName + "` that has been closed.");
194212
}
195213
}
196214

@@ -223,8 +241,9 @@ private long computeMinimalSchedulingGap(long nowMillis) {
223241
if (lastFinishedExecutionTime.equals(OffsetDateTime.MIN)) {
224242
return 0;
225243
} else {
226-
return Math.max(0, this.minimalSchedulingGap - (nowMillis - lastFinishedExecutionTime.toInstant()
227-
.toEpochMilli())
244+
return Math.max(
245+
0, this.minimalSchedulingGap - (nowMillis - lastFinishedExecutionTime.toInstant()
246+
.toEpochMilli())
228247
);
229248
}
230249
}

evita_engine/src/main/java/io/evitadb/core/file/ExportFileService.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -494,24 +494,30 @@ void purgeFiles(@Nonnull OffsetDateTime thresholdDate) {
494494
log.error("Failed to list files in the directory: {}", this.storageOptions.exportDirectory(), e);
495495
}
496496

497-
// then check the size of the directory and delete oldest files until the directory size is below the limit
498-
final long directorySize = FileUtils.getDirectorySize(this.storageOptions.exportDirectory());
499-
// delete the oldest files until the directory size is below the limit
500-
if (directorySize > this.storageOptions.exportDirectorySizeLimitBytes()) {
501-
final List<FileForFetch> filesByCreationDate = this.files.stream()
502-
.sorted(Comparator.comparing(FileForFetch::created))
503-
.toList();
504-
long savedSize = 0L;
505-
for (FileForFetch it : filesByCreationDate) {
506-
log.info("Purging the oldest file, because the export directory grew too big: {}", it);
507-
final long metadataFileSize = it.metadataPath(this.storageOptions.exportDirectory()).toFile().length();
508-
deleteFile(it.fileId());
509-
savedSize += it.totalSizeInBytes() + metadataFileSize;
510-
// finish removing files if the directory size is below the limit
511-
if (directorySize - savedSize <= this.storageOptions.exportDirectorySizeLimitBytes()) {
512-
break;
497+
try {
498+
// then check the size of the directory and delete oldest files until the directory size is below the limit
499+
final long directorySize = FileUtils.getDirectorySize(this.storageOptions.exportDirectory());
500+
// delete the oldest files until the directory size is below the limit
501+
if (directorySize > this.storageOptions.exportDirectorySizeLimitBytes()) {
502+
final List<FileForFetch> filesByCreationDate = this.files.stream()
503+
.sorted(Comparator.comparing(FileForFetch::created))
504+
.toList();
505+
long savedSize = 0L;
506+
for (FileForFetch it : filesByCreationDate) {
507+
log.info("Purging the oldest file, because the export directory grew too big: {}", it);
508+
final long metadataFileSize = it.metadataPath(this.storageOptions.exportDirectory())
509+
.toFile()
510+
.length();
511+
deleteFile(it.fileId());
512+
savedSize += it.totalSizeInBytes() + metadataFileSize;
513+
// finish removing files if the directory size is below the limit
514+
if (directorySize - savedSize <= this.storageOptions.exportDirectorySizeLimitBytes()) {
515+
break;
516+
}
513517
}
514518
}
519+
} catch (UnexpectedIOException e) {
520+
log.error("Failed to calculate size of the directory: {}", this.storageOptions.exportDirectory(), e);
515521
}
516522
}
517523

evita_engine/src/main/java/io/evitadb/core/traffic/task/TrafficRecorderTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.evitadb.stream.RandomAccessFileInputStream;
4040
import io.evitadb.utils.Assert;
4141
import io.evitadb.utils.IOUtils;
42+
import io.evitadb.utils.IOUtils.ExceptionThrowingRunnable;
4243
import io.evitadb.utils.StringUtils;
4344
import lombok.extern.slf4j.Slf4j;
4445

@@ -212,7 +213,7 @@ private FileForFetch start() {
212213
if (exportSessionSink != null) {
213214
IOUtils.close(
214215
() -> new UnexpectedIOException("Failed to close export session sink."),
215-
(IOUtils.IOExceptionThrowingRunnable) exportSessionSink::close
216+
(ExceptionThrowingRunnable) exportSessionSink::close
216217
);
217218
}
218219
}

evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -299,33 +299,42 @@ void tearDown() throws IOException {
299299
@ParameterizedTest
300300
@MethodSource("combineSettings")
301301
void shouldSerializeAndReconstructEmptyOffsetIndex(Crc32Check crc32Check, Compression compression) {
302-
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
303-
configure(StorageOptions.temporary(), crc32Check, compression), EntityBodyStoragePart::new, 0
304-
);
305-
IOUtils.closeQuietly(insertionOutput.fileOffsetIndex::close);
302+
final StorageOptions storageOptions = configure(StorageOptions.temporary(), crc32Check, compression);
303+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
304+
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
305+
storageOptions,
306+
observableOutputKeeper,
307+
EntityBodyStoragePart::new, 0
308+
);
309+
IOUtils.closeQuietly(insertionOutput.fileOffsetIndex::close);
310+
}
306311
}
307312

308313
@DisplayName("Offset index can be stored empty and then new records added.")
309314
@ParameterizedTest
310315
@MethodSource("combineSettings")
311316
void shouldSerializeEmptyOffsetIndexWithLaterAddingRecordsAndReconstructCorrectly(Crc32Check crc32Check, Compression compression) {
312317
final StorageOptions storageOptions = configure(StorageOptions.temporary(), crc32Check, compression);
313-
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(storageOptions, EntityBodyStoragePart::new, 0);
314-
final OffsetIndex offsetIndex = insertionOutput.fileOffsetIndex();
315-
final InsertionOutput insertionOutput2 = createRecordsInFileOffsetIndex(offsetIndex, 100, 0, 1);
316-
/* input count records +1 record for the OffsetIndex itself */
317-
if (crc32Check == Crc32Check.YES) {
318+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
319+
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
320+
storageOptions, observableOutputKeeper, EntityBodyStoragePart::new, 0
321+
);
322+
final OffsetIndex offsetIndex = insertionOutput.fileOffsetIndex();
323+
final InsertionOutput insertionOutput2 = createRecordsInFileOffsetIndex(offsetIndex, 100, 0, 1);
324+
/* input count records +1 record for the OffsetIndex itself */
325+
if (crc32Check == Crc32Check.YES) {
326+
assertEquals(
327+
/* 100 records, 1 empty header, 1 header with single fragment */
328+
100 + computeExpectedRecordCount(storageOptions, 100).fragments() + 1,
329+
offsetIndex.verifyContents().getRecordCount()
330+
);
331+
}
318332
assertEquals(
319-
/* 100 records, 1 empty header, 1 header with single fragment */
320-
100 + computeExpectedRecordCount(storageOptions, 100).fragments() + 1,
321-
offsetIndex.verifyContents().getRecordCount()
333+
100,
334+
offsetIndex.count(insertionOutput2.catalogVersion())
322335
);
336+
IOUtils.closeQuietly(offsetIndex::close);
323337
}
324-
assertEquals(
325-
100,
326-
offsetIndex.count(insertionOutput2.catalogVersion())
327-
);
328-
IOUtils.closeQuietly(offsetIndex::close);
329338
}
330339

331340
@DisplayName("Hundreds entities should be stored in OffsetIndex and retrieved intact.")
@@ -922,19 +931,21 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex(
922931
@Nonnull StorageOptions storageOptions,
923932
@Nonnull IntFunction<EntityBodyStoragePart> bodyPartFactory
924933
) {
925-
return shouldSerializeAndReconstructOffsetIndex(storageOptions, bodyPartFactory, 600);
934+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
935+
return shouldSerializeAndReconstructOffsetIndex(storageOptions, observableOutputKeeper, bodyPartFactory, 600);
936+
}
926937
}
927938

928939
@Nonnull
929940
private InsertionOutput shouldSerializeAndReconstructOffsetIndex(
930941
@Nonnull StorageOptions storageOptions,
942+
@Nonnull ObservableOutputKeeper observableOutputKeeper,
931943
@Nonnull IntFunction<EntityBodyStoragePart> bodyPartFactory,
932944
int recordCount
933945
) {
934946
OffsetIndex loadedFileOffsetIndex = null;
935947
try (
936-
final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class));
937-
final WriteOnlyFileHandle writeHandle = new WriteOnlyFileHandle(this.targetFile, storageOptions, observableOutputKeeper);
948+
final WriteOnlyFileHandle writeHandle = new WriteOnlyFileHandle(this.targetFile, storageOptions, observableOutputKeeper)
938949
) {
939950
final OffsetIndex fileOffsetIndex = new OffsetIndex(
940951
0L,

0 commit comments

Comments
 (0)