Skip to content

Commit e2a4cd7

Browse files
committed
Added ClusterInfo type
1 parent 162cb3e commit e2a4cd7

File tree

8 files changed

+148
-59
lines changed

8 files changed

+148
-59
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public override Task RunningWatchdogUpdatesItsOwnHeartbeat()
2626
public override Task ReplicaIdOffsetIfCalculatedCorrectly()
2727
=> ReplicaIdOffsetIfCalculatedCorrectly(FunctionStoreFactory.Create());
2828

29+
[TestMethod]
30+
public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
31+
=> ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(FunctionStoreFactory.Create());
32+
2933
[TestMethod]
3034
public override Task NonExistingReplicaIdOffsetIsNull()
3135
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public abstract class ReplicaWatchdogTests
1616
public async Task SunshineScenario(Task<IFunctionStore> storeTask)
1717
{
1818
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
19-
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
19+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
2020
using var watchdog1 = new ReplicaWatchdog(
2121
replicaId1,
2222
store,
@@ -26,10 +26,10 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
2626
await watchdog1.Initialize();
2727
var allReplicas = await store.GetAll();
2828
allReplicas.Count.ShouldBe(1);
29-
var storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.Id);
29+
var storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId);
3030
storedReplica1.Heartbeat.ShouldBe(0);
3131

32-
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
32+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
3333
using var watchdog2 = new ReplicaWatchdog(
3434
replicaId2,
3535
store,
@@ -39,38 +39,38 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
3939
await watchdog2.Initialize();
4040
allReplicas = await store.GetAll();
4141
allReplicas.Count.ShouldBe(2);
42-
storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.Id);
42+
storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId);
4343
storedReplica1.Heartbeat.ShouldBe(0);
44-
var storedReplica2 = allReplicas.Single(sr => sr.ReplicaId == replicaId2.Id);
44+
var storedReplica2 = allReplicas.Single(sr => sr.ReplicaId == replicaId2.ReplicaId);
4545
storedReplica2.Heartbeat.ShouldBe(0);
4646

4747
await watchdog1.PerformIteration();
4848
var replicas = await store.GetAll();
49-
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(1);
50-
replicas.Single(sr => sr.ReplicaId == replicaId2.Id).Heartbeat.ShouldBe(0);
51-
watchdog1.Strikes[new StoredReplica(replicaId2.Id, Heartbeat: 0)].ShouldBe(0);
52-
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 1)].ShouldBe(0);
49+
replicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId).Heartbeat.ShouldBe(1);
50+
replicas.Single(sr => sr.ReplicaId == replicaId2.ReplicaId).Heartbeat.ShouldBe(0);
51+
watchdog1.Strikes[new StoredReplica(replicaId2.ReplicaId, Heartbeat: 0)].ShouldBe(0);
52+
watchdog1.Strikes[new StoredReplica(replicaId1.ReplicaId, Heartbeat: 1)].ShouldBe(0);
5353

5454
await watchdog1.PerformIteration();
5555
replicas = await store.GetAll();
56-
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(2);
57-
replicas.Single(sr => sr.ReplicaId == replicaId2.Id).Heartbeat.ShouldBe(0);
58-
watchdog1.Strikes[new StoredReplica(replicaId2.Id, Heartbeat: 0)].ShouldBe(1);
59-
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 2)].ShouldBe(0);
56+
replicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId).Heartbeat.ShouldBe(2);
57+
replicas.Single(sr => sr.ReplicaId == replicaId2.ReplicaId).Heartbeat.ShouldBe(0);
58+
watchdog1.Strikes[new StoredReplica(replicaId2.ReplicaId, Heartbeat: 0)].ShouldBe(1);
59+
watchdog1.Strikes[new StoredReplica(replicaId1.ReplicaId, Heartbeat: 2)].ShouldBe(0);
6060

6161
await watchdog1.PerformIteration();
6262
replicas = await store.GetAll();
6363
replicas.Count.ShouldBe(1);
64-
replicas.Single(sr => sr.ReplicaId == replicaId1.Id).Heartbeat.ShouldBe(3);
64+
replicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId).Heartbeat.ShouldBe(3);
6565
watchdog1.Strikes.Count.ShouldBe(1);
66-
watchdog1.Strikes[new StoredReplica(replicaId1.Id, Heartbeat: 3)].ShouldBe(0);
66+
watchdog1.Strikes[new StoredReplica(replicaId1.ReplicaId, Heartbeat: 3)].ShouldBe(0);
6767
}
6868

6969
public abstract Task ReplicaWatchdogStartResultsInAddedReplicaInStore();
7070
public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctionStore> storeTask)
7171
{
7272
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
73-
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
73+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
7474
using var watchdog1 = new ReplicaWatchdog(
7575
replicaId1,
7676
store,
@@ -81,7 +81,7 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
8181
var allReplicas = await store.GetAll();
8282
allReplicas.Count.ShouldBe(1);
8383

84-
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
84+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
8585
using var watchdog2 = new ReplicaWatchdog(
8686
replicaId2,
8787
store,
@@ -100,7 +100,7 @@ public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> store
100100
var toBeStrikedOut = Guid.NewGuid();
101101
Guid? strikedOut = null;
102102
await store.Insert(toBeStrikedOut);
103-
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
103+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
104104
using var watchdog1 = new ReplicaWatchdog(
105105
replicaId1,
106106
store,
@@ -117,15 +117,15 @@ public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> store
117117

118118
var all = await store.GetAll();
119119
all.Count.ShouldBe(1);
120-
all.Single().ReplicaId.ShouldBe(replicaId1.Id);
120+
all.Single().ReplicaId.ShouldBe(replicaId1.ReplicaId);
121121
}
122122

123123
public abstract Task RunningWatchdogUpdatesItsOwnHeartbeat();
124124
public async Task RunningWatchdogUpdatesItsOwnHeartbeat(Task<IFunctionStore> storeTask)
125125
{
126126
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
127127
var anyStrikesOut = false;
128-
var replicaId1 = new ReplicaId(Guid.NewGuid());
128+
var replicaId1 = new ClusterInfo(Guid.NewGuid());
129129
using var watchdog1 = new ReplicaWatchdog(
130130
replicaId1,
131131
store,
@@ -140,7 +140,7 @@ await BusyWait.Until(async () =>
140140
var all = await store.GetAll();
141141
all.Count.ShouldBe(1);
142142
var single = all.Single();
143-
single.ReplicaId.ShouldBe(replicaId1.Id);
143+
single.ReplicaId.ShouldBe(replicaId1.ReplicaId);
144144
return single.Heartbeat > 0;
145145
});
146146

@@ -152,9 +152,9 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
152152
{
153153
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
154154

155-
var replicaId1 = new ReplicaId(Guid.Parse("10000000-0000-0000-0000-000000000000"));
156-
var replicaId2 = new ReplicaId(Guid.Parse("20000000-0000-0000-0000-000000000000"));
157-
var replicaId3 = new ReplicaId(Guid.Parse("30000000-0000-0000-0000-000000000000"));
155+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
156+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
157+
var replicaId3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000"));
158158

159159
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
160160
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
@@ -172,6 +172,54 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
172172
replicaId1.Offset.ShouldBe(0);
173173
}
174174

175+
public abstract Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted();
176+
public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFunctionStore> storeTask)
177+
{
178+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
179+
180+
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
181+
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
182+
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000"));
183+
184+
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
185+
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
186+
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
187+
188+
await watchdog3.Initialize();
189+
cluster3.Offset.ShouldBe(0);
190+
cluster3.ReplicaCount.ShouldBe(1);
191+
192+
await watchdog2.Initialize();
193+
await watchdog3.PerformIteration();
194+
cluster3.Offset.ShouldBe(1);
195+
cluster3.ReplicaCount.ShouldBe(2);
196+
cluster2.Offset.ShouldBe(0);
197+
cluster2.ReplicaCount.ShouldBe(2);
198+
199+
await watchdog1.Initialize();
200+
await watchdog2.PerformIteration();
201+
await watchdog3.PerformIteration();
202+
cluster3.Offset.ShouldBe(2);
203+
cluster3.ReplicaCount.ShouldBe(3);
204+
cluster2.Offset.ShouldBe(1);
205+
cluster2.ReplicaCount.ShouldBe(3);
206+
cluster1.Offset.ShouldBe(0);
207+
cluster1.ReplicaCount.ShouldBe(3);
208+
209+
await store.Delete(cluster1.ReplicaId);
210+
await watchdog3.PerformIteration();
211+
await watchdog2.PerformIteration();
212+
cluster3.Offset.ShouldBe(1);
213+
cluster3.ReplicaCount.ShouldBe(2);
214+
cluster2.Offset.ShouldBe(0);
215+
cluster2.ReplicaCount.ShouldBe(2);
216+
217+
await store.Delete(cluster2.ReplicaId);
218+
await watchdog3.PerformIteration();
219+
cluster3.Offset.ShouldBe(0);
220+
cluster3.ReplicaCount.ShouldBe(1);
221+
}
222+
175223
public abstract Task NonExistingReplicaIdOffsetIsNull();
176224
public Task NonExistingReplicaIdOffsetIsNull(Task<IFunctionStore> storeTask)
177225
{

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/ReplicaWatchdog.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1010

11-
internal class ReplicaWatchdog(ReplicaId replicaId, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<Guid> onStrikeOut) : IDisposable
11+
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<Guid> onStrikeOut) : IDisposable
1212
{
1313
private volatile bool _disposed;
1414
private bool _started;
@@ -29,7 +29,14 @@ public async Task Start()
2929

3030
public async Task Initialize()
3131
{
32-
await replicaStore.Insert(replicaId.Id);
32+
await replicaStore.Insert(clusterInfo.ReplicaId);
33+
var replicas = await replicaStore.GetAll();
34+
var offset = CalculateOffset(replicas.Select(sr => sr.ReplicaId), clusterInfo.ReplicaId);
35+
if (offset is null)
36+
throw new InvalidOperationException("Replica offset was null after initialization");
37+
38+
clusterInfo.ReplicaCount = replicas.Count;
39+
clusterInfo.Offset = offset.Value;
3340
_initialized = true;
3441
}
3542

@@ -44,16 +51,19 @@ private async Task Run()
4451

4552
public async Task PerformIteration()
4653
{
47-
await replicaStore.UpdateHeartbeat(replicaId.Id);
54+
await replicaStore.UpdateHeartbeat(clusterInfo.ReplicaId);
4855

4956
var storedReplicas = await replicaStore.GetAll();
50-
var offset = CalculateOffset(storedReplicas.Select(sr => sr.ReplicaId), replicaId.Id);
57+
var offset = CalculateOffset(storedReplicas.Select(sr => sr.ReplicaId), clusterInfo.ReplicaId);
5158

5259
if (offset is not null)
53-
replicaId.Offset = offset.Value;
60+
{
61+
clusterInfo.Offset = offset.Value;
62+
clusterInfo.ReplicaCount = storedReplicas.Count;
63+
}
5464
else
5565
{
56-
await replicaStore.Insert(replicaId.Id);
66+
await replicaStore.Insert(clusterInfo.ReplicaId);
5767
_strikes.Clear();
5868
await PerformIteration();
5969
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace Cleipnir.ResilientFunctions.Domain;
5+
6+
public class ClusterInfo
7+
{
8+
public Guid ReplicaId { get; }
9+
10+
private int _offset;
11+
public int Offset
12+
{
13+
get
14+
{
15+
lock (_sync)
16+
return _offset;
17+
}
18+
set
19+
{
20+
lock (_sync)
21+
_offset = value;
22+
}
23+
}
24+
25+
private int _replicaCount;
26+
public int ReplicaCount
27+
{
28+
get
29+
{
30+
lock (_sync)
31+
return _replicaCount;
32+
}
33+
set
34+
{
35+
lock (_sync)
36+
_replicaCount = value;
37+
}
38+
}
39+
40+
private readonly Lock _sync = new();
41+
42+
public ClusterInfo(Guid replicaId) => ReplicaId = replicaId;
43+
}

Core/Cleipnir.ResilientFunctions/Domain/ReplicaId.cs

Lines changed: 0 additions & 28 deletions
This file was deleted.

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ public override Task ReplicaIdOffsetIfCalculatedCorrectly()
2929
[TestMethod]
3030
public override Task NonExistingReplicaIdOffsetIsNull()
3131
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
32+
33+
[TestMethod]
34+
public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
35+
=> ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(FunctionStoreFactory.Create());
3236
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,9 @@ public override Task ReplicaIdOffsetIfCalculatedCorrectly()
2828

2929
[TestMethod]
3030
public override Task NonExistingReplicaIdOffsetIsNull()
31-
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
31+
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
32+
33+
[TestMethod]
34+
public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
35+
=> ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(FunctionStoreFactory.Create());
3236
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ public override Task ReplicaIdOffsetIfCalculatedCorrectly()
2929
[TestMethod]
3030
public override Task NonExistingReplicaIdOffsetIsNull()
3131
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
32+
33+
[TestMethod]
34+
public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
35+
=> ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(FunctionStoreFactory.Create());
3236
}

0 commit comments

Comments
 (0)