Skip to content

Added ReplicaStore #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore
public Utilities Utilities => _inner.Utilities;
public IMigrator Migrator => _inner.Migrator;
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
public IReplicaStore ReplicaStore => _inner.ReplicaStore;
public Task Initialize() => _inner.Initialize();

public Task<bool> CreateFunction(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;

[TestClass]
public class ReplicaStoreTests : TestTemplates.ReplicaStoreTests
{
[TestMethod]
public override Task SunshineScenarioTest()
=> SunshineScenarioTest(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Shouldly;

namespace Cleipnir.ResilientFunctions.Tests.TestTemplates;

public abstract class ReplicaStoreTests
{
public abstract Task SunshineScenarioTest();
protected async Task SunshineScenarioTest(Task<IFunctionStore> storeTask)
{
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
await store.GetAll().ShouldBeEmptyAsync();
var replicaId1 = Guid.NewGuid();
var replicaId2 = Guid.NewGuid();

{
await store.Insert(replicaId1);
var all = await store.GetAll();
all.Count.ShouldBe(1);
var stored = all.Single();
stored.ReplicaId.ShouldBe(replicaId1);
stored.Heartbeat.ShouldBe(0);
}

{
await store.Insert(replicaId2);
var all = await store.GetAll();
all.Count.ShouldBe(2);
var stored = all.Single(id => id.ReplicaId == replicaId2);
stored.ReplicaId.ShouldBe(replicaId2);
stored.Heartbeat.ShouldBe(0);
}

await store.UpdateHeartbeat(replicaId1);
{
var all = await store.GetAll();
all.Count.ShouldBe(2);
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
stored1.Heartbeat.ShouldBe(1);
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
stored2.Heartbeat.ShouldBe(0);
}

await store.UpdateHeartbeat(replicaId2);
{
var all = await store.GetAll();
all.Count.ShouldBe(2);
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
stored1.Heartbeat.ShouldBe(1);
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
stored2.Heartbeat.ShouldBe(1);
}

await store.Delete(replicaId1);
{
var all = await store.GetAll();
all.Count.ShouldBe(1);
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
stored2.Heartbeat.ShouldBe(1);
}

await store.Delete(replicaId2);
{
var all = await store.GetAll();
all.ShouldBeEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class CrashableFunctionStore : IFunctionStore
public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities;
public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator;
public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore;
public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore;

public CrashableFunctionStore(IFunctionStore inner)
{
Expand Down
1 change: 1 addition & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public interface IFunctionStore
public Utilities Utilities { get; }
public IMigrator Migrator { get; }
public ISemaphoreStore SemaphoreStore { get; }
public IReplicaStore ReplicaStore { get; }
public Task Initialize();

Task<bool> CreateFunction(
Expand Down
14 changes: 14 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/IReplicaStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.Storage;

public interface IReplicaStore
{
public Task Initialize();
public Task Insert(Guid replicaId);
public Task Delete(Guid replicaId);
public Task UpdateHeartbeat(Guid replicaId);
public Task<IReadOnlyList<StoredReplica>> GetAll();
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -30,6 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore

public IMigrator Migrator { get; } = new InMemoryMigrator();
public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore();
public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore();

public Task Initialize() => Task.CompletedTask;

Expand Down
51 changes: 51 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;

namespace Cleipnir.ResilientFunctions.Storage;

public class InMemoryReplicaStore : IReplicaStore
{
private readonly Dictionary<Guid, int> _replicas = new();
private readonly Lock _sync = new();

public Task Initialize() => Task.CompletedTask;

public Task Insert(Guid replicaId)
{
lock (_sync)
_replicas.TryAdd(replicaId, 0);

return Task.CompletedTask;
}

public Task Delete(Guid replicaId)
{
lock (_sync)
_replicas.Remove(replicaId);

return Task.CompletedTask;
}

public Task UpdateHeartbeat(Guid replicaId)
{
lock (_sync)
if (_replicas.ContainsKey(replicaId))
_replicas[replicaId]++;

return Task.CompletedTask;
}

public Task<IReadOnlyList<StoredReplica>> GetAll()
{
lock (_sync)
return _replicas
.Select(kv => new StoredReplica(kv.Key, kv.Value))
.ToList()
.CastTo<IReadOnlyList<StoredReplica>>()
.ToTask();
}
}
4 changes: 3 additions & 1 deletion Core/Cleipnir.ResilientFunctions/Storage/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,6 @@ public static class StoredEffectExtensions
{
public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId, CrudOperation operation)
=> new(storedId, effect.StoredEffectId, operation, effect);
}
}

public record StoredReplica(Guid ReplicaId, int Heartbeat);
1 change: 1 addition & 0 deletions Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class CrashableFunctionStore : IFunctionStore
public Utilities Utilities => _inner.Utilities;
public IMigrator Migrator => _inner.Migrator;
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
public IReplicaStore ReplicaStore => _inner.ReplicaStore;

public CrashableFunctionStore(IFunctionStore inner) => _inner = inner;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.MariaDb.Tests;

[TestClass]
public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests
{
[TestMethod]
public override Task SunshineScenarioTest()
=> SunshineScenarioTest(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class MariaDbFunctionStore : IFunctionStore
private readonly MariaDbSemaphoreStore _semaphoreStore;
public ISemaphoreStore SemaphoreStore => _semaphoreStore;

private readonly MariaDbReplicaStore _replicaStore;
public IReplicaStore ReplicaStore => _replicaStore;

public Utilities Utilities { get; }
private readonly MariaDbUnderlyingRegister _mariaDbUnderlyingRegister;

Expand All @@ -57,6 +60,7 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "")
_mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix);
_typeStore = new MariaDbTypeStore(connectionString, tablePrefix);
_migrator = new MariaDbMigrator(connectionString, tablePrefix);
_replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix);

Utilities = new Utilities(_mariaDbUnderlyingRegister);
}
Expand All @@ -75,6 +79,7 @@ public async Task Initialize()
await _semaphoreStore.Initialize();
await TimeoutStore.Initialize();
await _typeStore.Initialize();
await _replicaStore.Initialize();
await using var conn = await CreateOpenConnection(_connectionString);
_initializeSql ??= $@"
CREATE TABLE IF NOT EXISTS {_tablePrefix} (
Expand Down Expand Up @@ -108,6 +113,7 @@ public async Task TruncateTables()
await _correlationStore.Truncate();
await _semaphoreStore.Truncate();
await _typeStore.Truncate();
await _replicaStore.Truncate();

await using var conn = await CreateOpenConnection(_connectionString);
_truncateTablesSql ??= $"TRUNCATE TABLE {_tablePrefix}";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using Cleipnir.ResilientFunctions.Storage;
using MySqlConnector;

namespace Cleipnir.ResilientFunctions.MariaDb;

public class MariaDbReplicaStore(string connectionString, string tablePrefix) : IReplicaStore
{
private string? _initializeSql;
public async Task Initialize()
{
_initializeSql ??= $@"
CREATE TABLE IF NOT EXISTS {tablePrefix}_replicas (
id CHAR(32) PRIMARY KEY,
heartbeat INT
);";
await using var conn = await CreateConnection();
var command = new MySqlCommand(_initializeSql, conn);
await command.ExecuteNonQueryAsync();
}

private string? _insertSql;
public async Task Insert(Guid replicaId)
{
_insertSql ??= $@"
INSERT INTO {tablePrefix}_replicas
(id, heartbeat)
VALUES
(?, 0)";

await using var conn = await CreateConnection();
await using var command = new MySqlCommand(_insertSql, conn)
{
Parameters =
{
new() {Value = replicaId.ToString("N")}
}
};

await command.ExecuteNonQueryAsync();
}

private string? _deleteSql;
public async Task Delete(Guid replicaId)
{
_deleteSql ??= $"DELETE FROM {tablePrefix}_replicas WHERE id = ?";

await using var conn = await CreateConnection();
await using var command = new MySqlCommand(_deleteSql, conn)
{
Parameters =
{
new() {Value = replicaId.ToString("N")}
}
};

await command.ExecuteNonQueryAsync();
}

private string? _updateHeartbeatSql;
public async Task UpdateHeartbeat(Guid replicaId)
{
_updateHeartbeatSql ??= $@"
UPDATE {tablePrefix}_replicas
SET heartbeat = heartbeat + 1
WHERE id = ?";

await using var conn = await CreateConnection();
await using var command = new MySqlCommand(_updateHeartbeatSql, conn)
{
Parameters =
{
new() {Value = replicaId.ToString("N")}
}
};

await command.ExecuteNonQueryAsync();
}

private string? _getAllSql;
public async Task<IReadOnlyList<StoredReplica>> GetAll()
{
_getAllSql ??= $"SELECT id, heartbeat FROM {tablePrefix}_replicas";

await using var conn = await CreateConnection();
await using var command = new MySqlCommand(_getAllSql, conn);
var storedReplicas = new List<StoredReplica>();

await using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
var id = Guid.Parse(reader.GetString(0));
var heartbeat = reader.GetInt32(1);
storedReplicas.Add(new StoredReplica(id, heartbeat));
}

return storedReplicas;
}

private string? _truncateSql;
public async Task Truncate()
{
_truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_replicas";

await using var conn = await CreateConnection();
await using var command = new MySqlCommand(_truncateSql, conn);

await command.ExecuteNonQueryAsync();
}

private async Task<MySqlConnection> CreateConnection()
{
var conn = new MySqlConnection(connectionString);
await conn.OpenAsync();
return conn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests;

[TestClass]
public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests
{
[TestMethod]
public override Task SunshineScenarioTest()
=> SunshineScenarioTest(FunctionStoreFactory.Create());
}
Loading