Skip to content

Commit 1ddc872

Browse files
committed
Added ability to clear failed effects before restarting a flow
1 parent 21a85f4 commit 1ddc872

File tree

7 files changed

+74
-2
lines changed

7 files changed

+74
-2
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,8 @@ public override Task CorrelationsCanBeChanged()
181181
[TestMethod]
182182
public override Task DeleteRemovesFunctionFromAllStores()
183183
=> DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create());
184+
185+
[TestMethod]
186+
public override Task ClearFailedEffectsRemovesFailedEffectBeforeRestart()
187+
=> ClearFailedEffectsRemovesFailedEffectBeforeRestart(FunctionStoreFactory.Create());
184188
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,4 +1699,47 @@ await store.EffectsStore
16991699

17001700
unhandledExceptionCatcher.ShouldNotHaveExceptions();
17011701
}
1702+
1703+
public abstract Task ClearFailedEffectsRemovesFailedEffectBeforeRestart();
1704+
protected async Task ClearFailedEffectsRemovesFailedEffectBeforeRestart(Task<IFunctionStore> storeTask)
1705+
{
1706+
var unhandledExceptionCatcher = new UnhandledExceptionCatcher();
1707+
1708+
var store = await storeTask;
1709+
var functionId = TestFlowId.Create();
1710+
var (flowType, flowInstance) = functionId;
1711+
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch));
1712+
1713+
var shouldFail = true;
1714+
var registration = functionsRegistry.RegisterParamless(
1715+
flowType,
1716+
inner: async workflow =>
1717+
{
1718+
await workflow.Effect.Capture(() =>
1719+
{
1720+
if (shouldFail)
1721+
throw new TimeoutException("Timeout!");
1722+
});
1723+
}
1724+
);
1725+
1726+
try
1727+
{
1728+
await registration.Invoke(flowInstance);
1729+
}
1730+
catch (FatalWorkflowException exception)
1731+
{
1732+
exception.ErrorType.ShouldBe(typeof(TimeoutException));
1733+
}
1734+
1735+
var controlPanel = await registration.ControlPanel(flowInstance.Value);
1736+
controlPanel.ShouldNotBeNull();
1737+
1738+
await controlPanel.BusyWaitUntil(c => c.Status == Status.Failed);
1739+
1740+
shouldFail = false;
1741+
await controlPanel.Restart(clearFailedEffects: true);
1742+
1743+
unhandledExceptionCatcher.ShouldNotHaveExceptions();
1744+
}
17021745
}

Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,15 +307,21 @@ public async Task SaveChanges()
307307

308308
public Task Delete() => _invocationHelper.Delete(StoredId);
309309

310-
public async Task<TReturn> Restart()
310+
public async Task<TReturn> Restart(bool clearFailedEffects = false)
311311
{
312+
if (clearFailedEffects)
313+
await Effects.RemoveFailed();
314+
312315
if (_innerParamChanged)
313316
await SaveChanges();
314317

315318
return await _invoker.Restart(StoredId.Instance, Epoch);
316319
}
317-
public async Task ScheduleRestart()
320+
public async Task ScheduleRestart(bool clearFailedEffects = false)
318321
{
322+
if (clearFailedEffects)
323+
await Effects.RemoveFailed();
324+
319325
if (_innerParamChanged)
320326
await SaveChanges();
321327

Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ public async Task<WorkStatus> GetStatus(EffectId effectId)
5353
return storedEffects[effectId].WorkStatus;
5454
}
5555

56+
public async Task RemoveFailed()
57+
{
58+
foreach (var effectId in await AllIds)
59+
if (await GetStatus(effectId) == WorkStatus.Failed)
60+
await Remove(effectId);
61+
}
62+
5663
public Task Remove(string effectId) => Remove(effectId.ToEffectId());
5764
public async Task Remove(EffectId effectId)
5865
{

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,8 @@ public override Task CorrelationsCanBeChanged()
180180
[TestMethod]
181181
public override Task DeleteRemovesFunctionFromAllStores()
182182
=> DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create());
183+
184+
[TestMethod]
185+
public override Task ClearFailedEffectsRemovesFailedEffectBeforeRestart()
186+
=> ClearFailedEffectsRemovesFailedEffectBeforeRestart(FunctionStoreFactory.Create());
183187
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,8 @@ public override Task CorrelationsCanBeChanged()
181181
[TestMethod]
182182
public override Task DeleteRemovesFunctionFromAllStores()
183183
=> DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create());
184+
185+
[TestMethod]
186+
public override Task ClearFailedEffectsRemovesFailedEffectBeforeRestart()
187+
=> ClearFailedEffectsRemovesFailedEffectBeforeRestart(FunctionStoreFactory.Create());
184188
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,8 @@ public override Task CorrelationsCanBeChanged()
182182
[TestMethod]
183183
public override Task DeleteRemovesFunctionFromAllStores()
184184
=> DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create());
185+
186+
[TestMethod]
187+
public override Task ClearFailedEffectsRemovesFailedEffectBeforeRestart()
188+
=> ClearFailedEffectsRemovesFailedEffectBeforeRestart(FunctionStoreFactory.Create());
185189
}

0 commit comments

Comments
 (0)