Skip to content

Commit 07e38e7

Browse files
authored
Merge pull request #990 from FgForrest/989-obsoletefilemaintainer-is-closed-during-catalog-replacement
fix: ObsoleteFileMaintainer is closed during catalog replacement
2 parents a713530 + 6440e4b commit 07e38e7

File tree

9 files changed

+217
-141
lines changed

9 files changed

+217
-141
lines changed

evita_common/src/main/java/io/evitadb/utils/FolderLock.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.evitadb.exception.FolderAlreadyUsedException;
2828
import io.evitadb.exception.GenericEvitaInternalError;
2929
import io.evitadb.exception.UnexpectedIOException;
30+
import io.evitadb.utils.IOUtils.ExceptionThrowingRunnable;
3031
import lombok.extern.slf4j.Slf4j;
3132

3233
import javax.annotation.Nonnull;
@@ -115,8 +116,8 @@ public void close() throws IOException {
115116
"Failed to release and close the lock file " + this.lockFilePath + ".",
116117
"Failed to release and close the lock file."
117118
),
118-
(IOUtils.IOExceptionThrowingRunnable) this.lockFileLock::release,
119-
(IOUtils.IOExceptionThrowingRunnable) this.lockFileChannel::close
119+
(ExceptionThrowingRunnable) this.lockFileLock::release,
120+
(ExceptionThrowingRunnable) this.lockFileChannel::close
120121
);
121122
FileUtils.deleteFileIfExists(this.lockFilePath);
122123
ACQUIRED_LOCKS.remove(this.lockFilePath.toAbsolutePath().toString());

evita_common/src/main/java/io/evitadb/utils/IOUtils.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,26 @@ public static long copy(
111111
}
112112

113113
/**
114-
* Executes lambda logic encapsulated in {@link IOExceptionThrowingRunnable} instances, suppressing
114+
* Executes lambda logic encapsulated in {@link ExceptionThrowingRunnable} instances, suppressing
115115
* and aggregating any {@link IOException} that occurs during execution. If any exceptions are thrown, they
116116
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
117117
*
118118
* @param <T> the type of exception that will be thrown if any {@link IOException} occurs
119119
* @param exceptionFactory a supplier that provides an exception of type {@code T}, used to wrap any
120120
* {@link IOException} thrown during the execution of the provided runnables
121-
* @param consumer varargs of {@link IOExceptionThrowingRunnable} instances which encapsulate
121+
* @param consumer varargs of {@link ExceptionThrowingRunnable} instances which encapsulate
122122
* the resources/actions to be closed or executed
123123
* @throws T the consolidated exception containing any {@link IOException}s that were
124124
* thrown by the provided runnables
125125
*/
126126
public static <T extends RuntimeException> void executeSafely(
127127
@Nonnull Supplier<T> exceptionFactory,
128-
@Nonnull IOExceptionThrowingRunnable consumer
128+
@Nonnull ExceptionThrowingRunnable consumer
129129
) throws T {
130130
T exception = null;
131131
try {
132132
consumer.run();
133-
} catch (IOException e) {
133+
} catch (Exception e) {
134134
exception = exceptionFactory.get();
135135
exception.addSuppressed(e);
136136
}
@@ -179,7 +179,7 @@ public static <S, T extends RuntimeException> void executeSafely(
179179
* {@link IOException} thrown during the execution of the provided runnables
180180
* @param consumer varargs of {@link IOExceptionThrowingConsumer} instances which encapsulate
181181
* the resources/actions to be closed or executed
182-
* @throws T the consolidated exception containing any {@link IOException}s that were
182+
* @throws U the consolidated exception containing any {@link Exception}s that were
183183
* thrown by the provided runnables
184184
*/
185185
public static <S, T, U extends RuntimeException> void executeSafely(
@@ -234,24 +234,24 @@ public static <S, T extends RuntimeException> S executeSafely(
234234
}
235235

236236
/**
237-
* Closes multiple resources encapsulated in {@link IOExceptionThrowingRunnable} instances, suppressing
237+
* Closes multiple resources encapsulated in {@link ExceptionThrowingRunnable} instances, suppressing
238238
* and aggregating any {@link IOException} that occurs during execution. If any exceptions are thrown, they
239239
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
240240
*
241241
* @param <T> the type of exception that will be thrown if any {@link IOException} occurs
242242
* @param exceptionFactory a supplier that provides an exception of type {@code T}, used to wrap any
243243
* {@link IOException} thrown during the execution of the provided runnables
244-
* @param runnable varargs of {@link IOExceptionThrowingRunnable} instances which encapsulate
244+
* @param runnable varargs of {@link ExceptionThrowingRunnable} instances which encapsulate
245245
* the resources/actions to be closed or executed
246246
* @throws T the consolidated exception containing any {@link IOException}s that were
247247
* thrown by the provided runnables
248248
*/
249249
public static <T extends RuntimeException> void close(
250250
@Nonnull Supplier<T> exceptionFactory,
251-
@Nonnull IOExceptionThrowingRunnable... runnable
251+
@Nonnull ExceptionThrowingRunnable... runnable
252252
) throws T {
253253
T exception = null;
254-
for (IOExceptionThrowingRunnable lambda : runnable) {
254+
for (ExceptionThrowingRunnable lambda : runnable) {
255255
try {
256256
lambda.run();
257257
} catch (Exception e) {
@@ -269,7 +269,7 @@ public static <T extends RuntimeException> void close(
269269
* and aggregating any {@link Throwable} that occurs during execution. If any exceptions are thrown, they
270270
* are encapsulated and re-thrown as a single exception provided by the {@code exceptionFactory}.
271271
*
272-
* Unlike {@link #close(Supplier, IOExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
272+
* Unlike {@link #close(Supplier, ExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
273273
* rather than just {@link Exception}, providing a more robust safety net for resource cleanup.
274274
*
275275
* @param <T> the type of exception that will be thrown if any exception occurs
@@ -299,7 +299,7 @@ public static <T extends RuntimeException> void closeSafely(
299299
}
300300

301301
/**
302-
* Executes the provided {@link IOExceptionThrowingRunnable} instances, ensuring that exceptions thrown
302+
* Executes the provided {@link ExceptionThrowingRunnable} instances, ensuring that exceptions thrown
303303
* during their execution are logged but not propagated. This method is typically used for safely closing
304304
* resources without allowing individual close failures to disrupt the overall process.
305305
*
@@ -310,9 +310,9 @@ public static <T extends RuntimeException> void closeSafely(
310310
* @throws T if a runtime exception specific to the implementation needs propagation
311311
*/
312312
public static <T extends RuntimeException> void closeQuietly(
313-
@Nonnull IOExceptionThrowingRunnable... runnable
313+
@Nonnull ExceptionThrowingRunnable... runnable
314314
) throws T {
315-
for (IOExceptionThrowingRunnable lambda : runnable) {
315+
for (ExceptionThrowingRunnable lambda : runnable) {
316316
try {
317317
lambda.run();
318318
} catch (Exception e) {
@@ -327,7 +327,7 @@ public static <T extends RuntimeException> void closeQuietly(
327327
* during their execution are logged but not propagated. This method is typically used for safely closing
328328
* resources without allowing individual close failures to disrupt the overall process.
329329
*
330-
* Unlike {@link #closeQuietly(IOExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
330+
* Unlike {@link #closeQuietly(ExceptionThrowingRunnable...)}, this method catches any {@link Throwable}
331331
* rather than just {@link Exception}, providing a more robust safety net for resource cleanup.
332332
*
333333
* @param runnable the runnable instances, which may throw any exception during execution
@@ -351,25 +351,25 @@ public static <T extends RuntimeException> void closeSafely(
351351

352352
/**
353353
* Represents a functional interface that can be used to encapsulate a block of code
354-
* that may throw an {@link IOException}. This interface is effectively a specialized
354+
* that may throw an {@link Exception}. This interface is effectively a specialized
355355
* form of {@link Runnable} for operations where checked I/O exceptions need to be handled.
356356
*
357357
* Implementations of this interface enable the execution of operations with the awareness
358-
* and explicit handling of {@link IOException}. This is especially useful in scenarios
358+
* and explicit handling of {@link Exception}. This is especially useful in scenarios
359359
* where multiple such operations need to be executed or wrapped with exception aggregation,
360360
* for example, in utility methods like resource handling or cleanup.
361361
*
362362
* Method {@code run} is similar to the {@link Runnable#run()} method but allows an
363-
* {@link IOException} to be thrown.
363+
* {@link Exception} to be thrown.
364364
*
365365
* Functional-style programming can make use of this interface to define inline behaviors
366-
* for operations expected to throw {@link IOException}. It can also be integrated with
366+
* for operations expected to throw {@link Exception}. It can also be integrated with
367367
* various utility methods that leverage this interface for exception handling and resource management.
368368
*/
369369
@FunctionalInterface
370-
public interface IOExceptionThrowingRunnable {
370+
public interface ExceptionThrowingRunnable {
371371

372-
void run() throws IOException;
372+
void run() throws Exception;
373373

374374
}
375375

evita_engine/src/main/java/io/evitadb/core/executor/DelayedAsyncTask.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import io.evitadb.core.metric.event.system.BackgroundTaskFinishedEvent;
2727
import io.evitadb.core.metric.event.system.BackgroundTaskStartedEvent;
28+
import io.evitadb.exception.GenericEvitaInternalError;
2829
import io.evitadb.utils.Assert;
2930
import lombok.Getter;
3031
import lombok.extern.slf4j.Slf4j;
@@ -147,6 +148,7 @@ public DelayedAsyncTask(
147148
* The task is scheduled using the scheduler's schedule method.
148149
*/
149150
public void scheduleImmediately() {
151+
assertNotClosed();
150152
final OffsetDateTime now = OffsetDateTime.now();
151153
if (this.nextPlannedExecution.compareAndExchange(OffsetDateTime.MIN, now) == OffsetDateTime.MIN) {
152154
scheduleTask(computeMinimalSchedulingGap(now.toInstant().toEpochMilli()));
@@ -162,6 +164,8 @@ public void scheduleImmediately() {
162164
* The task is scheduled using the scheduler's schedule method.
163165
*/
164166
public void schedule() {
167+
assertNotClosed();
168+
165169
if (this.delay == Long.MAX_VALUE) {
166170
// the task is manual task and will never be scheduled
167171
return;
@@ -191,6 +195,20 @@ public void close() throws IOException {
191195
}
192196
// release the lambda to allow garbage collection
193197
this.lambda.set(null);
198+
this.nextPlannedExecution.set(OffsetDateTime.MIN);
199+
}
200+
}
201+
202+
/**
203+
* Ensures that the task is not closed before proceeding with its execution or scheduling.
204+
* If the task is already marked as closed, this method throws a GenericEvitaInternalError
205+
* to indicate that the operation cannot be performed on a closed task.
206+
*
207+
* @throws GenericEvitaInternalError if the task is marked as closed
208+
*/
209+
private void assertNotClosed() {
210+
if (this.closed.get()) {
211+
throw new GenericEvitaInternalError("Cannot schedule task `" + this.taskName + "` that has been closed.");
194212
}
195213
}
196214

@@ -223,8 +241,9 @@ private long computeMinimalSchedulingGap(long nowMillis) {
223241
if (lastFinishedExecutionTime.equals(OffsetDateTime.MIN)) {
224242
return 0;
225243
} else {
226-
return Math.max(0, this.minimalSchedulingGap - (nowMillis - lastFinishedExecutionTime.toInstant()
227-
.toEpochMilli())
244+
return Math.max(
245+
0, this.minimalSchedulingGap - (nowMillis - lastFinishedExecutionTime.toInstant()
246+
.toEpochMilli())
228247
);
229248
}
230249
}

evita_engine/src/main/java/io/evitadb/core/file/ExportFileService.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -494,24 +494,30 @@ void purgeFiles(@Nonnull OffsetDateTime thresholdDate) {
494494
log.error("Failed to list files in the directory: {}", this.storageOptions.exportDirectory(), e);
495495
}
496496

497-
// then check the size of the directory and delete oldest files until the directory size is below the limit
498-
final long directorySize = FileUtils.getDirectorySize(this.storageOptions.exportDirectory());
499-
// delete the oldest files until the directory size is below the limit
500-
if (directorySize > this.storageOptions.exportDirectorySizeLimitBytes()) {
501-
final List<FileForFetch> filesByCreationDate = this.files.stream()
502-
.sorted(Comparator.comparing(FileForFetch::created))
503-
.toList();
504-
long savedSize = 0L;
505-
for (FileForFetch it : filesByCreationDate) {
506-
log.info("Purging the oldest file, because the export directory grew too big: {}", it);
507-
final long metadataFileSize = it.metadataPath(this.storageOptions.exportDirectory()).toFile().length();
508-
deleteFile(it.fileId());
509-
savedSize += it.totalSizeInBytes() + metadataFileSize;
510-
// finish removing files if the directory size is below the limit
511-
if (directorySize - savedSize <= this.storageOptions.exportDirectorySizeLimitBytes()) {
512-
break;
497+
try {
498+
// then check the size of the directory and delete oldest files until the directory size is below the limit
499+
final long directorySize = FileUtils.getDirectorySize(this.storageOptions.exportDirectory());
500+
// delete the oldest files until the directory size is below the limit
501+
if (directorySize > this.storageOptions.exportDirectorySizeLimitBytes()) {
502+
final List<FileForFetch> filesByCreationDate = this.files.stream()
503+
.sorted(Comparator.comparing(FileForFetch::created))
504+
.toList();
505+
long savedSize = 0L;
506+
for (FileForFetch it : filesByCreationDate) {
507+
log.info("Purging the oldest file, because the export directory grew too big: {}", it);
508+
final long metadataFileSize = it.metadataPath(this.storageOptions.exportDirectory())
509+
.toFile()
510+
.length();
511+
deleteFile(it.fileId());
512+
savedSize += it.totalSizeInBytes() + metadataFileSize;
513+
// finish removing files if the directory size is below the limit
514+
if (directorySize - savedSize <= this.storageOptions.exportDirectorySizeLimitBytes()) {
515+
break;
516+
}
513517
}
514518
}
519+
} catch (UnexpectedIOException e) {
520+
log.error("Failed to calculate size of the directory: {}", this.storageOptions.exportDirectory(), e);
515521
}
516522
}
517523

evita_engine/src/main/java/io/evitadb/core/traffic/task/TrafficRecorderTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.evitadb.stream.RandomAccessFileInputStream;
4040
import io.evitadb.utils.Assert;
4141
import io.evitadb.utils.IOUtils;
42+
import io.evitadb.utils.IOUtils.ExceptionThrowingRunnable;
4243
import io.evitadb.utils.StringUtils;
4344
import lombok.extern.slf4j.Slf4j;
4445

@@ -212,7 +213,7 @@ private FileForFetch start() {
212213
if (exportSessionSink != null) {
213214
IOUtils.close(
214215
() -> new UnexpectedIOException("Failed to close export session sink."),
215-
(IOUtils.IOExceptionThrowingRunnable) exportSessionSink::close
216+
(ExceptionThrowingRunnable) exportSessionSink::close
216217
);
217218
}
218219
}

evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -299,33 +299,42 @@ void tearDown() throws IOException {
299299
@ParameterizedTest
300300
@MethodSource("combineSettings")
301301
void shouldSerializeAndReconstructEmptyOffsetIndex(Crc32Check crc32Check, Compression compression) {
302-
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
303-
configure(StorageOptions.temporary(), crc32Check, compression), EntityBodyStoragePart::new, 0
304-
);
305-
IOUtils.closeQuietly(insertionOutput.fileOffsetIndex::close);
302+
final StorageOptions storageOptions = configure(StorageOptions.temporary(), crc32Check, compression);
303+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
304+
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
305+
storageOptions,
306+
observableOutputKeeper,
307+
EntityBodyStoragePart::new, 0
308+
);
309+
IOUtils.closeQuietly(insertionOutput.fileOffsetIndex::close);
310+
}
306311
}
307312

308313
@DisplayName("Offset index can be stored empty and then new records added.")
309314
@ParameterizedTest
310315
@MethodSource("combineSettings")
311316
void shouldSerializeEmptyOffsetIndexWithLaterAddingRecordsAndReconstructCorrectly(Crc32Check crc32Check, Compression compression) {
312317
final StorageOptions storageOptions = configure(StorageOptions.temporary(), crc32Check, compression);
313-
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(storageOptions, EntityBodyStoragePart::new, 0);
314-
final OffsetIndex offsetIndex = insertionOutput.fileOffsetIndex();
315-
final InsertionOutput insertionOutput2 = createRecordsInFileOffsetIndex(offsetIndex, 100, 0, 1);
316-
/* input count records +1 record for the OffsetIndex itself */
317-
if (crc32Check == Crc32Check.YES) {
318+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
319+
final InsertionOutput insertionOutput = shouldSerializeAndReconstructOffsetIndex(
320+
storageOptions, observableOutputKeeper, EntityBodyStoragePart::new, 0
321+
);
322+
final OffsetIndex offsetIndex = insertionOutput.fileOffsetIndex();
323+
final InsertionOutput insertionOutput2 = createRecordsInFileOffsetIndex(offsetIndex, 100, 0, 1);
324+
/* input count records +1 record for the OffsetIndex itself */
325+
if (crc32Check == Crc32Check.YES) {
326+
assertEquals(
327+
/* 100 records, 1 empty header, 1 header with single fragment */
328+
100 + computeExpectedRecordCount(storageOptions, 100).fragments() + 1,
329+
offsetIndex.verifyContents().getRecordCount()
330+
);
331+
}
318332
assertEquals(
319-
/* 100 records, 1 empty header, 1 header with single fragment */
320-
100 + computeExpectedRecordCount(storageOptions, 100).fragments() + 1,
321-
offsetIndex.verifyContents().getRecordCount()
333+
100,
334+
offsetIndex.count(insertionOutput2.catalogVersion())
322335
);
336+
IOUtils.closeQuietly(offsetIndex::close);
323337
}
324-
assertEquals(
325-
100,
326-
offsetIndex.count(insertionOutput2.catalogVersion())
327-
);
328-
IOUtils.closeQuietly(offsetIndex::close);
329338
}
330339

331340
@DisplayName("Hundreds entities should be stored in OffsetIndex and retrieved intact.")
@@ -922,19 +931,21 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex(
922931
@Nonnull StorageOptions storageOptions,
923932
@Nonnull IntFunction<EntityBodyStoragePart> bodyPartFactory
924933
) {
925-
return shouldSerializeAndReconstructOffsetIndex(storageOptions, bodyPartFactory, 600);
934+
try (final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class))) {
935+
return shouldSerializeAndReconstructOffsetIndex(storageOptions, observableOutputKeeper, bodyPartFactory, 600);
936+
}
926937
}
927938

928939
@Nonnull
929940
private InsertionOutput shouldSerializeAndReconstructOffsetIndex(
930941
@Nonnull StorageOptions storageOptions,
942+
@Nonnull ObservableOutputKeeper observableOutputKeeper,
931943
@Nonnull IntFunction<EntityBodyStoragePart> bodyPartFactory,
932944
int recordCount
933945
) {
934946
OffsetIndex loadedFileOffsetIndex = null;
935947
try (
936-
final ObservableOutputKeeper observableOutputKeeper = new ObservableOutputKeeper(TEST_CATALOG, storageOptions, Mockito.mock(Scheduler.class));
937-
final WriteOnlyFileHandle writeHandle = new WriteOnlyFileHandle(this.targetFile, storageOptions, observableOutputKeeper);
948+
final WriteOnlyFileHandle writeHandle = new WriteOnlyFileHandle(this.targetFile, storageOptions, observableOutputKeeper)
938949
) {
939950
final OffsetIndex fileOffsetIndex = new OffsetIndex(
940951
0L,

0 commit comments

Comments
 (0)