Skip to content

Commit d4a2ad7

Browse files
committed
Flush pending effects before persisting new function status
1 parent 8d5fa0e commit d4a2ad7

File tree

8 files changed

+68
-7
lines changed

8 files changed

+68
-7
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,8 @@ public override Task DelayedFlushIsReflectedInUnderlyingStoreForSet()
7373
[TestMethod]
7474
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7575
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(FunctionStoreFactory.Create());
76+
77+
[TestMethod]
78+
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
79+
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
7680
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,4 +665,46 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
665665
storedEffects.Single(se => se.EffectId.Id == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo<string>().ShouldBe("hello universe");
666666
}
667667

668+
public abstract Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow();
669+
public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(Task<IFunctionStore> storeTask)
670+
{
671+
var store = await storeTask;
672+
var (type, instance) = TestFlowId.Create();
673+
var storedId = TestStoredId.Create();
674+
var someEffectIdValue = Guid.NewGuid();
675+
676+
var unhandledExceptionCatcher = new UnhandledExceptionCatcher();
677+
var registry = new FunctionsRegistry(store, settings: new Settings(unhandledExceptionHandler: unhandledExceptionCatcher.Catch));
678+
679+
var writtenEffectFlag = new SyncedFlag();
680+
var continueFlag = new SyncedFlag();
681+
682+
var flow = registry.RegisterParamless(
683+
type,
684+
async Task (workflow) =>
685+
{
686+
await workflow.Effect.Capture("SomeEffectId", () => someEffectIdValue, ResiliencyLevel.AtLeastOnceDelayFlush);
687+
writtenEffectFlag.Raise();
688+
await continueFlag.WaitForRaised();
689+
}
690+
);
691+
692+
await flow.Schedule(instance);
693+
694+
await writtenEffectFlag.WaitForRaised();
695+
var cp = await flow.ControlPanel(instance).ShouldNotBeNullAsync();
696+
var effectIds = await cp.Effects.AllIds;
697+
effectIds.Any().ShouldBeFalse();
698+
699+
continueFlag.Raise();
700+
701+
await cp.WaitForCompletion();
702+
703+
await cp.Refresh();
704+
effectIds = (await cp.Effects.AllIds).ToList();
705+
effectIds.Count().ShouldBe(1);
706+
effectIds.Single().Id.ShouldBe("SomeEffectId");
707+
(await cp.Effects.GetValue<Guid>("SomeEffectId")).ShouldBe(someEffectIdValue);
708+
}
709+
668710
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public async Task<TReturn> Invoke(FlowInstance instance, TParam param, InitialSt
5555
}
5656
finally{ disposables.Dispose(); }
5757

58-
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent: null);
58+
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent: null, workflow);
5959
return result.SucceedWithValue!;
6060
}
6161

@@ -90,7 +90,7 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
9090
catch (Exception exception) { var fwe = FatalWorkflowException.CreateNonGeneric(flowId, exception); await PersistFailure(storedId, flowId, fwe, param, parent?.StoredId); throw fwe; }
9191
finally{ disposables.Dispose(); }
9292

93-
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent?.StoredId, allowPostponedOrSuspended: true);
93+
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent?.StoredId, workflow, allowPostponedOrSuspended: true);
9494
}
9595
catch (Exception exception) { _unhandledExceptionHandler.Invoke(_flowType, exception); }
9696
});
@@ -141,7 +141,7 @@ public async Task<TReturn> Restart(StoredInstance instanceId, int expectedEpoch)
141141
catch (Exception exception) { var fwe = FatalWorkflowException.CreateNonGeneric(flowId, exception); await PersistFailure(storedId, flowId, fwe, param, parent, epoch); throw fwe; }
142142
finally{ disposables.Dispose(); }
143143

144-
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, epoch);
144+
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, workflow, epoch);
145145
return result.SucceedWithValue!;
146146
}
147147

@@ -167,7 +167,7 @@ public async Task ScheduleRestart(StoredInstance instance, int expectedEpoch)
167167
catch (Exception exception) { var fwe = FatalWorkflowException.CreateNonGeneric(flowId, exception); await PersistFailure(storedId, flowId, fwe, param, parent, epoch); throw fwe; }
168168
finally{ disposables.Dispose(); }
169169

170-
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, epoch, allowPostponedOrSuspended: true);
170+
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, workflow, epoch, allowPostponedOrSuspended: true);
171171
}
172172
catch (Exception exception) { _unhandledExceptionHandler.Invoke(_flowType, exception); }
173173
});
@@ -198,7 +198,7 @@ internal async Task ScheduleRestart(StoredInstance instance, RestartedFunction r
198198
catch (Exception exception) { var fwe = FatalWorkflowException.CreateNonGeneric(flowId, exception); await PersistFailure(storedId, flowId, fwe, param, parent, epoch); throw fwe; }
199199
finally { disposables.Dispose(); }
200200

201-
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, epoch, allowPostponedOrSuspended: true);
201+
await PersistResultAndEnsureSuccess(storedId, flowId, result, param, parent, workflow, epoch, allowPostponedOrSuspended: true);
202202
}
203203
catch (Exception exception) { _unhandledExceptionHandler.Invoke(_flowType, exception); }
204204
finally{ onCompletion(); }
@@ -342,8 +342,9 @@ private async Task PersistFailure(StoredId storedId, FlowId flowId, FatalWorkflo
342342
await _invocationHelper.PersistFailure(storedId, flowId, exception, param, parent, expectedEpoch);
343343
}
344344

345-
private async Task PersistResultAndEnsureSuccess(StoredId storedId, FlowId flowId, Result<TReturn> result, TParam param, StoredId? parent, int expectedEpoch = 0, bool allowPostponedOrSuspended = false)
345+
private async Task PersistResultAndEnsureSuccess(StoredId storedId, FlowId flowId, Result<TReturn> result, TParam param, StoredId? parent, Workflow workflow, int expectedEpoch = 0, bool allowPostponedOrSuspended = false)
346346
{
347+
await workflow.Effect.Flush();
347348
await _invocationHelper.PublishCompletionMessageToParent(parent, childId: flowId, result);
348349

349350
var outcome = await _invocationHelper.PersistResult(storedId, flowId, result, param, parent, expectedEpoch);

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,6 @@ public Task<T[]> WhenAll<T>(params Task<T>[] tasks)
9898

9999
internal EffectId CreateEffectId(string id, EffectType? type = null)
100100
=> id.ToEffectId(type, context: EffectContext.CurrentContext.Parent?.Serialize());
101+
102+
public Task Flush() => effectResults.Flush();
101103
}

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ private async Task FlushOrAddToPending(EffectId effectId, StoredEffectId storedE
361361
}
362362

363363
private readonly SemaphoreSlim _flushSync = new(initialCount: 1, maxCount: 1);
364-
private async Task Flush()
364+
public async Task Flush()
365365
{
366366
await _flushSync.WaitAsync();
367367

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,8 @@ public override Task DelayedFlushIsReflectedInUnderlyingStoreForSet()
7272
[TestMethod]
7373
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7474
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(FunctionStoreFactory.Create());
75+
76+
[TestMethod]
77+
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
78+
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
7579
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,8 @@ public override Task DelayedFlushIsReflectedInUnderlyingStoreForSet()
7373
[TestMethod]
7474
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7575
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(FunctionStoreFactory.Create());
76+
77+
[TestMethod]
78+
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
79+
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
7680
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,8 @@ public override Task DelayedFlushIsReflectedInUnderlyingStoreForSet()
7373
[TestMethod]
7474
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7575
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(FunctionStoreFactory.Create());
76+
77+
[TestMethod]
78+
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
79+
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
7680
}

0 commit comments

Comments
 (0)