Skip to content

Commit d5e54fb

Browse files
committed
Implemented ReplicaWatchdog
1 parent ed33a62 commit d5e54fb

File tree

7 files changed

+449
-0
lines changed

7 files changed

+449
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests.WatchDogsTests;
5+
6+
[TestClass]
7+
public class ReplicaWatchdogTests : TestTemplates.WatchDogsTests.ReplicaWatchdogTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenario()
11+
=> SunshineScenario(FunctionStoreFactory.Create());
12+
13+
[TestMethod]
14+
public override Task ReplicaWatchdogStartResultsInAddedReplicaInStore()
15+
=> ReplicaWatchdogStartResultsInAddedReplicaInStore(FunctionStoreFactory.Create());
16+
17+
[TestMethod]
18+
public override Task StrikedOutReplicaIsRemovedFromStore()
19+
=> StrikedOutReplicaIsRemovedFromStore(FunctionStoreFactory.Create());
20+
21+
[TestMethod]
22+
public override Task RunningWatchdogUpdatesItsOwnHeartbeat()
23+
=> RunningWatchdogUpdatesItsOwnHeartbeat(FunctionStoreFactory.Create());
24+
25+
[TestMethod]
26+
public override Task ReplicaIdOffsetIfCalculatedCorrectly()
27+
=> ReplicaIdOffsetIfCalculatedCorrectly(FunctionStoreFactory.Create());
28+
29+
[TestMethod]
30+
public override Task NonExistingReplicaIdOffsetIsNull()
31+
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
32+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
5+
using Cleipnir.ResilientFunctions.Domain;
6+
using Cleipnir.ResilientFunctions.Helpers;
7+
using Cleipnir.ResilientFunctions.Storage;
8+
using Cleipnir.ResilientFunctions.Tests.Utils;
9+
using Shouldly;
10+
11+
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
12+
13+
public abstract class ReplicaWatchdogTests
14+
{
15+
public abstract Task SunshineScenario();
16+
public async Task SunshineScenario(Task<IFunctionStore> storeTask)
17+
{
18+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
19+
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
20+
using var watchdog1 = new ReplicaWatchdog(
21+
replicaId1,
22+
store,
23+
checkFrequency: TimeSpan.FromHours(1),
24+
onStrikeOut: _ => {}
25+
);
26+
await watchdog1.Initialize();
27+
var allReplicas = await store.GetAll();
28+
allReplicas.Count.ShouldBe(1);
29+
var storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.Id);
30+
storedReplica1.Heartbeat.ShouldBe(0);
31+
32+
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
33+
using var watchdog2 = new ReplicaWatchdog(
34+
replicaId2,
35+
store,
36+
checkFrequency: TimeSpan.FromHours(1),
37+
onStrikeOut: _ => {}
38+
);
39+
await watchdog2.Initialize();
40+
allReplicas = await store.GetAll();
41+
allReplicas.Count.ShouldBe(2);
42+
storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.Id);
43+
storedReplica1.Heartbeat.ShouldBe(0);
44+
var storedReplica2 = allReplicas.Single(sr => sr.ReplicaId == replicaId2.Id);
45+
storedReplica2.Heartbeat.ShouldBe(0);
46+
47+
await watchdog1.PerformIteration();
48+
var replicas = await store.GetAll();
49+
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(1);
50+
replicas.Single(sr => sr.ReplicaId == replicaId2.Id).Heartbeat.ShouldBe(0);
51+
watchdog1.Strikes[new StoredReplica(replicaId2.Id, Heartbeat: 0)].ShouldBe(0);
52+
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 1)].ShouldBe(0);
53+
54+
await watchdog1.PerformIteration();
55+
replicas = await store.GetAll();
56+
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(2);
57+
replicas.Single(sr => sr.ReplicaId == replicaId2.Id).Heartbeat.ShouldBe(0);
58+
watchdog1.Strikes[new StoredReplica(replicaId2.Id, Heartbeat: 0)].ShouldBe(1);
59+
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 2)].ShouldBe(0);
60+
61+
await watchdog1.PerformIteration();
62+
replicas = await store.GetAll();
63+
replicas.Count.ShouldBe(1);
64+
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(3);
65+
watchdog1.Strikes.Count.ShouldBe(1);
66+
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 3)].ShouldBe(0);
67+
}
68+
69+
public abstract Task ReplicaWatchdogStartResultsInAddedReplicaInStore();
70+
public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctionStore> storeTask)
71+
{
72+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
73+
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
74+
using var watchdog1 = new ReplicaWatchdog(
75+
replicaId1,
76+
store,
77+
checkFrequency: TimeSpan.FromHours(1),
78+
onStrikeOut: _ => {}
79+
);
80+
await watchdog1.Start();
81+
var allReplicas = await store.GetAll();
82+
allReplicas.Count.ShouldBe(1);
83+
84+
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
85+
using var watchdog2 = new ReplicaWatchdog(
86+
replicaId2,
87+
store,
88+
checkFrequency: TimeSpan.FromHours(1),
89+
onStrikeOut: _ => {}
90+
);
91+
await watchdog2.Start();
92+
allReplicas = await store.GetAll();
93+
allReplicas.Count.ShouldBe(2);
94+
}
95+
96+
public abstract Task StrikedOutReplicaIsRemovedFromStore();
97+
public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> storeTask)
98+
{
99+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
100+
var toBeStrikedOut = Guid.NewGuid();
101+
Guid? strikedOut = null;
102+
await store.Insert(toBeStrikedOut);
103+
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
104+
using var watchdog1 = new ReplicaWatchdog(
105+
replicaId1,
106+
store,
107+
checkFrequency: TimeSpan.FromHours(1),
108+
onStrikeOut: id => strikedOut = id
109+
);
110+
await watchdog1.Initialize();
111+
await watchdog1.PerformIteration();
112+
strikedOut.ShouldBeNull();
113+
await watchdog1.PerformIteration();
114+
strikedOut.ShouldBeNull();
115+
await watchdog1.PerformIteration();
116+
strikedOut.ShouldBe(toBeStrikedOut);
117+
118+
var all = await store.GetAll();
119+
all.Count.ShouldBe(1);
120+
all.Single().ReplicaId.ShouldBe(replicaId1.Id);
121+
}
122+
123+
public abstract Task RunningWatchdogUpdatesItsOwnHeartbeat();
124+
public async Task RunningWatchdogUpdatesItsOwnHeartbeat(Task<IFunctionStore> storeTask)
125+
{
126+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
127+
var anyStrikesOut = false;
128+
var replicaId1 = new ReplicaId(Guid.NewGuid());
129+
using var watchdog1 = new ReplicaWatchdog(
130+
replicaId1,
131+
store,
132+
checkFrequency: TimeSpan.FromMilliseconds(100),
133+
onStrikeOut: _ => anyStrikesOut = true
134+
);
135+
136+
await watchdog1.Start();
137+
138+
await BusyWait.Until(async () =>
139+
{
140+
var all = await store.GetAll();
141+
all.Count.ShouldBe(1);
142+
var single = all.Single();
143+
single.ReplicaId.ShouldBe(replicaId1.Id);
144+
return single.Heartbeat > 0;
145+
});
146+
147+
anyStrikesOut.ShouldBe(false);
148+
}
149+
150+
public abstract Task ReplicaIdOffsetIfCalculatedCorrectly();
151+
public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> storeTask)
152+
{
153+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
154+
155+
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
156+
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
157+
var replicaId3 = new ReplicaId(Guid.Parse("30000000-0000-0000-0000-000000000000"));
158+
159+
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
160+
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
161+
var watchdog3 = new ReplicaWatchdog(replicaId3, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
162+
163+
await watchdog1.Initialize();
164+
await watchdog2.Initialize();
165+
await watchdog3.Initialize();
166+
167+
await watchdog3.PerformIteration();
168+
replicaId3.Offset.ShouldBe(2);
169+
await watchdog2.PerformIteration();
170+
replicaId2.Offset.ShouldBe(1);
171+
await watchdog1.PerformIteration();
172+
replicaId1.Offset.ShouldBe(0);
173+
}
174+
175+
public abstract Task NonExistingReplicaIdOffsetIsNull();
176+
public Task NonExistingReplicaIdOffsetIsNull(Task<IFunctionStore> storeTask)
177+
{
178+
var offset = ReplicaWatchdog.CalculateOffset(allReplicaIds: [], ownReplicaId: Guid.NewGuid());
179+
offset.ShouldBeNull();
180+
181+
return Task.CompletedTask;
182+
}
183+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.Domain;
7+
using Cleipnir.ResilientFunctions.Storage;
8+
9+
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
10+
11+
internal class ReplicaWatchdog(ReplicaId replicaId, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<Guid> onStrikeOut) : IDisposable
12+
{
13+
private volatile bool _disposed;
14+
private bool _started;
15+
private bool _initialized;
16+
private readonly Dictionary<StoredReplica, int> _strikes = new();
17+
18+
public async Task Start()
19+
{
20+
var originalValue = Interlocked.CompareExchange(ref _started, value: true, comparand: false);
21+
if (originalValue is true)
22+
return;
23+
24+
if (!_initialized)
25+
await Initialize();
26+
27+
_ = Task.Run(Run);
28+
}
29+
30+
public async Task Initialize()
31+
{
32+
await replicaStore.Insert(replicaId.Id);
33+
_initialized = true;
34+
}
35+
36+
private async Task Run()
37+
{
38+
while (!_disposed)
39+
{
40+
await PerformIteration();
41+
await Task.Delay(checkFrequency);
42+
}
43+
}
44+
45+
public async Task PerformIteration()
46+
{
47+
await replicaStore.UpdateHeartbeat(replicaId.Id);
48+
49+
var storedReplicas = await replicaStore.GetAll();
50+
var offset = CalculateOffset(storedReplicas.Select(sr => sr.ReplicaId), replicaId.Id);
51+
52+
if (offset is not null)
53+
replicaId.Offset = offset.Value;
54+
else
55+
{
56+
await replicaStore.Insert(replicaId.Id);
57+
_strikes.Clear();
58+
await PerformIteration();
59+
}
60+
61+
IncrementStrikesCount();
62+
ClearNonRelevantStrikes(storedReplicas);
63+
AddNewStrikes(storedReplicas);
64+
65+
await DeleteStrikedOutReplicas();
66+
}
67+
68+
public static int? CalculateOffset(IEnumerable<Guid> allReplicaIds, Guid ownReplicaId)
69+
=> allReplicaIds
70+
.Select(s => s)
71+
.Order()
72+
.Select((id, i) => new { Id = id, Index = i })
73+
.FirstOrDefault(a => a.Id == ownReplicaId)
74+
?.Index;
75+
76+
private void ClearNonRelevantStrikes(IReadOnlyList<StoredReplica> storedReplicas)
77+
{
78+
foreach (var strikeKey in _strikes.Keys.ToList())
79+
if (storedReplicas.All(sr => sr != strikeKey))
80+
_strikes.Remove(strikeKey);
81+
}
82+
83+
private void AddNewStrikes(IReadOnlyList<StoredReplica> storedReplicas)
84+
{
85+
foreach (var storedReplica in storedReplicas)
86+
_strikes.TryAdd(storedReplica, 0);
87+
}
88+
89+
private void IncrementStrikesCount()
90+
{
91+
foreach (var storedReplica in _strikes.Keys)
92+
_strikes[storedReplica]++;
93+
}
94+
95+
private async Task DeleteStrikedOutReplicas()
96+
{
97+
foreach (var (storedReplica, strikes) in _strikes.Where(kv => kv.Value >= 2))
98+
{
99+
var strikedOutId = storedReplica.ReplicaId;
100+
await replicaStore.Delete(strikedOutId);
101+
_strikes.Remove(storedReplica);
102+
onStrikeOut(strikedOutId);
103+
}
104+
}
105+
106+
public IReadOnlyDictionary<StoredReplica, int> Strikes => _strikes;
107+
108+
public void Stop() => Dispose();
109+
public void Dispose() => _disposed = true;
110+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace Cleipnir.ResilientFunctions.Domain;
5+
6+
public class ReplicaId
7+
{
8+
public Guid Id { get; }
9+
10+
private int _offset;
11+
public int Offset
12+
{
13+
get
14+
{
15+
lock (_sync)
16+
return _offset;
17+
}
18+
set
19+
{
20+
lock (_sync)
21+
_offset = value;
22+
}
23+
}
24+
25+
private readonly Lock _sync = new();
26+
27+
public ReplicaId(Guid id) => Id = id;
28+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using Cleipnir.ResilientFunctions.MariaDb.Tests;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.MariaDB.Tests.WatchDogsTests;
5+
6+
[TestClass]
7+
public class ReplicaWatchdogTests : ResilientFunctions.Tests.TestTemplates.WatchDogsTests.ReplicaWatchdogTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenario()
11+
=> SunshineScenario(FunctionStoreFactory.Create());
12+
13+
[TestMethod]
14+
public override Task ReplicaWatchdogStartResultsInAddedReplicaInStore()
15+
=> ReplicaWatchdogStartResultsInAddedReplicaInStore(FunctionStoreFactory.Create());
16+
17+
[TestMethod]
18+
public override Task StrikedOutReplicaIsRemovedFromStore()
19+
=> StrikedOutReplicaIsRemovedFromStore(FunctionStoreFactory.Create());
20+
21+
[TestMethod]
22+
public override Task RunningWatchdogUpdatesItsOwnHeartbeat()
23+
=> RunningWatchdogUpdatesItsOwnHeartbeat(FunctionStoreFactory.Create());
24+
25+
[TestMethod]
26+
public override Task ReplicaIdOffsetIfCalculatedCorrectly()
27+
=> ReplicaIdOffsetIfCalculatedCorrectly(FunctionStoreFactory.Create());
28+
29+
[TestMethod]
30+
public override Task NonExistingReplicaIdOffsetIsNull()
31+
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
32+
}

0 commit comments

Comments
 (0)