Skip to content

Commit 59a0e9d

Browse files
authored
Added ReplicaStore
1 parent 4aa334e commit 59a0e9d

File tree

19 files changed

+565
-2
lines changed

19 files changed

+565
-2
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore
2525
public Utilities Utilities => _inner.Utilities;
2626
public IMigrator Migrator => _inner.Migrator;
2727
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
28+
public IReplicaStore ReplicaStore => _inner.ReplicaStore;
2829
public Task Initialize() => _inner.Initialize();
2930

3031
public Task<bool> CreateFunction(
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;
5+
6+
[TestClass]
7+
public class ReplicaStoreTests : TestTemplates.ReplicaStoreTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenarioTest()
11+
=> SunshineScenarioTest(FunctionStoreFactory.Create());
12+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.Helpers;
5+
using Cleipnir.ResilientFunctions.Storage;
6+
using Cleipnir.ResilientFunctions.Tests.Utils;
7+
using Shouldly;
8+
9+
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates;
10+
11+
public abstract class ReplicaStoreTests
12+
{
13+
public abstract Task SunshineScenarioTest();
14+
protected async Task SunshineScenarioTest(Task<IFunctionStore> storeTask)
15+
{
16+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
17+
await store.GetAll().ShouldBeEmptyAsync();
18+
var replicaId1 = Guid.NewGuid();
19+
var replicaId2 = Guid.NewGuid();
20+
21+
{
22+
await store.Insert(replicaId1);
23+
var all = await store.GetAll();
24+
all.Count.ShouldBe(1);
25+
var stored = all.Single();
26+
stored.ReplicaId.ShouldBe(replicaId1);
27+
stored.Heartbeat.ShouldBe(0);
28+
}
29+
30+
{
31+
await store.Insert(replicaId2);
32+
var all = await store.GetAll();
33+
all.Count.ShouldBe(2);
34+
var stored = all.Single(id => id.ReplicaId == replicaId2);
35+
stored.ReplicaId.ShouldBe(replicaId2);
36+
stored.Heartbeat.ShouldBe(0);
37+
}
38+
39+
await store.UpdateHeartbeat(replicaId1);
40+
{
41+
var all = await store.GetAll();
42+
all.Count.ShouldBe(2);
43+
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
44+
stored1.Heartbeat.ShouldBe(1);
45+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
46+
stored2.Heartbeat.ShouldBe(0);
47+
}
48+
49+
await store.UpdateHeartbeat(replicaId2);
50+
{
51+
var all = await store.GetAll();
52+
all.Count.ShouldBe(2);
53+
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
54+
stored1.Heartbeat.ShouldBe(1);
55+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
56+
stored2.Heartbeat.ShouldBe(1);
57+
}
58+
59+
await store.Delete(replicaId1);
60+
{
61+
var all = await store.GetAll();
62+
all.Count.ShouldBe(1);
63+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
64+
stored2.Heartbeat.ShouldBe(1);
65+
}
66+
67+
await store.Delete(replicaId2);
68+
{
69+
var all = await store.GetAll();
70+
all.ShouldBeEmpty();
71+
}
72+
}
73+
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class CrashableFunctionStore : IFunctionStore
2525
public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities;
2626
public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator;
2727
public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore;
28+
public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore;
2829

2930
public CrashableFunctionStore(IFunctionStore inner)
3031
{

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public interface IFunctionStore
1616
public Utilities Utilities { get; }
1717
public IMigrator Migrator { get; }
1818
public ISemaphoreStore SemaphoreStore { get; }
19+
public IReplicaStore ReplicaStore { get; }
1920
public Task Initialize();
2021

2122
Task<bool> CreateFunction(
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
5+
namespace Cleipnir.ResilientFunctions.Storage;
6+
7+
public interface IReplicaStore
8+
{
9+
public Task Initialize();
10+
public Task Insert(Guid replicaId);
11+
public Task Delete(Guid replicaId);
12+
public Task UpdateHeartbeat(Guid replicaId);
13+
public Task<IReadOnlyList<StoredReplica>> GetAll();
14+
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -30,6 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore
3031

3132
public IMigrator Migrator { get; } = new InMemoryMigrator();
3233
public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore();
34+
public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore();
3335

3436
public Task Initialize() => Task.CompletedTask;
3537

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.Helpers;
7+
8+
namespace Cleipnir.ResilientFunctions.Storage;
9+
10+
public class InMemoryReplicaStore : IReplicaStore
11+
{
12+
private readonly Dictionary<Guid, int> _replicas = new();
13+
private readonly Lock _sync = new();
14+
15+
public Task Initialize() => Task.CompletedTask;
16+
17+
public Task Insert(Guid replicaId)
18+
{
19+
lock (_sync)
20+
_replicas.TryAdd(replicaId, 0);
21+
22+
return Task.CompletedTask;
23+
}
24+
25+
public Task Delete(Guid replicaId)
26+
{
27+
lock (_sync)
28+
_replicas.Remove(replicaId);
29+
30+
return Task.CompletedTask;
31+
}
32+
33+
public Task UpdateHeartbeat(Guid replicaId)
34+
{
35+
lock (_sync)
36+
if (_replicas.ContainsKey(replicaId))
37+
_replicas[replicaId]++;
38+
39+
return Task.CompletedTask;
40+
}
41+
42+
public Task<IReadOnlyList<StoredReplica>> GetAll()
43+
{
44+
lock (_sync)
45+
return _replicas
46+
.Select(kv => new StoredReplica(kv.Key, kv.Value))
47+
.ToList()
48+
.CastTo<IReadOnlyList<StoredReplica>>()
49+
.ToTask();
50+
}
51+
}

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,6 @@ public static class StoredEffectExtensions
137137
{
138138
public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId, CrudOperation operation)
139139
=> new(storedId, effect.StoredEffectId, operation, effect);
140-
}
140+
}
141+
142+
public record StoredReplica(Guid ReplicaId, int Heartbeat);

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class CrashableFunctionStore : IFunctionStore
2121
public Utilities Utilities => _inner.Utilities;
2222
public IMigrator Migrator => _inner.Migrator;
2323
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
24+
public IReplicaStore ReplicaStore => _inner.ReplicaStore;
2425

2526
public CrashableFunctionStore(IFunctionStore inner) => _inner = inner;
2627

0 commit comments

Comments
 (0)