Skip to content

Commit 16c7a2e

Browse files
committed
Added owner parameter to IFunctionStore's RestartExecution-method
1 parent f223f06 commit 16c7a2e

File tree

16 files changed

+61
-36
lines changed

16 files changed

+61
-36
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public Task<bool> CreateFunction(
4444
public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, StoredId? parent)
4545
=> _inner.BulkScheduleFunctions(functionsWithParam, parent);
4646

47-
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
48-
=> _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
47+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration, ReplicaId replicaId)
48+
=> _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration, replicaId);
4949

5050
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)
5151
=> _leaseUpdaterCallback(leaseUpdates, leaseExpiration).ToTask();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/LeasesUpdaterTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Linq;
44
using System.Threading.Tasks;
55
using Cleipnir.ResilientFunctions.CoreRuntime;
6+
using Cleipnir.ResilientFunctions.Domain;
67
using Cleipnir.ResilientFunctions.Storage;
78
using Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
89
using Cleipnir.ResilientFunctions.Tests.Utils;
@@ -77,7 +78,7 @@ public async Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlo
7778

7879
var id3 = TestStoredId.Create();
7980
await store.CreateFunction(id3, id3.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null, owner: null).ShouldBeTrueAsync();
80-
await store.RestartExecution(id3, expectedEpoch: 0, leaseExpiration: 0).ShouldNotBeNullAsync();
81+
await store.RestartExecution(id3, expectedEpoch: 0, leaseExpiration: 0, owner: ReplicaId.NewId()).ShouldNotBeNullAsync();
8182

8283
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
8384
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: id2Expires);
@@ -291,7 +292,7 @@ public async Task LeaseUpdatersRemovesFunctionWithLowerEpoch(Task<IFunctionStore
291292

292293
await store.CreateFunction(id1, "SomeInstanceId", param: null, leaseExpiration: tenSeconds.Ticks, postponeUntil: null, timestamp: 0, parent: null, owner: null);
293294
await store.CreateFunction(id2, "SomeInstanceId", param: null, leaseExpiration: seventySeconds.Ticks, postponeUntil: null, timestamp: 0, parent: null, owner: null);
294-
await store.RestartExecution(id2, expectedEpoch: 0, leaseExpiration: thousandSeconds.Ticks);
295+
await store.RestartExecution(id2, expectedEpoch: 0, leaseExpiration: thousandSeconds.Ticks, owner: ReplicaId.NewId());
295296

296297
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
297298
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ await store.CreateFunction(
302302
await store.RestartExecution(
303303
StoredId,
304304
expectedEpoch: 0,
305-
leaseExpiration: DateTime.UtcNow.Ticks
305+
leaseExpiration: DateTime.UtcNow.Ticks,
306+
owner: ReplicaId.NewId()
306307
).ShouldNotBeNullAsync();
307308

308309
var updatedStoredParameter = "hello world".ToJson();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ protected async Task BecomeLeaderSucceedsWhenEpochIsAsExpected(Task<IFunctionSto
163163

164164
var store = await storeTask;
165165
var paramJson = PARAM.ToJson();
166+
var owner = ReplicaId.NewId();
166167

167168
await store.CreateFunction(
168169
functionId,
@@ -180,13 +181,15 @@ await store
180181
.RestartExecution(
181182
functionId,
182183
expectedEpoch: 0,
183-
leaseExpiration
184+
leaseExpiration,
185+
owner
184186
).ShouldNotBeNullAsync();
185187

186188
var storedFunction = await store.GetFunction(functionId);
187189
storedFunction.ShouldNotBeNull();
188190
storedFunction.Epoch.ShouldBe(1);
189191
storedFunction.Expires.ShouldBe(leaseExpiration);
192+
storedFunction.OwnerId.ShouldBe(owner);
190193
}
191194

192195
public abstract Task BecomeLeaderFailsWhenEpochIsNotAsExpected();
@@ -213,13 +216,15 @@ await store
213216
.RestartExecution(
214217
functionId,
215218
expectedEpoch: 1,
216-
leaseExpiration: DateTime.UtcNow.Ticks
219+
leaseExpiration: DateTime.UtcNow.Ticks,
220+
owner: ReplicaId.NewId()
217221
).ShouldBeNullAsync();
218222

219223
var storedFunction = await store.GetFunction(functionId);
220224
storedFunction.ShouldNotBeNull();
221225
storedFunction.Epoch.ShouldBe(0);
222226
storedFunction.Expires.ShouldBe(leaseExpiration);
227+
storedFunction.OwnerId.ShouldBeNull();
223228
}
224229

225230
public abstract Task CreatingTheSameFunctionTwiceReturnsFalse();
@@ -482,7 +487,7 @@ await store.CreateFunction(
482487
owner: null
483488
).ShouldBeTrueAsync();
484489

485-
await store.RestartExecution(functionId, expectedEpoch: 0, DateTime.UtcNow.Ticks).ShouldNotBeNullAsync();
490+
await store.RestartExecution(functionId, expectedEpoch: 0, DateTime.UtcNow.Ticks, owner: ReplicaId.NewId()).ShouldNotBeNullAsync();
486491

487492
var sf = await store.GetFunction(functionId);
488493
sf.ShouldNotBeNull();
@@ -506,7 +511,7 @@ await store.CreateFunction(
506511
owner: null
507512
).ShouldBeTrueAsync();
508513

509-
await store.RestartExecution(functionId, expectedEpoch: 1, leaseExpiration: DateTime.UtcNow.Ticks).ShouldBeNullAsync();
514+
await store.RestartExecution(functionId, expectedEpoch: 1, leaseExpiration: DateTime.UtcNow.Ticks, owner: ReplicaId.NewId()).ShouldBeNullAsync();
510515

511516
var sf = await store.GetFunction(functionId);
512517
sf.ShouldNotBeNull();
@@ -763,12 +768,14 @@ await store.CreateFunction(
763768
await store.RestartExecution(
764769
functionId,
765770
expectedEpoch: 0,
766-
leaseExpiration: DateTime.UtcNow.Ticks
771+
leaseExpiration: DateTime.UtcNow.Ticks,
772+
owner: ReplicaId.NewId()
767773
).ShouldNotBeNullAsync();
768774
await store.RestartExecution(
769775
functionId,
770776
expectedEpoch: 0,
771-
leaseExpiration: DateTime.UtcNow.Ticks
777+
leaseExpiration: DateTime.UtcNow.Ticks,
778+
owner: ReplicaId.NewId()
772779
).ShouldBeNullAsync();
773780
}
774781

@@ -796,7 +803,8 @@ await store.CreateFunction(
796803
await store.RestartExecution(
797804
functionId,
798805
expectedEpoch: 0,
799-
leaseExpiration: DateTime.UtcNow.Ticks
806+
leaseExpiration: DateTime.UtcNow.Ticks,
807+
owner: ReplicaId.NewId()
800808
).ShouldNotBeNullAsync();
801809

802810
await store.Interrupted(functionId).ShouldBeAsync(false);
@@ -1462,7 +1470,7 @@ await store.CreateFunction(
14621470
parent: null,
14631471
owner: null
14641472
).ShouldBeTrueAsync();
1465-
await store.RestartExecution(id2, expectedEpoch: 0, leaseExpiration: 0);
1473+
await store.RestartExecution(id2, expectedEpoch: 0, leaseExpiration: 0, owner: ReplicaId.NewId());
14661474
await store.CreateFunction(
14671475
id3,
14681476
humanInstanceId: "SomeInstanceId3",
@@ -1858,7 +1866,8 @@ await store.EffectsStore.SetEffectResult(
18581866
.RestartExecution(
18591867
functionId,
18601868
expectedEpoch: 0,
1861-
leaseExpiration
1869+
leaseExpiration,
1870+
owner: ReplicaId.NewId()
18621871
).ShouldNotBeNullAsync();
18631872

18641873
sf.StoredId.ShouldBe(functionId);
@@ -1876,6 +1885,7 @@ protected async Task RestartExecutionWorksWithEmptyEffectsAndMessages(Task<IFunc
18761885

18771886
var store = await storeTask;
18781887
var paramJson = PARAM.ToJson();
1888+
var owner = ReplicaId.NewId();
18791889

18801890
await store.CreateFunction(
18811891
functionId,
@@ -1893,12 +1903,14 @@ await store.CreateFunction(
18931903
.RestartExecution(
18941904
functionId,
18951905
expectedEpoch: 0,
1896-
leaseExpiration
1906+
leaseExpiration,
1907+
owner
18971908
).ShouldNotBeNullAsync();
18981909

18991910
sf.StoredId.ShouldBe(functionId);
19001911
effects.Count.ShouldBe(0);
19011912
messages.Count.ShouldBe(0);
1913+
sf.OwnerId.ShouldBe(owner);
19021914
}
19031915

19041916
public abstract Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
7676
? Task.FromException(new TimeoutException())
7777
: _inner.BulkScheduleFunctions(functionsWithParam, parent);
7878

79-
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
79+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration, ReplicaId replicaId)
8080
=> _crashed
8181
? Task.FromException<StoredFlowWithEffectsAndMessages?>(new TimeoutException())
82-
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
82+
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration, replicaId);
8383

8484
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)
8585
=> _crashed

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ internal class InvocationHelper<TParam, TReturn>
2020
private readonly bool _isParamlessFunction;
2121
private readonly FlowType _flowType;
2222
private readonly StoredType _storedType;
23+
private readonly ReplicaId _replicaId;
2324
private readonly LeasesUpdater _leasesUpdater;
2425
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
2526
public UtcNow UtcNow { get; }
2627

2728
private ISerializer Serializer { get; }
2829

29-
public InvocationHelper(FlowType flowType, StoredType storedType, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, LeasesUpdater leasesUpdater, ISerializer serializer, UtcNow utcNow)
30+
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, LeasesUpdater leasesUpdater, ISerializer serializer, UtcNow utcNow)
3031
{
3132
_flowType = flowType;
3233
_isParamlessFunction = isParamlessFunction;
@@ -37,6 +38,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, bool isParamle
3738
_shutdownCoordinator = shutdownCoordinator;
3839
_leasesUpdater = leasesUpdater;
3940
_storedType = storedType;
41+
_replicaId = replicaId;
4042
_functionStore = functionStore;
4143
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
4244
}
@@ -225,7 +227,8 @@ public async Task PublishCompletionMessageToParent(StoredId? parent, FlowId chil
225227
var restarted = await _functionStore.RestartExecution(
226228
flowId,
227229
expectedEpoch,
228-
leaseExpiration: UtcNow().Ticks + _settings.LeaseLength.Ticks
230+
leaseExpiration: UtcNow().Ticks + _settings.LeaseLength.Ticks,
231+
_replicaId
229232
);
230233

231234
return restarted != null

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
216216
var invocationHelper = new InvocationHelper<TParam, TReturn>(
217217
flowType,
218218
storedType,
219+
ClusterInfo.ReplicaId,
219220
isParamlessFunction: false,
220221
settingsWithDefaults,
221222
_functionStore,
@@ -313,6 +314,7 @@ private ParamlessRegistration RegisterParamless(
313314
var invocationHelper = new InvocationHelper<Unit, Unit>(
314315
flowType,
315316
storedType,
317+
ClusterInfo.ReplicaId,
316318
isParamlessFunction: true,
317319
settingsWithDefaults,
318320
_functionStore,
@@ -410,6 +412,7 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
410412
var invocationHelper = new InvocationHelper<TParam, Unit>(
411413
flowType,
412414
storedType,
415+
ClusterInfo.ReplicaId,
413416
isParamlessFunction: false,
414417
settingsWithDefaults,
415418
_functionStore,

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Task BulkScheduleFunctions(
3737
StoredId? parent
3838
);
3939

40-
Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration);
40+
Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration, ReplicaId owner);
4141

4242
Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration);
4343

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
117117
return Task.CompletedTask;
118118
}
119119

120-
public virtual async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
120+
public virtual async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration, ReplicaId owner)
121121
{
122122
lock (_sync)
123123
{
@@ -132,6 +132,7 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
132132
state.Status = Status.Executing;
133133
state.Expires = leaseExpiration;
134134
state.Interrupted = false;
135+
state.Owner = owner;
135136
}
136137
var sf = await GetFunction(storedId);
137138
var effects = await EffectsStore.GetEffectResults(storedId);

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
5858
? Task.FromException(new TimeoutException())
5959
: _inner.BulkScheduleFunctions(functionsWithParam, parent);
6060

61-
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
61+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration, ReplicaId replicaId)
6262
=> _crashed
6363
? Task.FromException<StoredFlowWithEffectsAndMessages?>(new TimeoutException())
64-
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
64+
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration, replicaId);
6565

6666
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)
6767
=> _crashed

0 commit comments

Comments
 (0)