From 9e6fc6fe05d6a4988468121f16249063ed910ac6 Mon Sep 17 00:00:00 2001 From: Stidsborg Date: Thu, 5 Jun 2025 11:45:12 +0200 Subject: [PATCH 1/2] WIP --- .../LeaseUpdaterTestFunctionStore.cs | 1 + .../TestTemplates/ReplicaStoreTests.cs | 73 +++++++++++ .../WatchDogsTests/CrashableFunctionStore.cs | 1 + .../Storage/IFunctionStore.cs | 1 + .../Storage/IReplicaStore.cs | 14 +++ .../Storage/InMemoryFunctionStore.cs | 4 +- .../Storage/Types.cs | 4 +- .../Utils/CrashableFunctionStore.cs | 1 + .../ReplicaStoreTests.cs | 11 ++ .../MariaDbFunctionStore.cs | 6 + .../MariaDbReplicaStore.cs | 116 ++++++++++++++++++ .../PostgreSqlFunctionStore.cs | 1 + .../SqlServerFunctionStore.cs | 1 + 13 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/ReplicaStoreTests.cs create mode 100644 Core/Cleipnir.ResilientFunctions/Storage/IReplicaStore.cs create mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/ReplicaStoreTests.cs create mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbReplicaStore.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs index 4cdeebe6..e7c3125d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs @@ -25,6 +25,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore public Utilities Utilities => _inner.Utilities; public IMigrator Migrator => _inner.Migrator; public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore; + public IReplicaStore ReplicaStore => _inner.ReplicaStore; public Task Initialize() => _inner.Initialize(); public Task CreateFunction( diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/ReplicaStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/ReplicaStoreTests.cs new file mode 100644 index 00000000..49d62ffc --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/ReplicaStoreTests.cs @@ -0,0 +1,73 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Helpers; +using Cleipnir.ResilientFunctions.Storage; +using Cleipnir.ResilientFunctions.Tests.Utils; +using Shouldly; + +namespace Cleipnir.ResilientFunctions.Tests.TestTemplates; + +public abstract class ReplicaStoreTests +{ + public abstract Task SunshineScenarioTest(); + protected async Task SunshineScenarioTest(Task storeTask) + { + var store = await storeTask.SelectAsync(s => s.ReplicaStore); + await store.GetAll().ShouldBeEmptyAsync(); + var replicaId1 = Guid.NewGuid(); + var replicaId2 = Guid.NewGuid(); + + { + await store.Insert(replicaId1); + var all = await store.GetAll(); + all.Count.ShouldBe(1); + var stored = all.Single(); + stored.ReplicaId.ShouldBe(replicaId1); + stored.Heartbeat.ShouldBe(0); + } + + { + await store.Insert(replicaId2); + var all = await store.GetAll(); + all.Count.ShouldBe(2); + var stored = all.Single(id => id.ReplicaId == replicaId2); + stored.ReplicaId.ShouldBe(replicaId2); + stored.Heartbeat.ShouldBe(0); + } + + await store.UpdateHeartbeat(replicaId1); + { + var all = await store.GetAll(); + all.Count.ShouldBe(2); + var stored1 = all.Single(r => r.ReplicaId == replicaId1); + stored1.Heartbeat.ShouldBe(1); + var stored2 = all.Single(r => r.ReplicaId == replicaId2); + stored2.Heartbeat.ShouldBe(0); + } + + await store.UpdateHeartbeat(replicaId2); + { + var all = await store.GetAll(); + all.Count.ShouldBe(2); + var stored1 = all.Single(r => r.ReplicaId == replicaId1); + stored1.Heartbeat.ShouldBe(1); + var stored2 = all.Single(r => r.ReplicaId == replicaId2); + stored2.Heartbeat.ShouldBe(1); + } + + await store.Delete(replicaId1); + { + var all = await store.GetAll(); + all.Count.ShouldBe(1); + var stored2 = all.Single(r => r.ReplicaId == replicaId2); + stored2.Heartbeat.ShouldBe(1); + } + + await store.Delete(replicaId2); + { + var all = await store.GetAll(); + all.ShouldBeEmpty(); + } + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index 28a9164a..1947c7fc 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -25,6 +25,7 @@ public class CrashableFunctionStore : IFunctionStore public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities; public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator; public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore; + public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) { diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index 3024bfbf..0b0a91be 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -16,6 +16,7 @@ public interface IFunctionStore public Utilities Utilities { get; } public IMigrator Migrator { get; } public ISemaphoreStore SemaphoreStore { get; } + public IReplicaStore ReplicaStore { get; } public Task Initialize(); Task CreateFunction( diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IReplicaStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IReplicaStore.cs new file mode 100644 index 00000000..292839a4 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Storage/IReplicaStore.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Cleipnir.ResilientFunctions.Storage; + +public interface IReplicaStore +{ + public Task Initialize(); + public Task Insert(Guid replicaId); + public Task Delete(Guid replicaId); + public Task UpdateHeartbeat(Guid replicaId); + public Task> GetAll(); +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 74b4ec5e..9b49d27e 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -30,6 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public IMigrator Migrator { get; } = new InMemoryMigrator(); public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore(); + public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo public Task Initialize() => Task.CompletedTask; diff --git a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs index ba7b569d..8a3d1d34 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs @@ -137,4 +137,6 @@ public static class StoredEffectExtensions { public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId, CrudOperation operation) => new(storedId, effect.StoredEffectId, operation, effect); -} \ No newline at end of file +} + +public record StoredReplica(Guid ReplicaId, int Heartbeat); \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 22a43a06..c898c3a0 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -21,6 +21,7 @@ public class CrashableFunctionStore : IFunctionStore public Utilities Utilities => _inner.Utilities; public IMigrator Migrator => _inner.Migrator; public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore; + public IReplicaStore ReplicaStore => _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/ReplicaStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/ReplicaStoreTests.cs new file mode 100644 index 00000000..8d496f0f --- /dev/null +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/ReplicaStoreTests.cs @@ -0,0 +1,11 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Cleipnir.ResilientFunctions.MariaDb.Tests; + +[TestClass] +public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests +{ + [TestMethod] + public override Task SunshineScenarioTest() + => SunshineScenarioTest(FunctionStoreFactory.Create()); +} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index ae297bd5..3924434a 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -36,6 +36,9 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; + private readonly MariaDbReplicaStore _replicaStore; + public IReplicaStore ReplicaStore => _replicaStore; + public Utilities Utilities { get; } private readonly MariaDbUnderlyingRegister _mariaDbUnderlyingRegister; @@ -57,6 +60,7 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "") _mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix); _typeStore = new MariaDbTypeStore(connectionString, tablePrefix); _migrator = new MariaDbMigrator(connectionString, tablePrefix); + _replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix); Utilities = new Utilities(_mariaDbUnderlyingRegister); } @@ -75,6 +79,7 @@ public async Task Initialize() await _semaphoreStore.Initialize(); await TimeoutStore.Initialize(); await _typeStore.Initialize(); + await _replicaStore.Initialize(); await using var conn = await CreateOpenConnection(_connectionString); _initializeSql ??= $@" CREATE TABLE IF NOT EXISTS {_tablePrefix} ( @@ -108,6 +113,7 @@ public async Task TruncateTables() await _correlationStore.Truncate(); await _semaphoreStore.Truncate(); await _typeStore.Truncate(); + await _replicaStore.Truncate(); await using var conn = await CreateOpenConnection(_connectionString); _truncateTablesSql ??= $"TRUNCATE TABLE {_tablePrefix}"; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbReplicaStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbReplicaStore.cs new file mode 100644 index 00000000..baa649f1 --- /dev/null +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbReplicaStore.cs @@ -0,0 +1,116 @@ +using Cleipnir.ResilientFunctions.Storage; +using MySqlConnector; + +namespace Cleipnir.ResilientFunctions.MariaDb; + +public class MariaDbReplicaStore(string connectionString, string tablePrefix) : IReplicaStore +{ + private string? _initializeSql; + public async Task Initialize() + { + _initializeSql ??= $@" + CREATE TABLE IF NOT EXISTS {tablePrefix}_replicas ( + id CHAR(32) PRIMARY KEY, + heartbeat INT + );"; + await using var conn = await CreateConnection(); + var command = new MySqlCommand(_initializeSql, conn); + await command.ExecuteNonQueryAsync(); + } + + private string? _insertSql; + public async Task Insert(Guid replicaId) + { + _insertSql ??= $@" + INSERT INTO {tablePrefix}_replicas + (id, heartbeat) + VALUES + (?, 0)"; + + await using var conn = await CreateConnection(); + await using var command = new MySqlCommand(_insertSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _deleteSql; + public async Task Delete(Guid replicaId) + { + _deleteSql ??= $"DELETE FROM {tablePrefix}_replicas WHERE id = ?"; + + await using var conn = await CreateConnection(); + await using var command = new MySqlCommand(_deleteSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _updateHeartbeatSql; + public async Task UpdateHeartbeat(Guid replicaId) + { + _updateHeartbeatSql ??= $@" + UPDATE {tablePrefix}_replicas + SET heartbeat = heartbeat + 1 + WHERE id = ?"; + + await using var conn = await CreateConnection(); + await using var command = new MySqlCommand(_updateHeartbeatSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _getAllSql; + public async Task> GetAll() + { + _getAllSql ??= $"SELECT id, heartbeat FROM {tablePrefix}_replicas"; + + await using var conn = await CreateConnection(); + await using var command = new MySqlCommand(_getAllSql, conn); + var storedReplicas = new List(); + + await using var reader = await command.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + var id = Guid.Parse(reader.GetString(0)); + var heartbeat = reader.GetInt32(1); + storedReplicas.Add(new StoredReplica(id, heartbeat)); + } + + return storedReplicas; + } + + private string? _truncateSql; + public async Task Truncate() + { + _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_replicas"; + + await using var conn = await CreateConnection(); + await using var command = new MySqlCommand(_truncateSql, conn); + + await command.ExecuteNonQueryAsync(); + } + + private async Task CreateConnection() + { + var conn = new MySqlConnection(connectionString); + await conn.OpenAsync(); + return conn; + } +} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 8d9709ef..4fc0d7de 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -34,6 +34,7 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly PostgreSqlSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; + public IReplicaStore ReplicaStore => throw new NotImplementedException(); public Utilities Utilities { get; } public IMigrator Migrator => _migrator; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index 4b379323..74a9eddf 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -34,6 +34,7 @@ public class SqlServerFunctionStore : IFunctionStore public IMigrator Migrator => _migrator; private readonly SqlServerSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; + public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo private readonly SqlServerUnderlyingRegister _underlyingRegister; From 3d25d4dc358641bc6e9b04339b85cf0ef45974c8 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Thu, 5 Jun 2025 12:13:57 +0200 Subject: [PATCH 2/2] completed rest of stores --- .../InMemoryTests/ReplicaStoreTests.cs | 12 ++ .../Storage/InMemoryFunctionStore.cs | 2 +- .../Storage/InMemoryReplicaStore.cs | 51 ++++++++ .../ReplicaStoreTests.cs | 12 ++ .../PostgreSqlDbReplicaStore.cs | 119 ++++++++++++++++++ .../PostgreSqlFunctionStore.cs | 6 +- .../ReplicaStoreTests.cs | 12 ++ .../SqlServerFunctionStore.cs | 6 +- .../SqlServerReplicaStore.cs | 119 ++++++++++++++++++ 9 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/ReplicaStoreTests.cs create mode 100644 Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs create mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/ReplicaStoreTests.cs create mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlDbReplicaStore.cs create mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/ReplicaStoreTests.cs create mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerReplicaStore.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/ReplicaStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/ReplicaStoreTests.cs new file mode 100644 index 00000000..4e15176f --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/ReplicaStoreTests.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests; + +[TestClass] +public class ReplicaStoreTests : TestTemplates.ReplicaStoreTests +{ + [TestMethod] + public override Task SunshineScenarioTest() + => SunshineScenarioTest(FunctionStoreFactory.Create()); +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 9b49d27e..04f59372 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -31,7 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public IMigrator Migrator { get; } = new InMemoryMigrator(); public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore(); - public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo + public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore(); public Task Initialize() => Task.CompletedTask; diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs new file mode 100644 index 00000000..c4d22fe5 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Helpers; + +namespace Cleipnir.ResilientFunctions.Storage; + +public class InMemoryReplicaStore : IReplicaStore +{ + private readonly Dictionary _replicas = new(); + private readonly Lock _sync = new(); + + public Task Initialize() => Task.CompletedTask; + + public Task Insert(Guid replicaId) + { + lock (_sync) + _replicas.TryAdd(replicaId, 0); + + return Task.CompletedTask; + } + + public Task Delete(Guid replicaId) + { + lock (_sync) + _replicas.Remove(replicaId); + + return Task.CompletedTask; + } + + public Task UpdateHeartbeat(Guid replicaId) + { + lock (_sync) + if (_replicas.ContainsKey(replicaId)) + _replicas[replicaId]++; + + return Task.CompletedTask; + } + + public Task> GetAll() + { + lock (_sync) + return _replicas + .Select(kv => new StoredReplica(kv.Key, kv.Value)) + .ToList() + .CastTo>() + .ToTask(); + } +} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/ReplicaStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/ReplicaStoreTests.cs new file mode 100644 index 00000000..a19fd15f --- /dev/null +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/ReplicaStoreTests.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests; + +[TestClass] +public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests +{ + [TestMethod] + public override Task SunshineScenarioTest() + => SunshineScenarioTest(FunctionStoreFactory.Create()); +} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlDbReplicaStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlDbReplicaStore.cs new file mode 100644 index 00000000..b69f16e1 --- /dev/null +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlDbReplicaStore.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Storage; +using Npgsql; + +namespace Cleipnir.ResilientFunctions.PostgreSQL; + +public class PostgreSqlDbReplicaStore(string connectionString, string tablePrefix) : IReplicaStore +{ + private string? _initializeSql; + public async Task Initialize() + { + _initializeSql ??= $@" + CREATE TABLE IF NOT EXISTS {tablePrefix}_replicas ( + id CHAR(32) PRIMARY KEY, + heartbeat INT + );"; + await using var conn = await CreateConnection(); + var command = new NpgsqlCommand(_initializeSql, conn); + await command.ExecuteNonQueryAsync(); + } + + private string? _insertSql; + public async Task Insert(Guid replicaId) + { + _insertSql ??= $@" + INSERT INTO {tablePrefix}_replicas + (id, heartbeat) + VALUES + ($1, 0)"; + + await using var conn = await CreateConnection(); + await using var command = new NpgsqlCommand(_insertSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _deleteSql; + public async Task Delete(Guid replicaId) + { + _deleteSql ??= $"DELETE FROM {tablePrefix}_replicas WHERE id = $1"; + + await using var conn = await CreateConnection(); + await using var command = new NpgsqlCommand(_deleteSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _updateHeartbeatSql; + public async Task UpdateHeartbeat(Guid replicaId) + { + _updateHeartbeatSql ??= $@" + UPDATE {tablePrefix}_replicas + SET heartbeat = heartbeat + 1 + WHERE id = $1"; + + await using var conn = await CreateConnection(); + await using var command = new NpgsqlCommand(_updateHeartbeatSql, conn) + { + Parameters = + { + new() {Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _getAllSql; + public async Task> GetAll() + { + _getAllSql ??= $"SELECT id, heartbeat FROM {tablePrefix}_replicas"; + + await using var conn = await CreateConnection(); + await using var command = new NpgsqlCommand(_getAllSql, conn); + var storedReplicas = new List(); + + await using var reader = await command.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + var id = Guid.Parse(reader.GetString(0)); + var heartbeat = reader.GetInt32(1); + storedReplicas.Add(new StoredReplica(id, heartbeat)); + } + + return storedReplicas; + } + + private string? _truncateSql; + public async Task Truncate() + { + _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_replicas"; + + await using var conn = await CreateConnection(); + await using var command = new NpgsqlCommand(_truncateSql, conn); + + await command.ExecuteNonQueryAsync(); + } + + private async Task CreateConnection() + { + var conn = new NpgsqlConnection(connectionString); + await conn.OpenAsync(); + return conn; + } +} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 4fc0d7de..b89062bd 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -34,7 +34,8 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly PostgreSqlSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; - public IReplicaStore ReplicaStore => throw new NotImplementedException(); + private readonly PostgreSqlDbReplicaStore _replicaStore; + public IReplicaStore ReplicaStore => _replicaStore; public Utilities Utilities { get; } public IMigrator Migrator => _migrator; @@ -57,6 +58,7 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); _postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName); _migrator = new PostgreSqlMigrator(connectionString, _tableName); + _replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName); Utilities = new Utilities(_postgresSqlUnderlyingRegister); } @@ -81,6 +83,7 @@ public async Task Initialize() await _correlationStore.Initialize(); await _semaphoreStore.Initialize(); await _typeStore.Initialize(); + await _replicaStore.Initialize(); await using var conn = await CreateConnection(); _initializeSql ??= $@" CREATE TABLE IF NOT EXISTS {_tableName} ( @@ -122,6 +125,7 @@ public async Task TruncateTables() await _correlationStore.Truncate(); await _typeStore.Truncate(); await _semaphoreStore.Truncate(); + await _replicaStore.Truncate(); await using var conn = await CreateConnection(); _truncateTableSql ??= $"TRUNCATE TABLE {_tableName}"; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/ReplicaStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/ReplicaStoreTests.cs new file mode 100644 index 00000000..a0717231 --- /dev/null +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/ReplicaStoreTests.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Cleipnir.ResilientFunctions.SqlServer.Tests; + +[TestClass] +public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests +{ + [TestMethod] + public override Task SunshineScenarioTest() + => SunshineScenarioTest(FunctionStoreFactory.Create()); +} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index 74a9eddf..764968ac 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -34,7 +34,8 @@ public class SqlServerFunctionStore : IFunctionStore public IMigrator Migrator => _migrator; private readonly SqlServerSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; - public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo + private readonly SqlServerReplicaStore _replicaStore; + public IReplicaStore ReplicaStore => _replicaStore; private readonly SqlServerUnderlyingRegister _underlyingRegister; @@ -54,6 +55,7 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _semaphoreStore = new SqlServerSemaphoreStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _migrator = new SqlServerMigrator(connectionString, _tableName); + _replicaStore = new SqlServerReplicaStore(connectionString, _tableName); Utilities = new Utilities(_underlyingRegister); } @@ -81,6 +83,7 @@ public async Task Initialize() await _correlationStore.Initialize(); await _typeStore.Initialize(); await _semaphoreStore.Initialize(); + await _replicaStore.Initialize(); await using var conn = await _connFunc(); _initializeSql ??= @$" CREATE TABLE {_tableName} ( @@ -127,6 +130,7 @@ public async Task TruncateTables() await _correlationStore.Truncate(); await _typeStore.Truncate(); await _semaphoreStore.Truncate(); + await _replicaStore.Truncate(); await using var conn = await _connFunc(); _truncateSql ??= $"TRUNCATE TABLE {_tableName}"; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerReplicaStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerReplicaStore.cs new file mode 100644 index 00000000..06d06414 --- /dev/null +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerReplicaStore.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Storage; +using Microsoft.Data.SqlClient; + +namespace Cleipnir.ResilientFunctions.SqlServer; + +public class SqlServerReplicaStore(string connectionString, string tablePrefix) : IReplicaStore +{ + private string? _initializeSql; + public async Task Initialize() + { + _initializeSql ??= $@" + CREATE TABLE {tablePrefix}Replicas ( + Id CHAR(32) PRIMARY KEY, + Heartbeat INT + );"; + await using var conn = await CreateConnection(); + var command = new SqlCommand(_initializeSql, conn); + await command.ExecuteNonQueryAsync(); + } + + private string? _insertSql; + public async Task Insert(Guid replicaId) + { + _insertSql ??= $@" + INSERT INTO {tablePrefix}Replicas + (Id, Heartbeat) + VALUES + (@Id, 0)"; + + await using var conn = await CreateConnection(); + await using var command = new SqlCommand(_insertSql, conn) + { + Parameters = + { + new() {ParameterName = "Id", Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _deleteSql; + public async Task Delete(Guid replicaId) + { + _deleteSql ??= $"DELETE FROM {tablePrefix}Replicas WHERE Id = @Id"; + + await using var conn = await CreateConnection(); + await using var command = new SqlCommand(_deleteSql, conn) + { + Parameters = + { + new() {ParameterName = "Id", Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _updateHeartbeatSql; + public async Task UpdateHeartbeat(Guid replicaId) + { + _updateHeartbeatSql ??= $@" + UPDATE {tablePrefix}Replicas + SET heartbeat = heartbeat + 1 + WHERE id = @Id"; + + await using var conn = await CreateConnection(); + await using var command = new SqlCommand(_updateHeartbeatSql, conn) + { + Parameters = + { + new() {ParameterName = "Id", Value = replicaId.ToString("N")} + } + }; + + await command.ExecuteNonQueryAsync(); + } + + private string? _getAllSql; + public async Task> GetAll() + { + _getAllSql ??= $"SELECT Id, Heartbeat FROM {tablePrefix}Replicas"; + + await using var conn = await CreateConnection(); + await using var command = new SqlCommand(_getAllSql, conn); + var storedReplicas = new List(); + + await using var reader = await command.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + var id = Guid.Parse(reader.GetString(0)); + var heartbeat = reader.GetInt32(1); + storedReplicas.Add(new StoredReplica(id, heartbeat)); + } + + return storedReplicas; + } + + private string? _truncateSql; + public async Task Truncate() + { + _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}Replicas"; + + await using var conn = await CreateConnection(); + await using var command = new SqlCommand(_truncateSql, conn); + + await command.ExecuteNonQueryAsync(); + } + + private async Task CreateConnection() + { + var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); + return conn; + } +} \ No newline at end of file