Skip to content

Commit 8767ef6

Browse files
committed
Improved ReplicaWatchdog
1 parent 2b2f683 commit 8767ef6

File tree

7 files changed

+83
-22
lines changed

7 files changed

+83
-22
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,4 +41,8 @@ public override Task NonExistingReplicaIdOffsetIsNull()
4141
// Test entry point: delegates to the overload of the same name that takes a store,
// passing the result of FunctionStoreFactory.Create().
[TestMethod]
4242
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4343
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
44+
45+
// Test entry point added in this commit: delegates to the overload of the same name
// that takes a store, passing the result of FunctionStoreFactory.Create().
[TestMethod]
46+
public override Task ReplicaWatchdogUpdatesHeartbeat()
47+
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
4448
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 44 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Linq;
33
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.CoreRuntime;
45
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
56
using Cleipnir.ResilientFunctions.Domain;
67
using Cleipnir.ResilientFunctions.Helpers;
@@ -21,7 +22,8 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
2122
using var watchdog1 = new ReplicaWatchdog(
2223
replicaId1,
2324
functionStore,
24-
checkFrequency: TimeSpan.FromHours(1)
25+
checkFrequency: TimeSpan.FromHours(1),
26+
default(UnhandledExceptionHandler)!
2527
);
2628
await watchdog1.Initialize();
2729
var allReplicas = await store.GetAll();
@@ -33,7 +35,8 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
3335
using var watchdog2 = new ReplicaWatchdog(
3436
replicaId2,
3537
functionStore,
36-
checkFrequency: TimeSpan.FromHours(1)
38+
checkFrequency: TimeSpan.FromHours(1),
39+
default(UnhandledExceptionHandler)!
3740
);
3841
await watchdog2.Initialize();
3942
allReplicas = await store.GetAll();
@@ -74,7 +77,8 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
7477
using var watchdog1 = new ReplicaWatchdog(
7578
replicaId1,
7679
functionStore,
77-
checkFrequency: TimeSpan.FromHours(1)
80+
checkFrequency: TimeSpan.FromHours(1),
81+
default(UnhandledExceptionHandler)!
7882
);
7983
await watchdog1.Start();
8084
var allReplicas = await store.GetAll();
@@ -84,7 +88,8 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
8488
using var watchdog2 = new ReplicaWatchdog(
8589
replicaId2,
8690
functionStore,
87-
checkFrequency: TimeSpan.FromHours(1)
91+
checkFrequency: TimeSpan.FromHours(1),
92+
default(UnhandledExceptionHandler)!
8893
);
8994
await watchdog2.Start();
9095
allReplicas = await store.GetAll();
@@ -103,7 +108,8 @@ public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> store
103108
using var watchdog1 = new ReplicaWatchdog(
104109
replicaId1,
105110
functionStore,
106-
checkFrequency: TimeSpan.FromHours(1)
111+
checkFrequency: TimeSpan.FromHours(1),
112+
default(UnhandledExceptionHandler)!
107113
);
108114
await watchdog1.Initialize();
109115
await watchdog1.PerformIteration();
@@ -127,7 +133,8 @@ public async Task RunningWatchdogUpdatesItsOwnHeartbeat(Task<IFunctionStore> sto
127133
using var watchdog1 = new ReplicaWatchdog(
128134
replicaId1,
129135
functionStore,
130-
checkFrequency: TimeSpan.FromMilliseconds(100)
136+
checkFrequency: TimeSpan.FromMilliseconds(100),
137+
default(UnhandledExceptionHandler)!
131138
);
132139

133140
await watchdog1.Start();
@@ -152,9 +159,9 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
152159
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
153160
var replicaId3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
154161

155-
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1));
156-
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1));
157-
var watchdog3 = new ReplicaWatchdog(replicaId3, store, checkFrequency: TimeSpan.FromHours(1));
162+
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
163+
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
164+
var watchdog3 = new ReplicaWatchdog(replicaId3, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
158165

159166
await watchdog1.Initialize();
160167
await watchdog2.Initialize();
@@ -178,9 +185,9 @@ public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFuncti
178185
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
179186
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
180187

181-
var watchdog1 = new ReplicaWatchdog(cluster1, functionStore, checkFrequency: TimeSpan.FromHours(1));
182-
var watchdog2 = new ReplicaWatchdog(cluster2, functionStore, checkFrequency: TimeSpan.FromHours(1));
183-
var watchdog3 = new ReplicaWatchdog(cluster3, functionStore, checkFrequency: TimeSpan.FromHours(1));
188+
var watchdog1 = new ReplicaWatchdog(cluster1, functionStore, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
189+
var watchdog2 = new ReplicaWatchdog(cluster2, functionStore, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
190+
var watchdog3 = new ReplicaWatchdog(cluster3, functionStore, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
184191

185192
await watchdog3.Initialize();
186193
cluster3.Offset.ShouldBe(0);
@@ -227,9 +234,9 @@ public async Task ActiveReplicasDoNotDeleteEachOther(Task<IFunctionStore> storeT
227234
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
228235
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
229236

230-
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1));
231-
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1));
232-
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1));
237+
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
238+
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
239+
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
233240

234241
await watchdog1.Initialize();
235242
await watchdog2.Initialize();
@@ -287,7 +294,8 @@ await functionStore.CreateFunction(
287294
using var watchdog1 = new ReplicaWatchdog(
288295
replicaId1,
289296
functionStore,
290-
checkFrequency: TimeSpan.FromHours(1)
297+
checkFrequency: TimeSpan.FromHours(1),
298+
default(UnhandledExceptionHandler)!
291299
);
292300
await watchdog1.Initialize();
293301
await watchdog1.PerformIteration();
@@ -300,4 +308,24 @@ await functionStore.CreateFunction(
300308
sf.Expires.ShouldBe(0);
301309
sf.OwnerId.ShouldBeNull();
302310
}
311+
312+
// Parameterless hook each store-specific test class overrides; the overrides shown in this
// commit forward to the store-taking overload declared next.
public abstract Task ReplicaWatchdogUpdatesHeartbeat();
313+
// Starts a single ReplicaWatchdog with a 10 ms check frequency against the supplied store,
// waits 100 ms, then asserts that exactly one replica row exists, that it carries this
// replica's id, and that its Heartbeat value is greater than 1.
// NOTE(review): the assertion depends on wall-clock timing (10 ms interval vs. 100 ms wait),
// so it may be flaky on a slow or heavily loaded store — confirm the margin is sufficient.
public async Task ReplicaWatchdogUpdatesHeartbeat(Task<IFunctionStore> storeTask)
314+
{
315+
var functionStore = await storeTask;
316+
var replicaStore = functionStore.ReplicaStore;
317+
318+
var replicaId = ReplicaId.NewId();
319+
var clusterInfo = new ClusterInfo(replicaId);
320+
// NOTE(review): default(UnhandledExceptionHandler)! is presumably null (assuming the handler is a
// reference type); the watchdog's Run loop calls unhandledExceptionHandler.Invoke(...) when an
// iteration throws, so a failing iteration would hit a null handler here — TODO confirm.
using var watchdog = new ReplicaWatchdog(clusterInfo, functionStore, checkFrequency: TimeSpan.FromMilliseconds(10), default(UnhandledExceptionHandler)!);
321+
await watchdog.Start();
322+
323+
// Give the background loop time to perform several iterations at the 10 ms check frequency.
await Task.Delay(100);
324+
325+
var storedReplicas = await replicaStore.GetAll();
326+
storedReplicas.Count.ShouldBe(1);
327+
storedReplicas.Single().ReplicaId.ShouldBe(replicaId);
328+
// Presumably Heartbeat increases as iterations run; verify against the watchdog/store implementation.
storedReplicas.Single().Heartbeat.ShouldBeGreaterThan(1);
329+
330+
}
303331
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/ReplicaWatchdog.cs

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,11 +4,12 @@
44
using System.Threading;
55
using System.Threading.Tasks;
66
using Cleipnir.ResilientFunctions.Domain;
7+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
78
using Cleipnir.ResilientFunctions.Storage;
89

910
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1011

11-
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IFunctionStore functionStore, TimeSpan checkFrequency) : IDisposable
12+
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IFunctionStore functionStore, TimeSpan checkFrequency, UnhandledExceptionHandler unhandledExceptionHandler) : IDisposable
1213
{
1314
private volatile bool _disposed;
1415
private bool _started;
@@ -54,7 +55,15 @@ private async Task Run()
5455
{
5556
// Body of Run(): keep iterating until _disposed is observed as true.
while (!_disposed)
5657
{
57-
// (line removed by this commit) the unguarded call let any exception propagate out of the loop.
await PerformIteration();
58+
// (added by this commit) a failing iteration is wrapped in a FrameworkException and passed to the
// unhandled-exception handler, after which the loop proceeds to the delay and the next iteration.
try
59+
{
60+
await PerformIteration();
61+
}
62+
catch (Exception ex)
63+
{
64+
// NOTE(review): if unhandledExceptionHandler is null or Invoke itself throws, the exception
// escapes this catch block and ends the loop — confirm callers always pass a usable handler.
unhandledExceptionHandler.Invoke(new FrameworkException("ReplicaWatchdog failed during iteration", ex));
65+
}
66+
5867
// Wait one check interval; _disposed is only re-checked after this delay completes.
await Task.Delay(checkFrequency);
5968
}
6069
}
@@ -121,7 +130,7 @@ private async Task DeleteStrikedOutReplicas()
121130
await functionStore.RescheduleCrashedFunctions(strikedOutId);
122131
_ = Task
123132
.Delay(TimeSpan.FromSeconds(5))
124-
.ContinueWith(_ => functionStore.RescheduleCrashedFunctions(strikedOutId));
133+
.ContinueWith(_ => { if (!_disposed) functionStore.RescheduleCrashedFunctions(strikedOutId); });
125134

126135
_strikes.Remove(storedReplica);
127136
}

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,8 @@ public class FunctionsRegistry : IDisposable
3232

3333
private volatile bool _disposed;
3434
private readonly Lock _sync = new();
35-
35+
private readonly ReplicaWatchdog _replicaWatchdog;
36+
3637
public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null)
3738
{
3839
_functionStore = functionStore;
@@ -62,6 +63,8 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null
6263
);
6364

6465
ClusterInfo = new ClusterInfo(ReplicaId.NewId());
66+
_replicaWatchdog = new ReplicaWatchdog(ClusterInfo, functionStore, checkFrequency: TimeSpan.FromSeconds(1), _settings.UnhandledExceptionHandler);
67+
6568
}
6669

6770
#region Func overloads
@@ -482,19 +485,24 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
482485
// Marks the registry as disposed and starts the shutdown coordinator's shutdown.
// With no maxWait the coordinator's task is returned directly. With a maxWait the returned task
// completes when shutdown finishes, or faults with a TimeoutException if maxWait elapses first
// (whichever TrySet* call happens first wins).
public Task ShutdownGracefully(TimeSpan? maxWait = null)
483486
{
484487
_disposed = true;
488+
485489
// ReSharper disable once InconsistentlySynchronizedField
486490
var shutdownTask = _shutdownCoordinator.PerformShutdown();
487491
// NOTE(review): this early return happens before the continuation that calls
// _replicaWatchdog.Dispose() is attached, so on the maxWait == null path this method does not
// dispose the watchdog — confirm it is disposed elsewhere.
if (maxWait == null)
488492
return shutdownTask;
489493

490494
var tcs = new TaskCompletionSource();
491-
// (line removed by this commit) previously the continuation only completed tcs.
shutdownTask.ContinueWith(_ => tcs.TrySetResult());
495+
// (added by this commit) once shutdownTask finishes, dispose the replica watchdog and then
// complete tcs. The continuation is attached without options and ignores the antecedent's outcome.
shutdownTask.ContinueWith(_ =>
496+
{
497+
_replicaWatchdog.Dispose();
498+
tcs.TrySetResult();
499+
});
492500

493501
// Timeout path: after maxWait, try to fault tcs; this is a no-op if tcs has already completed.
Task.Delay(maxWait.Value)
494502
.ContinueWith(_ =>
495503
tcs.TrySetException(new TimeoutException("Shutdown did not complete within threshold"))
496504
);
497-
505+
498506
return tcs.Task;
499507
}
500508
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,4 +41,8 @@ public override Task ActiveReplicasDoNotDeleteEachOther()
4141
// Test entry point: delegates to the overload of the same name that takes a store,
// passing the result of FunctionStoreFactory.Create().
[TestMethod]
4242
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4343
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
44+
45+
// Test entry point added in this commit: delegates to the overload of the same name
// that takes a store, passing the result of FunctionStoreFactory.Create().
[TestMethod]
46+
public override Task ReplicaWatchdogUpdatesHeartbeat()
47+
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
4448
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,4 +41,8 @@ public override Task ActiveReplicasDoNotDeleteEachOther()
4141
// Test entry point: delegates to the overload of the same name that takes a store,
// passing the result of FunctionStoreFactory.Create().
[TestMethod]
4242
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4343
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
44+
45+
// Test entry point added in this commit: delegates to the overload of the same name
// that takes a store, passing the result of FunctionStoreFactory.Create().
[TestMethod]
46+
public override Task ReplicaWatchdogUpdatesHeartbeat()
47+
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
4448
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,4 +41,8 @@ public override Task ActiveReplicasDoNotDeleteEachOther()
4141
// Test entry point: delegates to the overload of the same name that takes a store,
// passing the result of FunctionStoreFactory.Create().
[TestMethod]
4242
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4343
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
44+
45+
// Test entry point added in this commit: delegates to the overload of the same name
// that takes a store, passing the result of FunctionStoreFactory.Create().
[TestMethod]
46+
public override Task ReplicaWatchdogUpdatesHeartbeat()
47+
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
4448
}

0 commit comments

Comments
 (0)