Skip to content

Commit bb6a092

Browse files
committed
Implemented RetryPolicy for Effects
1 parent 084a426 commit bb6a092

File tree

22 files changed

+466
-42
lines changed

22 files changed

+466
-42
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,12 @@ public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInF
8181
[TestMethod]
8282
public override Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects()
8383
=> UpsertingExistingEffectDoesNotAffectOtherExistingEffects(FunctionStoreFactory.Create());
84+
85+
[TestMethod]
86+
public override Task CaptureEffectWithRetryPolicy()
87+
=> CaptureEffectWithRetryPolicy(FunctionStoreFactory.Create());
88+
89+
[TestMethod]
90+
public override Task CaptureEffectWithRetryPolicyWithResult()
91+
=> CaptureEffectWithRetryPolicyWithResult(FunctionStoreFactory.Create());
8492
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/CustomMessageSerializerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ await functionStore.CreateFunction(
3333
var messagesWriter = new MessageWriter(storedId, functionStore, eventSerializer, scheduleReInvocation: (_, _) => Task.CompletedTask);
3434
var lazyExistingEffects = new Lazy<Task<IReadOnlyList<StoredEffect>>>(() => Task.FromResult((IReadOnlyList<StoredEffect>) new List<StoredEffect>()));
3535
var effectResults = new EffectResults(flowId, storedId, lazyExistingEffects, functionStore.EffectsStore, DefaultSerializer.Instance);
36-
var effect = new Effect(effectResults);
36+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
3737
var registeredTimeouts = new RegisteredTimeouts(storedId, functionStore.TimeoutStore, effect, () => DateTime.UtcNow);
3838
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
3939
storedId,

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ private Effect CreateEffect(StoredId storedId, FlowId flowId, IFunctionStore fun
494494
{
495495
var lazyExistingEffects = new Lazy<Task<IReadOnlyList<StoredEffect>>>(() => Task.FromResult((IReadOnlyList<StoredEffect>) new List<StoredEffect>()));
496496
var effectResults = new EffectResults(flowId, storedId, lazyExistingEffects, functionStore.EffectsStore, DefaultSerializer.Instance);
497-
var effect = new Effect(effectResults);
497+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
498498
return effect;
499499
}
500500

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
55
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
66
using Cleipnir.ResilientFunctions.Domain;
7+
using Cleipnir.ResilientFunctions.Domain.Events;
78
using Cleipnir.ResilientFunctions.Domain.Exceptions;
89
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
910
using Cleipnir.ResilientFunctions.Helpers;
@@ -1710,22 +1711,33 @@ protected async Task ClearFailedEffectsRemovesFailedEffectBeforeRestart(Task<IFu
17101711
var (flowType, flowInstance) = functionId;
17111712
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch));
17121713

1714+
var retryPolicy = RetryPolicy.CreateConstantDelay(
1715+
interval: TimeSpan.FromMilliseconds(10),
1716+
maximumAttempts: 1,
1717+
suspendThreshold: TimeSpan.FromMinutes(5)
1718+
);
17131719
var shouldFail = true;
17141720
var registration = functionsRegistry.RegisterParamless(
17151721
flowType,
17161722
inner: async workflow =>
17171723
{
1718-
await workflow.Effect.Capture(() =>
1724+
await workflow.Effect.Capture("AlwaysFail", () =>
17191725
{
17201726
if (shouldFail)
17211727
throw new TimeoutException("Timeout!");
1722-
});
1728+
}, retryPolicy);
17231729
}
17241730
);
17251731

17261732
try
17271733
{
1728-
await registration.Invoke(flowInstance);
1734+
var timeoutEvent = new TimeoutEvent(EffectId.CreateWithRootContext("SomeTimeout", EffectType.Timeout), DateTime.UtcNow)
1735+
.ToMessageAndIdempotencyKey();
1736+
1737+
await registration.Invoke(
1738+
flowInstance,
1739+
InitialState.CreateWithMessagesOnly([timeoutEvent])
1740+
);
17291741
}
17301742
catch (FatalWorkflowException exception)
17311743
{
@@ -1734,11 +1746,25 @@ await workflow.Effect.Capture(() =>
17341746

17351747
var controlPanel = await registration.ControlPanel(flowInstance.Value);
17361748
controlPanel.ShouldNotBeNull();
1749+
1750+
try
1751+
{
1752+
await controlPanel.Restart();
1753+
}
1754+
catch (FatalWorkflowException exception)
1755+
{
1756+
exception.ErrorType.ShouldBe(typeof(TimeoutException));
1757+
}
17371758

1738-
await controlPanel.BusyWaitUntil(c => c.Status == Status.Failed);
1739-
1759+
await controlPanel.Effects.AllIds.SelectAsync(ids => ids.Any()).ShouldBeTrueAsync();
1760+
await controlPanel.Messages.AsObjects.SelectAsync(ids => ids.Any()).ShouldBeTrueAsync();
1761+
1762+
await controlPanel.ClearFailures();
1763+
await controlPanel.Effects.AllIds.SelectAsync(ids => ids.Any()).ShouldBeFalseAsync();
1764+
await controlPanel.Messages.AsObjects.SelectAsync(msgs => msgs.Any(msg => msg is not NoOp)).ShouldBeFalseAsync();
1765+
17401766
shouldFail = false;
1741-
await controlPanel.Restart(clearFailedEffects: true);
1767+
await controlPanel.Restart();
17421768

17431769
unhandledExceptionCatcher.ShouldNotHaveExceptions();
17441770
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
66
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
77
using Cleipnir.ResilientFunctions.Domain;
8+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
89
using Cleipnir.ResilientFunctions.Helpers;
910
using Cleipnir.ResilientFunctions.Reactive.Utilities;
1011
using Cleipnir.ResilientFunctions.Storage;
@@ -335,7 +336,7 @@ public async Task EffectsCrudTest(Task<IFunctionStore> storeTask)
335336
store.EffectsStore,
336337
DefaultSerializer.Instance
337338
);
338-
var effect = new Effect(effectResults);
339+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
339340

340341
var option = await effect.TryGet<int>("Id1");
341342
option.HasValue.ShouldBeFalse();
@@ -381,7 +382,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
381382
store.EffectsStore,
382383
DefaultSerializer.Instance
383384
);
384-
var effect = new Effect(effectResults);
385+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
385386

386387
syncedCounter.Current.ShouldBe(0);
387388

@@ -650,7 +651,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
650651
effectStore,
651652
DefaultSerializer.Instance
652653
);
653-
var effect = new Effect(effectResults);
654+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
654655

655656
var result = await effect.Capture("1", () => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
656657
result.ShouldBe("hello world");
@@ -723,7 +724,7 @@ public async Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects(Task<
723724
effectStore,
724725
DefaultSerializer.Instance
725726
);
726-
var effect = new Effect(effectResults);
727+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
727728

728729
await effect.Capture("1", () => "hello world");
729730
await effect.Capture("2", () => "hello universe");
@@ -736,4 +737,95 @@ public async Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects(Task<
736737
storedEffects.Single(se => se.EffectId.Id == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo<string>().ShouldBe("hello world again");
737738
storedEffects.Single(se => se.EffectId.Id == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo<string>().ShouldBe("hello universe");
738739
}
740+
741+
public abstract Task CaptureEffectWithRetryPolicy();
742+
public async Task CaptureEffectWithRetryPolicy(Task<IFunctionStore> storeTask)
743+
{
744+
var utcNow = DateTime.UtcNow;
745+
746+
var store = await storeTask;
747+
var flowId = TestFlowId.Create();
748+
using var registry = new FunctionsRegistry(store, new Settings(utcNow: () => utcNow, enableWatchdogs: false));
749+
var syncedCounter = new SyncedCounter();
750+
751+
var retryPolicy = RetryPolicy.Create(suspendThreshold: TimeSpan.Zero, initialInterval: TimeSpan.FromSeconds(1), backoffCoefficient: 1);
752+
var registration = registry.RegisterParamless(
753+
flowType: flowId.Type,
754+
async workflow =>
755+
{
756+
var effect = workflow.Effect;
757+
await effect.Capture(() =>
758+
{
759+
if (syncedCounter.Current <= 1)
760+
{
761+
syncedCounter.Increment();
762+
throw new TimeoutException();
763+
}
764+
765+
syncedCounter.Increment();
766+
return Task.CompletedTask;
767+
}, retryPolicy);
768+
}
769+
);
770+
771+
await Should.ThrowAsync<InvocationPostponedException>(() => registration.Invoke(flowId.Instance));
772+
utcNow += TimeSpan.FromSeconds(2);
773+
774+
var cp = await registration.ControlPanel(flowId.Instance).ShouldNotBeNullAsync();
775+
cp.Status.ShouldBe(Status.Postponed);
776+
777+
await Should.ThrowAsync<InvocationPostponedException>(() => cp.Restart());
778+
779+
utcNow += TimeSpan.FromSeconds(10);
780+
781+
await cp.Restart();
782+
783+
syncedCounter.Current.ShouldBe(3);
784+
}
785+
786+
public abstract Task CaptureEffectWithRetryPolicyWithResult();
787+
public async Task CaptureEffectWithRetryPolicyWithResult(Task<IFunctionStore> storeTask)
788+
{
789+
var utcNow = DateTime.UtcNow;
790+
791+
var store = await storeTask;
792+
var flowId = TestFlowId.Create();
793+
using var registry = new FunctionsRegistry(store, new Settings(utcNow: () => utcNow, enableWatchdogs: false));
794+
var syncedCounter = new SyncedCounter();
795+
796+
var retryPolicy = RetryPolicy.Create(suspendThreshold: TimeSpan.Zero, initialInterval: TimeSpan.FromSeconds(1), backoffCoefficient: 1);
797+
var registration = registry.RegisterFunc<string, string>(
798+
flowType: flowId.Type,
799+
async (param, workflow) =>
800+
{
801+
var effect = workflow.Effect;
802+
return await effect.Capture(() =>
803+
{
804+
if (syncedCounter.Current <= 1)
805+
{
806+
syncedCounter.Increment();
807+
throw new TimeoutException();
808+
}
809+
810+
syncedCounter.Increment();
811+
return Task.FromResult(param);
812+
}, retryPolicy);
813+
}
814+
);
815+
816+
await Should.ThrowAsync<InvocationPostponedException>(() => registration.Invoke(flowId.Instance, "Hello World!"));
817+
utcNow += TimeSpan.FromSeconds(2);
818+
819+
var cp = await registration.ControlPanel(flowId.Instance).ShouldNotBeNullAsync();
820+
cp.Status.ShouldBe(Status.Postponed);
821+
822+
await Should.ThrowAsync<InvocationPostponedException>(() => cp.Restart());
823+
824+
utcNow += TimeSpan.FromSeconds(10);
825+
826+
var result = await cp.Restart();
827+
result.ShouldBe("Hello World!");
828+
829+
syncedCounter.Current.ShouldBe(3);
830+
}
739831
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/TimeoutStoreTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ private Effect CreateEffect(FlowId flowId, StoredId storedId, IFunctionStore fun
223223
{
224224
var lazyExistingEffects = new Lazy<Task<IReadOnlyList<StoredEffect>>>(() => Task.FromResult((IReadOnlyList<StoredEffect>) new List<StoredEffect>()));
225225
var effectResults = new EffectResults(flowId, storedId, lazyExistingEffects, functionStore.EffectsStore, DefaultSerializer.Instance);
226-
var effect = new Effect(effectResults);
226+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow);
227227
return effect;
228228
}
229229

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ public Tuple<Effect, States> CreateEffectAndStates(StoredId storedId, FlowId flo
451451
Serializer
452452
);
453453

454-
var effect = new Effect(effectResults);
454+
var effect = new Effect(effectResults, UtcNow);
455455
return Tuple.Create(effect, states);
456456
}
457457

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public async Task Delay(DateTime until, bool suspend = true, string? effectId =
5353
{
5454
effectId ??= $"Delay#{Effect.TakeNextImplicitId()}";
5555
var systemEffectId = EffectId.CreateWithCurrentContext(effectId, EffectType.System);
56-
until = await Effect.CreateOrGet(systemEffectId, until);
56+
until = await Effect.CreateOrGet(systemEffectId, until, flush: false);
5757
var delay = (until - _utcNow()).RoundUpToZero();
5858

5959
if (delay == TimeSpan.Zero)

Core/Cleipnir.ResilientFunctions/CoreRuntime/RegisteredTimeouts.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ await timeoutStore.UpsertTimeout(
3737
overwrite: true
3838
);
3939

40-
await effect.Upsert(timeoutId, TimeoutStatus.Registered);
40+
await effect.Upsert(timeoutId, TimeoutStatus.Registered, flush: false);
4141
}
4242

4343
public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn)
@@ -60,7 +60,7 @@ public async Task CancelTimeout(EffectId timeoutId)
6060
return;
6161

6262
await timeoutStore.RemoveTimeout(storedId, timeoutId);
63-
await effect.Upsert(timeoutId, TimeoutStatus.Cancelled);
63+
await effect.Upsert(timeoutId, TimeoutStatus.Cancelled, flush: false);
6464
}
6565

6666
public async Task<IReadOnlyList<RegisteredTimeout>> PendingTimeouts()

Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -307,25 +307,59 @@ public async Task SaveChanges()
307307

308308
public Task Delete() => _invocationHelper.Delete(StoredId);
309309

310-
public async Task<TReturn> Restart(bool clearFailedEffects = false)
310+
/// <summary>
311+
/// Clear existing failed effects, retry information and timeouts.
312+
/// </summary>
313+
public async Task ClearFailures()
311314
{
312-
if (clearFailedEffects)
313-
await Effects.RemoveFailed();
315+
await Effects.RemoveFailed();
316+
await Messages.RemoveTimeouts();
317+
318+
await Refresh();
319+
}
320+
321+
/// <summary>
322+
/// Restart invocation inlined immediately
323+
/// </summary>
324+
/// <param name="clearFailures">Clear existing failed effects, retry information and timeouts</param>
325+
/// <param name="refresh">Refresh control panel after invocation</param>
326+
/// <returns>Result of invocation</returns>
327+
public async Task<TReturn> Restart(bool clearFailures = false, bool refresh = true)
328+
{
329+
if (clearFailures)
330+
await ClearFailures();
314331

315332
if (_innerParamChanged)
316333
await SaveChanges();
317334

318-
return await _invoker.Restart(StoredId.Instance, Epoch);
335+
try
336+
{
337+
return await _invoker.Restart(StoredId.Instance, Epoch);
338+
}
339+
finally
340+
{
341+
if (refresh)
342+
await Refresh();
343+
}
319344
}
320-
public async Task ScheduleRestart(bool clearFailedEffects = false)
345+
346+
/// <summary>
347+
/// Schedule invocation immediately
348+
/// </summary>
349+
/// <param name="clearFailures">Clear existing failed effects, retry information and timeouts</param>
350+
/// <param name="refresh">Refresh control panel after invocation</param>
351+
public async Task ScheduleRestart(bool clearFailures = false, bool refresh = true)
321352
{
322-
if (clearFailedEffects)
323-
await Effects.RemoveFailed();
353+
if (clearFailures)
354+
await ClearFailures();
324355

325356
if (_innerParamChanged)
326357
await SaveChanges();
327358

328359
await _invoker.ScheduleRestart(StoredId.Instance, Epoch);
360+
361+
if (refresh)
362+
await Refresh();
329363
}
330364

331365
public async Task Refresh()

0 commit comments

Comments
 (0)