Skip to content

Commit af85182

Browse files
committed
Added crashed-replica-related methods to IFunctionStore
1 parent 2ce74a4 commit af85182

File tree

13 files changed

+278
-5
lines changed

13 files changed

+278
-5
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ public Task<bool> SuspendFunction(
104104
ComplimentaryState complimentaryState
105105
) => _inner.SuspendFunction(storedId, timestamp, expectedEpoch, effects, messages, complimentaryState);
106106

107+
/// <summary>
/// Forwards the request for owner replicas to the wrapped store without modification.
/// </summary>
/// <returns>The task produced by the inner store's <c>GetOwnerReplicas</c>.</returns>
public Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas()
{
    return _inner.GetOwnerReplicas();
}
109+
110+
/// <summary>
/// Forwards the reschedule request for the given replica to the wrapped store without modification.
/// </summary>
/// <param name="replicaId">Replica whose functions should be rescheduled.</param>
/// <returns>The task produced by the inner store's <c>RescheduleCrashedFunctions</c>.</returns>
public Task RescheduleCrashedFunctions(ReplicaId replicaId)
{
    return _inner.RescheduleCrashedFunctions(replicaId);
}
112+
107113
public Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting)
108114
=> _inner.Interrupt(storedId, onlyIfExecuting);
109115

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,8 @@ public override Task RestartExecutionReturnsEffectsAndMessages()
227227
[TestMethod]
228228
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
229229
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());
230+
231+
// Runs the shared store test template against a store obtained from FunctionStoreFactory.
[TestMethod]
public override Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation()
{
    var storeTask = FunctionStoreFactory.Create();
    return FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation(storeTask);
}
230234
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1900,4 +1900,73 @@ await store.CreateFunction(
19001900
effects.Count.ShouldBe(0);
19011901
messages.Count.ShouldBe(0);
19021902
}
1903+
1904+
/// <summary>
/// Store-specific entry point; concrete test classes override this and pass their own store
/// to the protected overload below.
/// </summary>
public abstract Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation();

/// <summary>
/// Creates two functions owned by one replica and one function owned by another, reschedules
/// the first replica's functions and verifies that only those two were postponed, released
/// (owner cleared, expires reset to 0) and had their epoch bumped to 1.
/// </summary>
/// <param name="storeTask">Task yielding the store under test.</param>
protected async Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation(Task<IFunctionStore> storeTask)
{
    var store = await storeTask;
    var crashedReplica = ReplicaId.NewId();
    var survivingReplica = ReplicaId.NewId();

    var storedId1 = TestStoredId.Create();
    var storedId2 = TestStoredId.Create();
    var storedId3 = TestStoredId.Create();

    // Arrange: the first two functions belong to the replica that will be rescheduled.
    var functionsToCreate = new[]
    {
        (Id: storedId1, HumanInstanceId: "SomeInstanceId", Owner: crashedReplica),
        (Id: storedId2, HumanInstanceId: "SomeInstanceId1", Owner: crashedReplica),
        (Id: storedId3, HumanInstanceId: "SomeInstanceId", Owner: survivingReplica)
    };

    foreach (var (id, humanInstanceId, owner) in functionsToCreate)
    {
        await store.CreateFunction(
            id,
            humanInstanceId: humanInstanceId,
            param: null,
            leaseExpiration: DateTime.UtcNow.Ticks,
            postponeUntil: null,
            timestamp: DateTime.UtcNow.Ticks,
            parent: null,
            owner: owner
        ).ShouldBeTrueAsync();
    }

    // Both replicas own at least one function before rescheduling.
    var owners = await store.GetOwnerReplicas();
    owners.Count.ShouldBe(2);
    owners.Any(r => r == crashedReplica).ShouldBeTrue();
    owners.Any(r => r == survivingReplica).ShouldBeTrue();

    // Act
    await store.RescheduleCrashedFunctions(crashedReplica);

    // Assert: every function of the rescheduled replica was released and postponed.
    foreach (var rescheduledId in new[] { storedId1, storedId2 })
    {
        var rescheduled = await store.GetFunction(rescheduledId).ShouldNotBeNullAsync();
        rescheduled.Status.ShouldBe(Status.Postponed);
        rescheduled.OwnerId.ShouldBeNull();
        rescheduled.Epoch.ShouldBe(1);
        rescheduled.Expires.ShouldBe(0);
    }

    // The other replica's function is left untouched.
    var untouched = await store.GetFunction(storedId3).ShouldNotBeNullAsync();
    untouched.Status.ShouldBe(Status.Executing);
    untouched.OwnerId.ShouldBe(survivingReplica);
    untouched.Epoch.ShouldBe(0);

    // Only the surviving replica is still reported as an owner.
    owners = await store.GetOwnerReplicas();
    owners.Count.ShouldBe(1);
    owners.Any(r => r == survivingReplica).ShouldBeTrue();
}
19031972
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ public Task<bool> SuspendFunction(
165165
? Task.FromException<bool>(new TimeoutException())
166166
: _inner.SuspendFunction(storedId, timestamp, expectedEpoch, effects, messages, complimentaryState);
167167

168+
/// <summary>
/// Returns a task faulted with <see cref="TimeoutException"/> while the store is flagged as crashed;
/// otherwise forwards to the wrapped store.
/// </summary>
public Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas()
{
    if (_crashed)
        return Task.FromException<IReadOnlyList<ReplicaId>>(new TimeoutException());

    return _inner.GetOwnerReplicas();
}
172+
173+
/// <summary>
/// Returns a task faulted with <see cref="TimeoutException"/> while the store is flagged as crashed;
/// otherwise forwards the reschedule request to the wrapped store.
/// </summary>
/// <param name="replicaId">Replica whose functions should be rescheduled.</param>
public Task RescheduleCrashedFunctions(ReplicaId replicaId)
{
    if (_crashed)
        return Task.FromException(new TimeoutException());

    return _inner.RescheduleCrashedFunctions(replicaId);
}
177+
168178
public Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting)
169179
=> _crashed
170180
? Task.FromException<bool>(new TimeoutException())

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ Task<bool> SuspendFunction(
101101
ComplimentaryState complimentaryState
102102
);
103103

104+
/// <summary>
/// Returns the ids of the replicas that are recorded as owners of stored functions.
/// </summary>
/// <remarks>
/// The in-memory store returns every distinct non-null owner; the MariaDB store additionally
/// restricts the result to functions whose status is Executing.
/// </remarks>
Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas();
105+
/// <summary>
/// Updates every function owned by <paramref name="replicaId"/>: in the in-memory and MariaDB
/// stores the status becomes Postponed, expires is set to 0, the owner is cleared and the
/// epoch is incremented by one.
/// </summary>
/// <param name="replicaId">Id of the replica whose functions are rescheduled.</param>
Task RescheduleCrashedFunctions(ReplicaId replicaId);
106+
104107
Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting);
105108
Task Interrupt(IReadOnlyList<StoredId> storedIds);
106109
Task<bool?> Interrupted(StoredId storedId);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,32 @@ public Task<bool> SuspendFunction(
333333
}
334334
}
335335

336+
/// <summary>
/// Collects the distinct, non-null owners of all stored states while holding the store lock.
/// </summary>
/// <returns>A task wrapping the list of owner replica ids.</returns>
public Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas()
{
    lock (_sync)
    {
        // Materialize inside the lock so the snapshot is taken before the lock is released.
        var distinctOwners = _states.Values
            .Select(state => state.Owner)
            .Where(owner => owner != null)
            .Distinct()
            .ToList();

        return distinctOwners
            .CastTo<IReadOnlyList<ReplicaId>>()
            .ToTask();
    }
}
347+
348+
/// <summary>
/// Releases every state owned by <paramref name="replicaId"/>: the epoch is incremented,
/// expires is reset to 0, the status becomes Postponed and the owner is cleared.
/// </summary>
/// <param name="replicaId">Replica whose states are rescheduled.</param>
/// <returns>An already-completed task.</returns>
public Task RescheduleCrashedFunctions(ReplicaId replicaId)
{
    lock (_sync)
    {
        var ownedStates = _states.Values
            .Where(candidate => candidate.Owner == replicaId)
            .ToList();

        foreach (var owned in ownedStates)
        {
            owned.Epoch += 1;
            owned.Expires = 0;
            owned.Status = Status.Postponed;
            owned.Owner = null;
        }
    }

    return Task.CompletedTask;
}
361+
336362
public virtual Task<bool> SetParameters(
337363
StoredId storedId,
338364
byte[]? param,

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,20 @@ public Task<bool> SuspendFunction(
136136
IReadOnlyList<StoredEffect>? effects,
137137
IReadOnlyList<StoredMessage>? messages,
138138
ComplimentaryState complimentaryState
139-
) => _crashed ? Task.FromException<bool>(new TimeoutException())
139+
) => _crashed
140+
? Task.FromException<bool>(new TimeoutException())
140141
: _inner.SuspendFunction(storedId, timestamp, expectedEpoch, effects, messages, complimentaryState);
141142

143+
/// <summary>
/// Returns a task faulted with <see cref="TimeoutException"/> while the store is flagged as crashed;
/// otherwise forwards to the wrapped store.
/// </summary>
public Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas()
{
    if (_crashed)
        return Task.FromException<IReadOnlyList<ReplicaId>>(new TimeoutException());

    return _inner.GetOwnerReplicas();
}
147+
148+
/// <summary>
/// Returns a task faulted with <see cref="TimeoutException"/> while the store is flagged as crashed;
/// otherwise forwards the reschedule request to the wrapped store.
/// </summary>
/// <param name="replicaId">Replica whose functions should be rescheduled.</param>
public Task RescheduleCrashedFunctions(ReplicaId replicaId)
{
    if (_crashed)
        return Task.FromException(new TimeoutException());

    return _inner.RescheduleCrashedFunctions(replicaId);
}
152+
142153
public Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting)
143154
=> _crashed
144155
? Task.FromException<bool>(new TimeoutException())

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,7 @@ public override Task RestartExecutionReturnsEffectsAndMessages()
219219
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
220220
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());
221221

222+
// Runs the shared store test template against a store obtained from FunctionStoreFactory.
[TestMethod]
public override Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation()
{
    var storeTask = FunctionStoreFactory.Create();
    return FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation(storeTask);
}
222225
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ instance CHAR(32) NOT NULL,
9898
owner CHAR(32) NULL,
9999
PRIMARY KEY (type, instance),
100100
INDEX (expires, type, instance, status)
101-
);";
101+
);
102+
103+
CREATE INDEX FlowOwnersIdx ON {_tablePrefix}(owner, type, instance);";
102104

103105
await using var command = new MySqlCommand(_initializeSql, conn);
104106
await command.ExecuteNonQueryAsync();
@@ -420,6 +422,51 @@ public async Task<bool> SuspendFunction(
420422
return affectedRows == 1;
421423
}
422424

425+
// Statement text for GetOwnerReplicas; built on first use and reused afterwards.
private string? _getReplicasSql;

/// <summary>
/// Queries the distinct, non-null owner values of all rows whose status is Executing and
/// converts each one to a <see cref="ReplicaId"/>.
/// </summary>
/// <returns>One replica id per row returned by the query.</returns>
public async Task<IReadOnlyList<ReplicaId>> GetOwnerReplicas()
{
    await using var connection = await CreateOpenConnection(_connectionString);

    _getReplicasSql ??= @$"
SELECT DISTINCT(Owner)
FROM {_tablePrefix}
WHERE Status = {(int) Status.Executing} AND Owner IS NOT NULL";

    await using var command = new MySqlCommand(_getReplicasSql, connection);
    await using var reader = await command.ExecuteReaderAsync();

    var owners = new List<ReplicaId>();
    while (await reader.ReadAsync())
    {
        // The owner column is read as a string and converted via the Guid helper.
        var ownerGuid = reader.GetString(0).ToGuid();
        owners.Add(ownerGuid.ToReplicaId());
    }

    return owners;
}
443+
444+
// Statement text for RescheduleCrashedFunctions; built on first use and reused afterwards.
private string? _rescheduleFunctionsSql;

/// <summary>
/// Updates every row owned by <paramref name="replicaId"/>: status becomes Postponed,
/// expires is set to 0, owner is set to NULL and epoch is incremented by one.
/// </summary>
/// <param name="replicaId">Replica whose rows are rescheduled; bound as the "N"-formatted Guid.</param>
public async Task RescheduleCrashedFunctions(ReplicaId replicaId)
{
    await using var connection = await CreateOpenConnection(_connectionString);

    _rescheduleFunctionsSql ??= $@"
UPDATE {_tablePrefix}
SET
status = {(int) Status.Postponed},
expires = 0,
owner = NULL,
epoch = epoch + 1
WHERE
owner = ?";

    // Single positional parameter matching the '?' placeholder in the WHERE clause.
    var ownerValue = replicaId.AsGuid.ToString("N");
    await using var command = new MySqlCommand(_rescheduleFunctionsSql, connection)
    {
        Parameters = { new() { Value = ownerValue } }
    };

    await command.ExecuteNonQueryAsync();
}
469+
423470
private string? _interruptSql;
424471
private string? _interruptIfExecutingSql;
425472
public async Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting)

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,7 @@ public override Task RestartExecutionReturnsEffectsAndMessages()
221221
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
222222
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());
223223

224+
// Runs the shared store test template against a store obtained from FunctionStoreFactory.
[TestMethod]
public override Task FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation()
{
    var storeTask = FunctionStoreFactory.Create();
    return FunctionOwnedByReplicaIsPostponedAfterRescheduleFunctionsInvocation(storeTask);
}
224227
}

0 commit comments

Comments
 (0)