Skip to content

Commit 7487fb6

Browse files
authored
Merge pull request #918 from FgForrest/dev
fix: error during closing session after goLive operation via gRPC
2 parents 6c0c7b4 + 1206519 commit 7487fb6

File tree

15 files changed

+1394
-263
lines changed

15 files changed

+1394
-263
lines changed

evita_api/src/main/java/io/evitadb/api/GoLiveProgressRecord.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ public void updatePercentCompleted(int percentage) {
101101
if (percentage < 0 || percentage > 100) {
102102
throw new IllegalArgumentException("Percentage must be between 0 and 100, but was: " + percentage);
103103
}
104-
this.percentCompleted.set(percentage);
105-
notifyClientObserver(percentage);
104+
final int previousValue = this.percentCompleted.getAndSet(percentage);
105+
if (previousValue != percentage) {
106+
notifyClientObserver(percentage);
107+
}
106108
}
107109

108110
/**

evita_engine/src/main/java/io/evitadb/core/Evita.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.evitadb.api.requestResponse.schema.mutation.catalog.RemoveCatalogSchemaMutation;
4949
import io.evitadb.api.task.ServerTask;
5050
import io.evitadb.core.SessionRegistry.SuspendOperation;
51+
import io.evitadb.core.SessionRegistry.SuspensionInformation;
5152
import io.evitadb.core.async.ClientRunnableTask;
5253
import io.evitadb.core.async.EmptySettings;
5354
import io.evitadb.core.async.ObservableExecutorServiceWithHardDeadline;
@@ -326,6 +327,28 @@ public void emitStartObservabilityEvents() {
326327
);
327328
}
328329

330+
/**
331+
* Checks if sessions were forcefully closed for the specified catalog and session ID.
332+
*
333+
* @param catalogName the name of the catalog for which to check if sessions were forcefully closed; must not be null
334+
* @param sessionId the unique identifier of the session to check; must not be null
335+
* @return true if sessions were forcefully closed for the specified catalog and session ID, false otherwise
336+
*/
337+
public boolean wasSessionForcefullyClosedForCatalog(@Nonnull String catalogName, @Nonnull UUID sessionId) {
338+
return ofNullable(this.catalogSessionRegistries.get(catalogName))
339+
.map(it -> it.wereSessionsForcefullyClosedForCatalog(sessionId))
340+
.orElse(false);
341+
}
342+
343+
/**
344+
* Clears all session registries and their temporary information.
345+
*/
346+
public void clearSessionRegistries() {
347+
for (SessionRegistry value : this.catalogSessionRegistries.values()) {
348+
value.clearTemporaryInformation();
349+
}
350+
}
351+
329352
/**
330353
* Emits statistics of the ThreadPool associated with the scheduler.
331354
*/
@@ -741,6 +764,29 @@ ServerTask<EmptySettings, Void> createLoadCatalogTask(@Nonnull String catalogNam
741764
);
742765
}
743766

767+
/**
768+
* Closes all active sessions associated with the specified catalog and suspends further operations.
769+
*
770+
* @param catalogName the name of the catalog whose sessions are to be closed and suspended
771+
* @param suspendOperation the operation to be executed during the suspension of the catalog
772+
*/
773+
@Nonnull
774+
public Optional<SuspensionInformation> closeAllSessionsAndSuspend(@Nonnull String catalogName, @Nonnull SuspendOperation suspendOperation) {
775+
return ofNullable(this.catalogSessionRegistries.get(catalogName))
776+
.flatMap(it -> it.closeAllActiveSessionsAndSuspend(suspendOperation));
777+
}
778+
779+
/**
780+
* Discards the suspension state of the session registry associated with the given catalog name, if present.
781+
* The method resumes operations for the session registry if it exists for the provided catalog name.
782+
*
783+
* @param catalogName The name of the catalog whose suspension state should be discarded. Must not be null.
784+
*/
785+
public void discardSuspension(@Nonnull String catalogName) {
786+
ofNullable(this.catalogSessionRegistries.get(catalogName))
787+
.ifPresent(SessionRegistry::resumeOperations);
788+
}
789+
744790
/*
745791
PRIVATE METHODS
746792
*/
@@ -962,6 +1008,9 @@ private void replaceCatalogReference(@Nonnull Catalog catalog) {
9621008
}
9631009
);
9641010

1011+
// discard suspension of the session registry for the catalog, if present
1012+
discardSuspension(catalogName);
1013+
9651014
// notify structural changes callbacks
9661015
ofNullable(originalCatalog.get())
9671016
.ifPresent(it -> notifyStructuralChangeObservers(catalog, it));

evita_engine/src/main/java/io/evitadb/core/EvitaSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import io.evitadb.api.task.ServerTask;
7474
import io.evitadb.api.task.Task;
7575
import io.evitadb.api.task.TaskStatus;
76+
import io.evitadb.core.SessionRegistry.SuspendOperation;
7677
import io.evitadb.core.async.Interruptible;
7778
import io.evitadb.core.async.Scheduler;
7879
import io.evitadb.core.cdc.predicate.MutationPredicateFactory;
@@ -410,6 +411,8 @@ public GoLiveProgress goLiveAndCloseWithProgress(@Nullable IntConsumer progressO
410411
isTrue(!theCatalog.supportsTransaction(), "Catalog went live already and is currently in transactional mode!");
411412
executeTerminationSteps(null, theCatalog);
412413
this.closedFuture = CompletableFuture.completedFuture(new CommitVersions(this.catalog.getVersion() + 1, this.catalog.getSchema().version()));
414+
this.evita.closeAllSessionsAndSuspend(this.catalog.getName(), SuspendOperation.REJECT)
415+
.ifPresent(it -> it.addForcefullyClosedSession(this.id));
413416
return theCatalog.goLive(progressObserver);
414417
}
415418

0 commit comments

Comments
 (0)