From 9a1f70e1021e77b9d0cf215fb4d9147158ca8d82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Thu, 19 Jun 2025 17:03:10 +0100 Subject: [PATCH 1/7] core implementation --- .../Polling/CompleteProduceMessagesRetrier.cs | 57 +++++++++++++ src/Core/Polling/ICompleteRetrier.cs | 15 ++++ src/Core/Polling/PollingBackgroundService.cs | 51 +++++++----- .../{Producer.cs => PollingProducer.cs} | 29 +++++-- src/Core/Retrier.cs | 80 ++++++++++++++++++ src/Core/ServiceCollectionExtensions.cs | 19 ++++- src/Directory.Build.props | 2 +- .../Polling/PollingBackgroundServiceTests.cs | 62 ++++++++++++-- .../Polling/PollingProducerTests.cs | 60 ++++++++++---- tests/Core.Tests/RetrierTests.cs | 82 +++++++++++++++++++ 10 files changed, 410 insertions(+), 47 deletions(-) create mode 100644 src/Core/Polling/CompleteProduceMessagesRetrier.cs create mode 100644 src/Core/Polling/ICompleteRetrier.cs rename src/Core/Polling/{Producer.cs => PollingProducer.cs} (57%) create mode 100644 src/Core/Retrier.cs create mode 100644 tests/Core.Tests/RetrierTests.cs diff --git a/src/Core/Polling/CompleteProduceMessagesRetrier.cs b/src/Core/Polling/CompleteProduceMessagesRetrier.cs new file mode 100644 index 0000000..4a8469e --- /dev/null +++ b/src/Core/Polling/CompleteProduceMessagesRetrier.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.Logging; + +namespace YakShaveFx.OutboxKit.Core.Polling; + +internal interface ICollectProducedMessagesToRetryCompletion +{ + void Collect(IReadOnlyCollection messages); +} + +internal interface IRetryCompletionOfProducedMessages +{ + ValueTask RetryCompleteAsync(CancellationToken ct); +} + +// not thread safe, as it is only used in the context of a producing flow, which has no concurrency +internal sealed class CompleteProduceMessagesRetrier( + ICompleteRetrier completeRetrier, + RetrierBuilderFactory retrierBuilderFactory, + ILogger logger) + : ICollectProducedMessagesToRetryCompletion, IRetryCompletionOfProducedMessages +{ + private readonly Retrier _retrier = retrierBuilderFactory.Create() + .WithMaxRetries(int.MaxValue) + .WithShouldRetryDecider(ex => + { + // retry on all exceptions except cancellation + if (ex is OperationCanceledException oce) return oce.CancellationToken == CancellationToken.None; + return true; + }) + .Build(); + + private List _messages = new(); + + public void Collect(IReadOnlyCollection messages) => _messages.AddRange(messages); + + public ValueTask RetryCompleteAsync(CancellationToken ct) + => _messages.Count == 0 + ? ValueTask.CompletedTask + : new(InnerRetryCompleteAsync(ct)); + + private async Task InnerRetryCompleteAsync(CancellationToken ct) + { + try + { + await _retrier.ExecuteWithRetryAsync( + () => completeRetrier.RetryCompleteAsync(_messages, ct), + ct); + + // clear messages by creating a new list, so the old one can be garbage collected + _messages = new(); + } + catch (Exception ex) + { + logger.LogError(ex, "Error retrying completion of previously produced messages"); + } + } +} \ No newline at end of file diff --git a/src/Core/Polling/ICompleteRetrier.cs b/src/Core/Polling/ICompleteRetrier.cs new file mode 100644 index 0000000..e824f9d --- /dev/null +++ b/src/Core/Polling/ICompleteRetrier.cs @@ -0,0 +1,15 @@ +namespace YakShaveFx.OutboxKit.Core.Polling; + +/// +/// Interface to be implemented by library users, to make it possible to retry completing messages already produced. +/// +public interface ICompleteRetrier +{ + /// + /// Completes the given collection of messages. + /// + /// The messages that were previously successfully produced. + /// The async cancellation token. + /// The task representing the asynchronous operation + Task RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct); +} \ No newline at end of file diff --git a/src/Core/Polling/PollingBackgroundService.cs b/src/Core/Polling/PollingBackgroundService.cs index c56b537..b35ef44 100644 --- a/src/Core/Polling/PollingBackgroundService.cs +++ b/src/Core/Polling/PollingBackgroundService.cs @@ -9,6 +9,7 @@ internal sealed partial class PollingBackgroundService( IPollingProducer producer, TimeProvider timeProvider, CorePollingSettings settings, + IRetryCompletionOfProducedMessages completeRetrier, ILogger logger) : BackgroundService { private readonly TimeSpan _pollingInterval = settings.PollingInterval; @@ -23,6 +24,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { + await completeRetrier.RetryCompleteAsync(stoppingToken); + try { await producer.ProducePendingAsync(stoppingToken); @@ -38,26 +41,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) LogUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex); } - // to avoid letting the delays running in the background, wasting resources - // we create a linked token, to cancel them - using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); - - var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token); - var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token); - - // wait for whatever occurs first: - // - being notified of new messages added to the outbox - // - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted - // something but didn't produce it, or some error occurred when producing and there are pending messages - await Task.WhenAny(listenerTask, delayTask); - - LogWakeUp( - logger, - key.ProviderKey, - key.ClientKey, - listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed"); - - await linkedTokenSource.CancelAsync(); + await WaitBeforeNextIteration(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { @@ -68,6 +52,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) LogStopping(logger, key.ProviderKey, key.ClientKey); } + private async Task WaitBeforeNextIteration(CancellationToken ct) + { + // no need to even try to wait if the service is stopping + if (ct.IsCancellationRequested) return; + + // to avoid letting the delays running in the background, wasting resources + // we create a linked token, to cancel them + using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct); + + var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token); + var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token); + + // wait for whatever occurs first: + // - being notified of new messages added to the outbox + // - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted + // something but didn't produce it, or some error occurred when producing and there are pending messages + await Task.WhenAny(listenerTask, delayTask); + + LogWakeUp( + logger, + key.ProviderKey, + key.ClientKey, + listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed"); + + await linkedTokenSource.CancelAsync(); + } + [LoggerMessage(LogLevel.Debug, Message = "Starting outbox polling background service for provider key \"{providerKey}\" and client key \"{clientKey}\", with polling interval {pollingInterval}")] diff --git a/src/Core/Polling/Producer.cs b/src/Core/Polling/PollingProducer.cs similarity index 57% rename from src/Core/Polling/Producer.cs rename to src/Core/Polling/PollingProducer.cs index eb49afb..a7fc069 100644 --- a/src/Core/Polling/Producer.cs +++ b/src/Core/Polling/PollingProducer.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging; using YakShaveFx.OutboxKit.Core.OpenTelemetry; namespace YakShaveFx.OutboxKit.Core.Polling; @@ -8,11 +9,13 @@ internal interface IPollingProducer Task ProducePendingAsync(CancellationToken ct); } -internal sealed class PollingProducer( +internal sealed partial class PollingProducer( OutboxKey key, IBatchFetcher fetcher, IBatchProducer producer, - ProducerMetrics metrics) : IPollingProducer + ICollectProducedMessagesToRetryCompletion completeRetryCollector, + ProducerMetrics metrics, + ILogger logger) : IPollingProducer { public async Task ProducePendingAsync(CancellationToken ct) { @@ -41,10 +44,26 @@ private async Task ProduceBatchAsync(CancellationToken ct) metrics.BatchProduced(key, messages.Count == result.Ok.Count); metrics.MessagesProduced(key, result.Ok.Count); - // messages already produced, try to ack them - // not passing the actual cancellation token to try to complete the batch even if the application is shutting down - await batchContext.CompleteAsync(result.Ok, CancellationToken.None); + try + { + // messages already produced, try to ack them + // not passing the actual cancellation token to try to complete the batch even if the application is shutting down + await batchContext.CompleteAsync(result.Ok, CancellationToken.None); + } + catch (Exception ex) + { + LogCompletionUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex); + completeRetryCollector.Collect(result.Ok); + + // return false to break the loop, as we don't want to produce more messages until we're able to complete the batch + return false; + } return await batchContext.HasNextAsync(ct); } + + [LoggerMessage(LogLevel.Error, + Message = + "Unexpected error while completing produced outbox messages for provider key \"{providerKey}\" and client key \"{clientKey}\"")] + private static partial void LogCompletionUnexpectedError(ILogger logger, string providerKey, string clientKey, Exception ex); } \ No newline at end of file diff --git a/src/Core/Retrier.cs b/src/Core/Retrier.cs new file mode 100644 index 0000000..ec4b043 --- /dev/null +++ b/src/Core/Retrier.cs @@ -0,0 +1,80 @@ +namespace YakShaveFx.OutboxKit.Core; + +// very basic implementation of retry logic, to avoid taking in external dependencies, otherwise would just use Polly + +// would love to have a less hideous name, but can't think of one right now +internal sealed class RetrierBuilderFactory(TimeProvider timeProvider) +{ + public RetrierBuilder Create() => new(timeProvider); +} + +internal sealed class RetrierBuilder(TimeProvider timeProvider) +{ + private int _maxRetries = int.MaxValue; + + private Func _delayCalculator + = retries => retries switch + { + 1 => TimeSpan.FromSeconds(1), + 2 => TimeSpan.FromSeconds(5), + 3 => TimeSpan.FromSeconds(30), + 4 => TimeSpan.FromMinutes(1), + _ => TimeSpan.FromMinutes(5) + }; + + private Func _shouldRetryDecider = _ => true; + + public RetrierBuilder WithMaxRetries(int maxRetries) + { + ArgumentOutOfRangeException.ThrowIfLessThan(maxRetries, 1); + _maxRetries = maxRetries; + return this; + } + + public RetrierBuilder WithDelayCalculator(Func delayCalculator) + { + ArgumentNullException.ThrowIfNull(delayCalculator); + _delayCalculator = delayCalculator; + return this; + } + + public RetrierBuilder WithShouldRetryDecider(Func shouldRetry) + { + ArgumentNullException.ThrowIfNull(shouldRetry); + _shouldRetryDecider = shouldRetry; + return this; + } + + public Retrier Build() => new(_maxRetries, _delayCalculator, _shouldRetryDecider, timeProvider); +} + +internal sealed class Retrier( + int maxRetries, + Func delayCalculator, + Func shouldRetryDecider, + TimeProvider timeProvider) +{ + public async Task ExecuteWithRetryAsync(Func action, TArgs args, CancellationToken ct) + { + var retries = 0; + while (!ct.IsCancellationRequested) + { + try + { + await action(args); + return; + } + catch (Exception ex) when (shouldRetryDecider(ex)) + { + if (retries >= maxRetries) throw; + + ++retries; + + await Task.Delay(delayCalculator(retries), timeProvider, ct); + } + } + } + + public Task ExecuteWithRetryAsync(Func action, CancellationToken ct) + => ExecuteWithRetryAsync(static action => action(), action, ct); +} \ No newline at end of file diff --git a/src/Core/ServiceCollectionExtensions.cs b/src/Core/ServiceCollectionExtensions.cs index 8959c0c..aab502a 100644 --- a/src/Core/ServiceCollectionExtensions.cs +++ b/src/Core/ServiceCollectionExtensions.cs @@ -39,6 +39,7 @@ public static IServiceCollection AddOutboxKit( private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitConfigurator configurator) { services.AddSingleton(); + services.AddSingleton(); if (configurator.PollingConfigurators.Count == 1) { @@ -74,13 +75,29 @@ private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitCo s.GetRequiredKeyedService(key), s.GetRequiredService(), corePollingSettings, + s.GetRequiredKeyedService(key), s.GetRequiredService>())); + services.AddKeyedSingleton( + key, + (s, _) => new CompleteProduceMessagesRetrier( + s.GetRequiredService(), + s.GetRequiredService(), + s.GetRequiredService>())); + + services.AddKeyedSingleton(key, + (s, _) => s.GetRequiredKeyedService(key)); + + services.AddKeyedSingleton(key, + (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, (s, _) => new PollingProducer( key, s.GetRequiredKeyedService(key), s.GetRequiredService(), - s.GetRequiredService())); + s.GetRequiredKeyedService(key), + s.GetRequiredService(), + s.GetRequiredService>())); if (corePollingSettings.EnableCleanUp) { diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 39cf3c9..af58e7a 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -53,7 +53,7 @@ - 0.2 + 0.3 v nightly diff --git a/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs b/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs index 35d1ed2..2bad20a 100644 --- a/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs +++ b/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs @@ -13,13 +13,15 @@ public class PollingBackgroundServiceTests private readonly Listener _listener = new(); private readonly FakeTimeProvider _timeProvider = new(); private readonly CorePollingSettings _settings = new(); + private readonly IRetryCompletionOfProducedMessages _completeRetrierStub = new CompleteRetrierStub(); private readonly CancellationToken _ct = TestContext.Current.CancellationToken; [Fact] public async Task WhenServiceStartsTheProducerIsInvoked() { var producerSpy = Substitute.For(); - var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, + _completeRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -31,7 +33,8 @@ public async Task WhenServiceStartsTheProducerIsInvoked() public async Task UntilPollingIntervalIsReachedTheProducerIsNotInvokedAgain() { var producerSpy = Substitute.For(); - var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, + _completeRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -47,7 +50,8 @@ public async Task UntilPollingIntervalIsReachedTheProducerIsNotInvokedAgain() public async Task WhenPollingIntervalIsReachedThenTheProducerIsInvokedAgain() { var producerSpy = Substitute.For(); - var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, + _completeRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -64,7 +68,8 @@ public async Task WhenPollingIntervalIsReachedThenTheProducerIsInvokedAgain() public async Task WhenListenerIsTriggeredThenTheProducerIsInvokedAgain() { var producerSpy = Substitute.For(); - var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, + _completeRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -80,7 +85,8 @@ public async Task WhenListenerIsTriggeredThenTheProducerIsInvokedAgain() public async Task WhenCancellationTokenIsSignaledThenTheServiceStops() { var producerStub = Substitute.For(); - var sut = new PollingBackgroundService(Key, _listener, producerStub, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerStub, _timeProvider, _settings, + _completeRetrierStub, Logger); var cts = new CancellationTokenSource(); @@ -101,11 +107,55 @@ public async Task WhenTheProducerThrowsTheServiceRemainsRunning() .When(x => x.ProducePendingAsync(Arg.Any())) .Throw(new InvalidOperationException("test")); - var sut = new PollingBackgroundService(Key, _listener, producerMock, _timeProvider, _settings, Logger); + var sut = new PollingBackgroundService(Key, _listener, producerMock, _timeProvider, _settings, + _completeRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); // give it a bit to run and block sut.ExecuteTask.Should().BeEquivalentTo(new { IsCompleted = false }); } + + [Fact] + public async Task WhenThereAreMessagesToRetryCompletingThenTheRetrierIsInvoked() + { + var producerStub = Substitute.For(); + var retrierSpy = Substitute.For(); + var sut = new PollingBackgroundService(Key, _listener, producerStub, _timeProvider, _settings, + retrierSpy, Logger); + + await sut.StartAsync(CancellationToken.None); + await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run + + await retrierSpy.Received(1).RetryCompleteAsync(Arg.Any()); + } + + [Fact] + public async Task WhenThereAreMessagesToRetryCompletingThenTheProducerIsNotInvokedUntilTheyAreCompleted() + { + var retryCompletionSource = new TaskCompletionSource(); + var producerSpy = Substitute.For(); + var retrierMock = Substitute.For(); +#pragma warning disable CA2012 - mocking, not actually calling the method + retrierMock.RetryCompleteAsync(Arg.Any()).Returns(new ValueTask(retryCompletionSource.Task)); +#pragma warning restore CA2012 + + var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, + retrierMock, Logger); + + await sut.StartAsync(CancellationToken.None); + await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run + + await retrierMock.Received(1).RetryCompleteAsync(Arg.Any()); + await producerSpy.DidNotReceive().ProducePendingAsync(Arg.Any()); + + retryCompletionSource.SetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run + await producerSpy.Received(1).ProducePendingAsync(Arg.Any()); + } +} + +file sealed class CompleteRetrierStub : IRetryCompletionOfProducedMessages +{ + public ValueTask RetryCompleteAsync(CancellationToken ct) => ValueTask.CompletedTask; } \ No newline at end of file diff --git a/tests/Core.Tests/Polling/PollingProducerTests.cs b/tests/Core.Tests/Polling/PollingProducerTests.cs index 11a60c8..cdc180e 100644 --- a/tests/Core.Tests/Polling/PollingProducerTests.cs +++ b/tests/Core.Tests/Polling/PollingProducerTests.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; using YakShaveFx.OutboxKit.Core.OpenTelemetry; using YakShaveFx.OutboxKit.Core.Polling; @@ -9,13 +10,17 @@ public class PollingProducerTests { private static readonly OutboxKey Key = new("sample-provider", "some-key"); private static readonly ProducerMetrics Metrics = new(CreateMeterFactoryStub()); + private static readonly NullLogger Logger = NullLogger.Instance; + + private static readonly ICollectProducedMessagesToRetryCompletion CompleteRetrierStub = + new CompleteRetryCollectorStub(); [Fact] public async Task WhenBatchIsEmptyThenProducerIsNotInvoked() { var producerSpy = CreateProducer(); var fetcherStub = new BatchFetcherStub([new BatchContextStub([], false)]); - var sut = new PollingProducer(Key, fetcherStub, producerSpy, Metrics); + var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetrierStub, Metrics, Logger); await sut.ProducePendingAsync(CancellationToken.None); @@ -32,7 +37,7 @@ public async Task WhileThereAreAvailableBatchesProducerIsInvoked(int numberOfBat { var producerSpy = CreateProducer(); var fetcherStub = new BatchFetcherStub(CreateBatchContexts(numberOfBatches)); - var sut = new PollingProducer(Key, fetcherStub, producerSpy, Metrics); + var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetrierStub, Metrics, Logger); await sut.ProducePendingAsync(CancellationToken.None); @@ -41,10 +46,27 @@ await producerSpy .ProduceAsync(Arg.Any(), Arg.Any>(), Arg.Any()); } - private static BatchContextStub[] CreateBatchContexts(int numberOfBatches) - => Enumerable.Range(0, numberOfBatches) - .Select(i => new BatchContextStub([new MessageStub()], i + 1 < numberOfBatches)) - .ToArray(); + [Fact] + public async Task WhenCompletingBatchThenMessagesAreCollectedForRetry() + { + var producerSpy = CreateProducer(); + var fetcherStub = new BatchFetcherStub([new BatchContextStub([new MessageStub()], false, true)]); + var collectorSpy = Substitute.For(); + var sut = new PollingProducer(Key, fetcherStub, producerSpy, collectorSpy, Metrics, Logger); + + await sut.ProducePendingAsync(CancellationToken.None); + + collectorSpy + .Received(1) + .Collect(Arg.Any>()); + } + + private static IBatchContext[] CreateBatchContexts(int numberOfBatches) + => + [ + .. Enumerable.Range(0, numberOfBatches) + .Select(i => new BatchContextStub([new MessageStub()], i + 1 < numberOfBatches)) + ]; private static IBatchProducer CreateProducer() { @@ -57,24 +79,34 @@ private static IBatchProducer CreateProducer() } } -public sealed class MessageStub : IMessage; +file sealed class MessageStub : IMessage; -public sealed class BatchFetcherStub(BatchContextStub[] contexts) : IBatchFetcher +file sealed class BatchFetcherStub(IBatchContext[] contexts) : IBatchFetcher { private int _index = 0; public Task FetchAndHoldAsync(CancellationToken ct) - { - return _index >= contexts.Length + => _index >= contexts.Length ? Task.FromResult(EmptyBatchContext.Instance) - : Task.FromResult(contexts[_index++]); - } + : Task.FromResult(contexts[_index++]); } -public sealed class BatchContextStub(IReadOnlyCollection messages, bool hasNext) : IBatchContext +file sealed class BatchContextStub(IReadOnlyCollection messages, bool hasNext, bool throwOnComplete = false) + : IBatchContext { public IReadOnlyCollection Messages => messages; - public Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) => Task.CompletedTask; + + public Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) => !throwOnComplete + ? Task.CompletedTask + : throw new Exception("Simulated failure on complete"); + public Task HasNextAsync(CancellationToken ct) => Task.FromResult(hasNext); public ValueTask DisposeAsync() => ValueTask.CompletedTask; +} + +file sealed class CompleteRetryCollectorStub : ICollectProducedMessagesToRetryCompletion +{ + public void Collect(IReadOnlyCollection messages) + { + } } \ No newline at end of file diff --git a/tests/Core.Tests/RetrierTests.cs b/tests/Core.Tests/RetrierTests.cs new file mode 100644 index 0000000..21ea3d9 --- /dev/null +++ b/tests/Core.Tests/RetrierTests.cs @@ -0,0 +1,82 @@ +using FluentAssertions; + +namespace YakShaveFx.OutboxKit.Core.Tests; + +public class RetrierTests +{ + private readonly CancellationToken _ct = TestContext.Current.CancellationToken; + + [Fact] + public async Task WhenRetryingForeverThenOnlyReturnsOnSuccess() + { + var sut = new RetrierBuilderFactory(TimeProvider.System) + .Create() + .WithMaxRetries(int.MaxValue) + .WithDelayCalculator(_ => TimeSpan.FromMilliseconds(10)) + .WithShouldRetryDecider(_ => true) + .Build(); + + const int maxAttempts = 10; + var attempts = 0; + var retryTask = sut.ExecuteWithRetryAsync(() => + { + attempts++; + if (attempts < maxAttempts) throw new Exception("Failed attempt"); + return Task.CompletedTask; + }, + _ct); + + await Task.Delay(TimeSpan.FromMilliseconds(maxAttempts * 10), _ct); // wait for all attempts to be made + + await retryTask; // should complete successfully + + attempts.Should().Be(maxAttempts); + } + + [Fact] + public async Task WhenRetryingWithLimitedAttemptsThenThrowsWhenRetriesExhausted() + { + const int maxAttempts = 3; + var sut = new RetrierBuilderFactory(TimeProvider.System) + .Create() + .WithMaxRetries(maxAttempts - 1) // -1 because the first attempt is not considered a retry + .WithDelayCalculator(_ => TimeSpan.FromMilliseconds(10)) + .WithShouldRetryDecider(_ => true) + .Build(); + + + var attempts = 0; + var act = () => sut.ExecuteWithRetryAsync(() => + { + attempts++; + throw new Exception("Failed attempt"); + }, + _ct); + + await act.Should().ThrowAsync().WithMessage("Failed attempt"); + attempts.Should().Be(maxAttempts); + } + + [Fact] + public async Task WhenExceptionIsNotRetryableThenThrowsImmediately() + { + var sut = new RetrierBuilderFactory(TimeProvider.System) + .Create() + .WithMaxRetries(3) + .WithDelayCalculator(_ => TimeSpan.FromMilliseconds(10)) + .WithShouldRetryDecider(_ => false) + .Build(); + + var attempts = 0; + var act = () => sut.ExecuteWithRetryAsync( + () => + { + attempts++; + throw new Exception("Non-retryable exception"); + }, + _ct); + + await act.Should().ThrowAsync().WithMessage("Non-retryable exception"); + attempts.Should().Be(1); // should not retry at all + } +} \ No newline at end of file From df234306bf32659665575aa38231ec3bfa58f489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Thu, 19 Jun 2025 17:34:28 +0100 Subject: [PATCH 2/7] MongoDB implementation of ICompleteRetrier --- src/Core/ServiceCollectionExtensions.cs | 2 +- .../Polling/ConfigurationImplementation.cs | 13 +++ src/MongoDb/Polling/OutboxBatchCompleter.cs | 102 ++++++++++++++++++ src/MongoDb/Polling/OutboxBatchFetcher.cs | 71 ++---------- .../Polling/BatchFetcherTests.cs | 14 ++- 5 files changed, 135 insertions(+), 67 deletions(-) create mode 100644 src/MongoDb/Polling/OutboxBatchCompleter.cs diff --git a/src/Core/ServiceCollectionExtensions.cs b/src/Core/ServiceCollectionExtensions.cs index aab502a..ed26455 100644 --- a/src/Core/ServiceCollectionExtensions.cs +++ b/src/Core/ServiceCollectionExtensions.cs @@ -81,7 +81,7 @@ private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitCo services.AddKeyedSingleton( key, (s, _) => new CompleteProduceMessagesRetrier( - s.GetRequiredService(), + s.GetRequiredKeyedService(key), s.GetRequiredService(), s.GetRequiredService>())); diff --git a/src/MongoDb/Polling/ConfigurationImplementation.cs b/src/MongoDb/Polling/ConfigurationImplementation.cs index dec6b3a..7bf07d7 100644 --- a/src/MongoDb/Polling/ConfigurationImplementation.cs +++ b/src/MongoDb/Polling/ConfigurationImplementation.cs @@ -211,6 +211,19 @@ public void ConfigureCollection( s.GetRequiredKeyedService>(key)(key, s))); } + services.AddKeyedSingleton( + key, + (s, _) => ActivatorUtilities.CreateInstance>( + s, + key, + pollingSettings, + collectionSettings, + s.GetRequiredKeyedService>(key)(key, s))); + + services.AddKeyedSingleton( + key, + (s, _) => s.GetRequiredKeyedService>(key)); + services.AddKeyedSingleton( key, (s, _) => ActivatorUtilities.CreateInstance>( diff --git a/src/MongoDb/Polling/OutboxBatchCompleter.cs b/src/MongoDb/Polling/OutboxBatchCompleter.cs new file mode 100644 index 0000000..3501ff0 --- /dev/null +++ b/src/MongoDb/Polling/OutboxBatchCompleter.cs @@ -0,0 +1,102 @@ +using System.Linq.Expressions; +using MongoDB.Driver; +using YakShaveFx.OutboxKit.Core; +using YakShaveFx.OutboxKit.Core.Polling; + +namespace YakShaveFx.OutboxKit.MongoDb.Polling; + +internal sealed class OutboxBatchCompleter : ICompleteRetrier where TMessage : IMessage +{ + private readonly IMongoCollection _collection; + private readonly TimeProvider _timeProvider; + + private readonly Expression> _idSelector; + private readonly Func _idGetter; + private readonly Expression>? _processedAtSelector; + private readonly Func, CancellationToken, Task> _complete; + + public OutboxBatchCompleter( + OutboxKey key, + MongoDbPollingSettings pollingSettings, + MongoDbPollingCollectionSettings collectionSettings, + IMongoDatabase db, + TimeProvider timeProvider) + { + _timeProvider = timeProvider; + _idSelector = collectionSettings.IdSelector; + var compiledIdSelector = collectionSettings.IdSelector.Compile(); + _idGetter = m => compiledIdSelector((TMessage)m); + _processedAtSelector = (pollingSettings.CompletionMode, collectionSettings.ProcessedAtSelector) switch + { + (CompletionMode.Delete, _) => null, + (CompletionMode.Update, not null) => collectionSettings.ProcessedAtSelector, + _ => throw new ArgumentException("processed at selector not configured", nameof(collectionSettings)) + }; + _collection = db.GetCollection(collectionSettings.Name); + _complete = pollingSettings.CompletionMode switch + { + CompletionMode.Delete => CompleteDeleteAsync, + CompletionMode.Update => CompleteUpdateAsync, + _ => throw new ArgumentException("invalid completion mode", nameof(pollingSettings)) + }; + } + + public Task CompleteAsync(IReadOnlyCollection messages, CancellationToken ct) => _complete(messages, ct); + + Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + => _complete(messages, ct); + + private async Task CompleteDeleteAsync(IReadOnlyCollection messages, CancellationToken ct) + { + if (messages.Count <= 0) return; + + var result = await _collection.DeleteManyAsync( + Builders.Filter.In(_idSelector, messages.Select(_idGetter)), + new DeleteOptions(), + ct); + + // other than something unexpected, this could happen if a concurrent process has already completed the messages + if (result.DeletedCount != messages.Count) + { + var leftCount = await _collection.CountDocumentsAsync( + Builders.Filter.In(_idSelector, messages.Select(_idGetter)), + new CountOptions(), + ct); + + if (leftCount > 0) + { + throw new InvalidOperationException( + "Failed to delete all messages, some messages are still present in the collection"); + } + } + } + + private async Task CompleteUpdateAsync(IReadOnlyCollection messages, CancellationToken ct) + { + if (messages.Count <= 0) return; + + var now = _timeProvider.GetUtcNow().DateTime; + var result = await _collection.UpdateManyAsync( + Builders.Filter.In(_idSelector, messages.Select(_idGetter)), + Builders.Update.Set(_processedAtSelector, now), + new UpdateOptions(), + ct); + + // other than something unexpected, this could happen if a concurrent process has already completed the messages + if (result.ModifiedCount != messages.Count) + { + var leftCount = await _collection.CountDocumentsAsync( + Builders.Filter.And( + Builders.Filter.In(_idSelector, messages.Select(_idGetter)), + Builders.Filter.Eq(_processedAtSelector, null)), + new CountOptions(), + ct); + + if (leftCount > 0) + { + throw new InvalidOperationException( + "Failed to update all messages, some messages are still not marked as processed"); + } + } + } +} \ No newline at end of file diff --git a/src/MongoDb/Polling/OutboxBatchFetcher.cs b/src/MongoDb/Polling/OutboxBatchFetcher.cs index 58c42b0..09e9766 100644 --- a/src/MongoDb/Polling/OutboxBatchFetcher.cs +++ b/src/MongoDb/Polling/OutboxBatchFetcher.cs @@ -1,4 +1,3 @@ -using System.Linq.Expressions; using MongoDB.Driver; using YakShaveFx.OutboxKit.Core; using YakShaveFx.OutboxKit.Core.Polling; @@ -11,7 +10,6 @@ internal sealed class OutboxBatchFetcher : IBatchFetcher where TM { private readonly IMongoCollection _collection; private readonly DistributedLockThingy _lockThingy; - private readonly TimeProvider _timeProvider; private readonly int _batchSize; private readonly string _lockId; @@ -19,11 +17,8 @@ internal sealed class OutboxBatchFetcher : IBatchFetcher where TM private readonly string _lockContext; private readonly FilterDefinition _findFilter; private readonly SortDefinition _sort; - private readonly Expression> _idSelector; - private readonly Func _idGetter; - private readonly Expression>? _processedAtSelector; - private readonly Func, CancellationToken, Task> _complete; private readonly Func> _hasNext; + private readonly OutboxBatchCompleter _completer; public OutboxBatchFetcher( OutboxKey key, @@ -32,13 +27,12 @@ public OutboxBatchFetcher( MongoDbPollingDistributedLockSettings lockSettings, IMongoDatabase db, DistributedLockThingy lockThingy, - TimeProvider timeProvider) + OutboxBatchCompleter completer) { _lockId = lockSettings.Id; _lockOwner = lockSettings.Owner; _lockContext = key.ToString(); _lockThingy = lockThingy; - _timeProvider = timeProvider; _batchSize = pollingSettings.BatchSize; _findFilter = (pollingSettings.CompletionMode, collectionSettings.ProcessedAtSelector) switch { @@ -48,22 +42,8 @@ public OutboxBatchFetcher( _ => throw new ArgumentException("processed at selector not configured", nameof(collectionSettings)) }; _sort = collectionSettings.Sort; - _idSelector = collectionSettings.IdSelector; - var compiledIdSelector = collectionSettings.IdSelector.Compile(); - _idGetter = m => compiledIdSelector((TMessage)m); - _processedAtSelector = (pollingSettings.CompletionMode, collectionSettings.ProcessedAtSelector) switch - { - (CompletionMode.Delete, _) => null, - (CompletionMode.Update, not null) => collectionSettings.ProcessedAtSelector, - _ => throw new ArgumentException("processed at selector not configured", nameof(collectionSettings)) - }; _collection = db.GetCollection(collectionSettings.Name); - _complete = pollingSettings.CompletionMode switch - { - CompletionMode.Delete => CompleteDeleteAsync, - CompletionMode.Update => CompleteUpdateAsync, - _ => throw new ArgumentException("invalid completion mode", nameof(pollingSettings)) - }; + _completer = completer; _hasNext = HasNextAsync; } @@ -82,7 +62,7 @@ public async Task FetchAndHoldAsync(CancellationToken ct) if (messages.Count > 0) { - return new BatchContext(messages, _complete, _hasNext, @lock); + return new BatchContext(messages, _completer, _hasNext, @lock); } await @lock.DisposeAsync(); @@ -107,55 +87,18 @@ private async Task> FetchMessagesAsync(Cancellatio return cast; } - private async Task CompleteDeleteAsync(IReadOnlyCollection ok, CancellationToken ct) - { - if (ok.Count <= 0) return; - - using var session = await _collection.Database.Client.StartSessionAsync(new ClientSessionOptions(), ct); - session.StartTransaction(); - - var result = await _collection.DeleteManyAsync( - session, - Builders.Filter.In(_idSelector, ok.Select(_idGetter)), - new DeleteOptions(), - ct); - - if (result.DeletedCount != ok.Count) throw new InvalidOperationException("Failed to delete all messages"); - - await session.CommitTransactionAsync(ct); - } - - private async Task CompleteUpdateAsync(IReadOnlyCollection ok, CancellationToken ct) - { - if (ok.Count <= 0) return; - - using var session = await _collection.Database.Client.StartSessionAsync(new ClientSessionOptions(), ct); - session.StartTransaction(); - - var now = _timeProvider.GetUtcNow().DateTime; - var result = await _collection.UpdateManyAsync( - session, - Builders.Filter.In(_idSelector, ok.Select(_idGetter)), - Builders.Update.Set(_processedAtSelector, now), - new UpdateOptions(), - ct); - - if (result.ModifiedCount != ok.Count) throw new InvalidOperationException("Failed to update all messages"); - - await session.CommitTransactionAsync(ct); - } - private Task HasNextAsync(CancellationToken ct) => _collection.Find(_findFilter).Limit(1).AnyAsync(ct); private class BatchContext( IReadOnlyCollection messages, - Func, CancellationToken, Task> complete, + OutboxBatchCompleter completer, Func> hasNext, IDistributedLock @lock) : IBatchContext { public IReadOnlyCollection Messages => messages; - public Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) => complete(ok, ct); + public Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) + => completer.CompleteAsync(ok, ct); public Task HasNextAsync(CancellationToken ct) => hasNext(ct); diff --git a/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs b/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs index 270b1c9..2238746 100644 --- a/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs +++ b/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs @@ -168,7 +168,12 @@ private IBatchFetcher CreateSut( _db, timeProvider, NullLogger.Instance), - timeProvider), + new OutboxBatchCompleter( + MongoDbPollingProvider.CreateKey("test"), + Defaults.Delete.MongoDbPollingSettings, + Defaults.Delete.CollectionConfig, + _db, + timeProvider)), CompletionMode.Update => new OutboxBatchFetcher( MongoDbPollingProvider.CreateKey("test"), Defaults.Update.MongoDbPollingSettings, @@ -186,7 +191,12 @@ private IBatchFetcher CreateSut( _db, timeProvider, NullLogger.Instance), - timeProvider), + new OutboxBatchCompleter( + MongoDbPollingProvider.CreateKey("test"), + Defaults.Update.MongoDbPollingSettings, + Defaults.Update.CollectionConfigWithProcessedAt, + _db, + timeProvider)), _ => throw new ArgumentOutOfRangeException(nameof(completionMode)) }; From d400d6aef652142afe16897ddcbeaa36bfe0c870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Fri, 20 Jun 2025 10:49:05 +0100 Subject: [PATCH 3/7] MySQL implementation of ICompleteRetrier --- .../Polling/CompleteProduceMessagesRetrier.cs | 2 + src/MySql/Polling/AdvisoryLockBatchFetcher.cs | 91 ++-------- src/MySql/Polling/BatchCompleter.cs | 163 ++++++++++++++++++ src/MySql/Polling/ConfigurationExtensions.cs | 26 ++- .../Polling/SelectForUpdateBatchFetcher.cs | 87 +--------- .../Polling/AdvisoryLockBatchFetcherTests.cs | 10 +- .../Polling/BaseBatchFetcherTests.cs | 2 +- .../Polling/BatchCompleterTests.cs | 151 ++++++++++++++++ .../SelectForUpdateBatchFetcherTests.cs | 6 +- ...chFetcherTestHelpers.cs => TestHelpers.cs} | 2 +- 10 files changed, 365 insertions(+), 175 deletions(-) create mode 100644 src/MySql/Polling/BatchCompleter.cs create mode 100644 tests/MySql.Tests/Polling/BatchCompleterTests.cs rename tests/MySql.Tests/Polling/{BatchFetcherTestHelpers.cs => TestHelpers.cs} (95%) diff --git a/src/Core/Polling/CompleteProduceMessagesRetrier.cs b/src/Core/Polling/CompleteProduceMessagesRetrier.cs index 4a8469e..9bf5bd8 100644 --- a/src/Core/Polling/CompleteProduceMessagesRetrier.cs +++ b/src/Core/Polling/CompleteProduceMessagesRetrier.cs @@ -46,7 +46,9 @@ await _retrier.ExecuteWithRetryAsync( () => completeRetrier.RetryCompleteAsync(_messages, ct), ct); + // since most of the time there are no messages to retry // clear messages by creating a new list, so the old one can be garbage collected + // avoiding the underlying array to be kept in memory _messages = new(); } catch (Exception ex) diff --git a/src/MySql/Polling/AdvisoryLockBatchFetcher.cs b/src/MySql/Polling/AdvisoryLockBatchFetcher.cs index ec3c07a..7f490ea 100644 --- a/src/MySql/Polling/AdvisoryLockBatchFetcher.cs +++ b/src/MySql/Polling/AdvisoryLockBatchFetcher.cs @@ -13,9 +13,6 @@ internal sealed class AdvisoryLockBatchFetcher : IBatchFetcher private delegate BatchContext BatchContextFactory( IReadOnlyCollection messages, MySqlConnection connection); - private delegate MySqlCommand CompleteCommandFactory( - IReadOnlyCollection ok, MySqlConnection connection, MySqlTransaction tx); - private readonly int _batchSize; private readonly string _selectQuery; private readonly Func _messageFactory; @@ -27,13 +24,11 @@ public AdvisoryLockBatchFetcher( MySqlPollingSettings pollingSettings, TableConfiguration tableCfg, MySqlDataSource dataSource, - TimeProvider timeProvider) + BatchCompleter completer) { _dataSource = dataSource; _batchSize = pollingSettings.BatchSize; _selectQuery = SetupSelectQuery(pollingSettings, tableCfg); - var deleteQuery = SetupDeleteQuery(tableCfg); - var updateQuery = SetupUpdateQuery(tableCfg); var hasNextQuery = SetupHasNextQuery(pollingSettings, tableCfg); _messageFactory = tableCfg.MessageFactory; _batchContextFactory = @@ -41,15 +36,7 @@ public AdvisoryLockBatchFetcher( okMessages, connection, hasNextQuery, - pollingSettings.CompletionMode switch - { - CompletionMode.Delete => (ok, conn, tx) => - CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), - CompletionMode.Update => (ok, conn, tx) => - CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), - _ => throw new InvalidOperationException( - $"Invalid completion mode {pollingSettings.CompletionMode}") - }); + completer); _lockName = SetupLockName(dataSource); } @@ -129,32 +116,20 @@ private sealed class BatchContext( IReadOnlyCollection messages, MySqlConnection connection, string hasNextQuery, - CompleteCommandFactory completeCommandFactory) + BatchCompleter completer) : IBatchContext { private bool _lockReleased; - + public IReadOnlyCollection Messages => messages; public async Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) { - if (ok.Count > 0) - { - var tx = await connection.BeginTransactionAsync(ct); - await using var command = completeCommandFactory(ok, connection, tx); - var completed = await command.ExecuteNonQueryAsync(ct); - - if (completed != ok.Count) - { - // think if this is the best way to handle this (considering this shouldn't happen, probably it's good enough) - await tx.RollbackAsync(ct); - throw new InvalidOperationException("Failed to complete messages"); - } - - await tx.CommitAsync(ct); - await ReleaseLockAsync(connection, ct); // release immediately, to allow other fetchers to proceed - _lockReleased = true; - } + if (ok.Count <= 0) return; + + await completer.CompleteAsync(ok, connection, null, ct); + await ReleaseLockAsync(connection, ct); // release immediately, to allow other fetchers to proceed + _lockReleased = true; } public async Task HasNextAsync(CancellationToken ct) @@ -176,51 +151,11 @@ public async ValueTask DisposeAsync() { await ReleaseLockAsync(connection, CancellationToken.None); } - await connection.DisposeAsync(); - } - } - - private static MySqlCommand CreateDeleteCommand( - string deleteQuery, - Func idGetter, - IReadOnlyCollection ok, - MySqlConnection connection, - MySqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new MySqlCommand(string.Format(deleteQuery, idParams), connection, tx); - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; + await connection.DisposeAsync(); } - - return command; } - private static MySqlCommand CreateUpdateCommand( - string updateQuery, - TimeProvider timeProvider, - Func idGetter, - IReadOnlyCollection ok, - MySqlConnection connection, - MySqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new MySqlCommand(string.Format(updateQuery, idParams), connection, tx); - command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } private static async Task TryAcquireLockAsync(string lockName, MySqlConnection connection, @@ -254,12 +189,6 @@ private static string SetupHasNextQuery(MySqlPollingSettings pollingSettings, Ta ? $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name} LIMIT 1);" : $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name} WHERE {tableCfg.ProcessedAtColumn} IS NULL LIMIT 1);"; - private static string SetupUpdateQuery(TableConfiguration tableCfg) => - $"UPDATE {tableCfg.Name} SET {tableCfg.ProcessedAtColumn} = @processedAt WHERE {tableCfg.IdColumn} IN ({{0}});"; - - private static string SetupDeleteQuery(TableConfiguration tableCfg) => - $"DELETE FROM {tableCfg.Name} WHERE {tableCfg.IdColumn} IN ({{0}});"; - private static string SetupSelectQuery(MySqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $""" diff --git a/src/MySql/Polling/BatchCompleter.cs b/src/MySql/Polling/BatchCompleter.cs new file mode 100644 index 0000000..2c76ddc --- /dev/null +++ b/src/MySql/Polling/BatchCompleter.cs @@ -0,0 +1,163 @@ +using MySqlConnector; +using YakShaveFx.OutboxKit.Core; +using YakShaveFx.OutboxKit.Core.Polling; +using YakShaveFx.OutboxKit.MySql.Shared; + +namespace YakShaveFx.OutboxKit.MySql.Polling; + +// ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI +internal sealed class BatchCompleter : ICompleteRetrier +{ + private delegate MySqlCommand CompleteCommandFactory( + IReadOnlyCollection ok, MySqlConnection connection, MySqlTransaction? tx); + + private delegate MySqlCommand CheckLeftBehindCommandFactory( + IReadOnlyCollection messages, + MySqlConnection connection); + + private readonly MySqlDataSource _dataSource; + private readonly CompleteCommandFactory _completeCommandFactory; + private readonly CheckLeftBehindCommandFactory _checkLeftBehindCommandFactory; + + public BatchCompleter( + MySqlPollingSettings pollingSettings, + TableConfiguration tableCfg, + MySqlDataSource dataSource, + TimeProvider timeProvider) + { + _dataSource = dataSource; + var deleteQuery = SetupDeleteQuery(tableCfg); + var updateQuery = SetupUpdateQuery(tableCfg); + _completeCommandFactory = pollingSettings.CompletionMode switch + { + CompletionMode.Delete => (ok, conn, tx) => + CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), + CompletionMode.Update => (ok, conn, tx) => + CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), + _ => throw new InvalidOperationException($"Invalid completion mode {pollingSettings.CompletionMode}") + }; + _checkLeftBehindCommandFactory = + (messages, connection) => CreateCheckLeftBehindCommand( + SetupCheckLeftBehindQuery(pollingSettings, tableCfg), + tableCfg.IdGetter, + messages, + connection); + } + + public async Task CompleteAsync( + IReadOnlyCollection messages, + MySqlConnection connection, + MySqlTransaction? tx, + CancellationToken ct) + { + if (messages.Count <= 0) return; + + await using var command = _completeCommandFactory(messages, connection, tx); + var completed = await command.ExecuteNonQueryAsync(ct); + + // can't think of a reason why this would happen, but checking and throwing just in case + if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); + } + + async Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + { + if (messages.Count <= 0) return; + + await using var connection = await _dataSource.OpenConnectionAsync(ct); + await using var command = _completeCommandFactory(messages, connection, null); + var completed = await command.ExecuteNonQueryAsync(ct); + + // other than something unexpected, this could happen if a concurrent process has already completed the messages + // unlike the original CompleteAsync, where concurrency should be controlled, hence the slight difference in behavior + if (completed != messages.Count) + { + await using var checkLeftBehindCommand = _checkLeftBehindCommandFactory(messages, connection); + var result = await checkLeftBehindCommand.ExecuteScalarAsync(ct); + + var anyMessagesLeftBehind = result switch + { + bool b => b, + int i => i == 1, + long l => l == 1, + _ => false + }; + + if (anyMessagesLeftBehind) + { + throw new InvalidOperationException("Failed to complete all messages"); + } + } + } + + private static MySqlCommand CreateDeleteCommand( + string deleteQuery, + Func idGetter, + IReadOnlyCollection ok, + MySqlConnection connection, + MySqlTransaction? tx) + { + var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); + var command = new MySqlCommand(string.Format(deleteQuery, idParams), connection, tx); + + var i = 0; + foreach (var m in ok) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static MySqlCommand CreateUpdateCommand( + string updateQuery, + TimeProvider timeProvider, + Func idGetter, + IReadOnlyCollection ok, + MySqlConnection connection, + MySqlTransaction? tx) + { + var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); + var command = new MySqlCommand(string.Format(updateQuery, idParams), connection, tx); + command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); + + var i = 0; + foreach (var m in ok) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static MySqlCommand CreateCheckLeftBehindCommand( + string checkPendingQuery, + Func idGetter, + IReadOnlyCollection messages, + MySqlConnection connection) + { + var idParams = string.Join(", ", Enumerable.Range(0, messages.Count).Select(i => $"@id{i}")); + var command = new MySqlCommand(string.Format(checkPendingQuery, idParams), connection); + + var i = 0; + foreach (var m in messages) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static string SetupCheckLeftBehindQuery(MySqlPollingSettings pollingSettings, TableConfiguration tableCfg) + => pollingSettings.CompletionMode == CompletionMode.Delete + ? $"SELECT EXISTS (SELECT 1 FROM {tableCfg.Name} WHERE {tableCfg.IdColumn} IN ({{0}}));" + : $"SELECT EXISTS (SELECT 1 FROM {tableCfg.Name} WHERE {tableCfg.IdColumn} IN ({{0}}) AND {tableCfg.ProcessedAtColumn} IS NULL);"; + + private static string SetupUpdateQuery(TableConfiguration tableCfg) => + $"UPDATE {tableCfg.Name} SET {tableCfg.ProcessedAtColumn} = @processedAt WHERE {tableCfg.IdColumn} IN ({{0}}) AND {tableCfg.ProcessedAtColumn} IS NULL;"; + + private static string SetupDeleteQuery(TableConfiguration tableCfg) => + $"DELETE FROM {tableCfg.Name} WHERE {tableCfg.IdColumn} IN ({{0}});"; +} \ No newline at end of file diff --git a/src/MySql/Polling/ConfigurationExtensions.cs b/src/MySql/Polling/ConfigurationExtensions.cs index e772193..c5382de 100644 --- a/src/MySql/Polling/ConfigurationExtensions.cs +++ b/src/MySql/Polling/ConfigurationExtensions.cs @@ -88,14 +88,14 @@ public interface IMySqlPollingOutboxKitConfigurator /// OutboxKit assumes the "processed at" column is a in UTC, /// and uses to obtain the time when completing and cleaning up the messages. IMySqlPollingOutboxKitConfigurator WithUpdateProcessed(Action? configure); - + /// /// Configures the outbox to use "SELECT ... FOR UPDATE" for concurrency control. /// /// The instance for chaining calls. /// This is the default concurrency control if nothing is explicitly set. IMySqlPollingOutboxKitConfigurator WithSelectForUpdateConcurrencyControl(); - + /// /// Configures the outbox to use advisory locks for concurrency control. /// See for more info about this type of locks @@ -194,13 +194,13 @@ public IMySqlPollingOutboxKitConfigurator WithUpdateProcessed(Action())); } + services.AddKeyedSingleton(key, (s, _) => new BatchCompleter( + _settings, + tableCfg, + s.GetRequiredKeyedService(key), + s.GetRequiredService())); + + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + services .AddKeyedMySqlDataSource(key, _connectionString) .AddKeyedSingleton( @@ -244,13 +252,14 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) _settings, tableCfg, s.GetRequiredKeyedService(key), - s.GetRequiredService()), + s.GetRequiredKeyedService(key)), ConcurrencyControl.AdvisoryLock => new AdvisoryLockBatchFetcher( _settings, tableCfg, s.GetRequiredKeyedService(key), - s.GetRequiredService()), - _ => throw new InvalidOperationException($"Invalid concurrency control {_settings.ConcurrencyControl}") + s.GetRequiredKeyedService(key)), + _ => throw new InvalidOperationException( + $"Invalid concurrency control {_settings.ConcurrencyControl}") }); } @@ -280,9 +289,8 @@ internal sealed record MySqlPollingSettings public int BatchSize { get; init; } = 100; public CompletionMode CompletionMode { get; init; } = CompletionMode.Delete; - + public ConcurrencyControl ConcurrencyControl { get; init; } = ConcurrencyControl.SelectForUpdate; - } internal enum CompletionMode diff --git a/src/MySql/Polling/SelectForUpdateBatchFetcher.cs b/src/MySql/Polling/SelectForUpdateBatchFetcher.cs index efeb05c..4b273fe 100644 --- a/src/MySql/Polling/SelectForUpdateBatchFetcher.cs +++ b/src/MySql/Polling/SelectForUpdateBatchFetcher.cs @@ -11,9 +11,6 @@ internal sealed class SelectForUpdateBatchFetcher : IBatchFetcher private delegate BatchContext BatchContextFactory( IReadOnlyCollection messages, MySqlConnection connection, MySqlTransaction tx); - private delegate MySqlCommand CompleteCommandFactory( - IReadOnlyCollection ok, MySqlConnection connection, MySqlTransaction tx); - private readonly int _batchSize; private readonly string _selectQuery; private readonly Func _messageFactory; @@ -24,13 +21,11 @@ public SelectForUpdateBatchFetcher( MySqlPollingSettings pollingSettings, TableConfiguration tableCfg, MySqlDataSource dataSource, - TimeProvider timeProvider) + BatchCompleter completer) { _dataSource = dataSource; _batchSize = pollingSettings.BatchSize; _selectQuery = SetupSelectQuery(pollingSettings, tableCfg); - var deleteQuery = SetupDeleteQuery(tableCfg); - var updateQuery = SetupUpdateQuery(tableCfg); var hasNextQuery = SetupHasNextQuery(pollingSettings, tableCfg); _messageFactory = tableCfg.MessageFactory; _batchContextFactory = @@ -39,15 +34,7 @@ public SelectForUpdateBatchFetcher( connection, transaction, hasNextQuery, - pollingSettings.CompletionMode switch - { - CompletionMode.Delete => (ok, conn, tx) => - CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), - CompletionMode.Update => (ok, conn, tx) => - CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), - _ => throw new InvalidOperationException( - $"Invalid completion mode {pollingSettings.CompletionMode}") - }); + completer); } public async Task FetchAndHoldAsync(CancellationToken ct) @@ -102,31 +89,21 @@ private sealed class BatchContext( MySqlConnection connection, MySqlTransaction tx, string hasNextQuery, - CompleteCommandFactory completeCommandFactory) + BatchCompleter completer) : IBatchContext { public IReadOnlyCollection Messages => messages; public async Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) { - if (ok.Count > 0) - { - await using var command = completeCommandFactory(ok, connection, tx); - var completed = await command.ExecuteNonQueryAsync(ct); - - if (completed != ok.Count) - { - // think if this is the best way to handle this (considering this shouldn't happen, probably it's good enough) - await tx.RollbackAsync(ct); - throw new InvalidOperationException("Failed to complete messages"); - } - - await tx.CommitAsync(ct); - } - else + if (ok.Count <= 0) { await tx.RollbackAsync(ct); + return; } + + await completer.CompleteAsync(ok, connection, tx, ct); + await tx.CommitAsync(ct); } public async Task HasNextAsync(CancellationToken ct) @@ -145,59 +122,11 @@ public async Task HasNextAsync(CancellationToken ct) public ValueTask DisposeAsync() => connection.DisposeAsync(); } - private static MySqlCommand CreateDeleteCommand( - string deleteQuery, - Func idGetter, - IReadOnlyCollection ok, - MySqlConnection connection, - MySqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new MySqlCommand(string.Format(deleteQuery, idParams), connection, tx); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - - private static MySqlCommand CreateUpdateCommand( - string updateQuery, - TimeProvider timeProvider, - Func idGetter, - IReadOnlyCollection ok, - MySqlConnection connection, - MySqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new MySqlCommand(string.Format(updateQuery, idParams), connection, tx); - command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - private static string SetupHasNextQuery(MySqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name} LIMIT 1);" : $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name} WHERE {tableCfg.ProcessedAtColumn} IS NULL LIMIT 1);"; - private static string SetupUpdateQuery(TableConfiguration tableCfg) => - $"UPDATE {tableCfg.Name} SET {tableCfg.ProcessedAtColumn} = @processedAt WHERE {tableCfg.IdColumn} IN ({{0}});"; - - private static string SetupDeleteQuery(TableConfiguration tableCfg) => - $"DELETE FROM {tableCfg.Name} WHERE {tableCfg.IdColumn} IN ({{0}});"; - private static string SetupSelectQuery(MySqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $""" diff --git a/tests/MySql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs b/tests/MySql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs index a58190c..eba68d5 100644 --- a/tests/MySql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs +++ b/tests/MySql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs @@ -1,6 +1,6 @@ using Dapper; using YakShaveFx.OutboxKit.MySql.Polling; -using static YakShaveFx.OutboxKit.MySql.Tests.Polling.BatchFetcherTestHelpers; +using static YakShaveFx.OutboxKit.MySql.Tests.Polling.TestHelpers; namespace YakShaveFx.OutboxKit.MySql.Tests.Polling; @@ -10,7 +10,11 @@ public class AdvisoryLockBatchFetcherTests(MySqlFixture mySqlFixture) private readonly BaseBatchFetcherTests _baseTests = new( mySqlFixture, (pollingSettings, tableCfg, dataSource, timeProvider) => - new AdvisoryLockBatchFetcher(pollingSettings, tableCfg, dataSource, timeProvider)); + new AdvisoryLockBatchFetcher( + pollingSettings, + tableCfg, + dataSource, + new BatchCompleter(pollingSettings, tableCfg, dataSource, timeProvider))); [Theory] [InlineData(CompletionMode.Delete)] @@ -58,7 +62,7 @@ public async Task WhenFetchingABatchLockShouldBeRetainedIfMessagesAreFound(Compl .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - var sut = new AdvisoryLockBatchFetcher(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var sut = new AdvisoryLockBatchFetcher(mySqlSettings, tableConfig, dbCtx.DataSource, new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System)); await using var _ = await sut.FetchAndHoldAsync(CancellationToken.None); diff --git a/tests/MySql.Tests/Polling/BaseBatchFetcherTests.cs b/tests/MySql.Tests/Polling/BaseBatchFetcherTests.cs index 1411398..a2fa2e3 100644 --- a/tests/MySql.Tests/Polling/BaseBatchFetcherTests.cs +++ b/tests/MySql.Tests/Polling/BaseBatchFetcherTests.cs @@ -4,7 +4,7 @@ using YakShaveFx.OutboxKit.Core.Polling; using YakShaveFx.OutboxKit.MySql.Polling; using YakShaveFx.OutboxKit.MySql.Shared; -using static YakShaveFx.OutboxKit.MySql.Tests.Polling.BatchFetcherTestHelpers; +using static YakShaveFx.OutboxKit.MySql.Tests.Polling.TestHelpers; namespace YakShaveFx.OutboxKit.MySql.Tests.Polling; diff --git a/tests/MySql.Tests/Polling/BatchCompleterTests.cs b/tests/MySql.Tests/Polling/BatchCompleterTests.cs new file mode 100644 index 0000000..c47ebfc --- /dev/null +++ b/tests/MySql.Tests/Polling/BatchCompleterTests.cs @@ -0,0 +1,151 @@ +using Dapper; +using MySqlConnector; +using YakShaveFx.OutboxKit.Core.Polling; +using YakShaveFx.OutboxKit.MySql.Polling; +using static YakShaveFx.OutboxKit.MySql.Tests.Polling.TestHelpers; + +namespace YakShaveFx.OutboxKit.MySql.Tests.Polling; + +public class BatchCompleterTests(MySqlFixture mySqlFixture) +{ + private readonly CancellationToken _ct = TestContext.Current.CancellationToken; + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenCompletingMessagesThenTheyShouldBeCompleted(CompletionMode completionMode) + { + var (schemaSettings, mySqlSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await mySqlFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + var sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var messages = await GetMessagesAsync(connection, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); + await sut.CompleteAsync(messages, connection, null, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenCompletingAlreadyCompletedMessagesThenAnExceptionIsThrown(CompletionMode completionMode) + { + var (schemaSettings, mySqlSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await mySqlFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + var sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + + var messages = await GetMessagesAsync(connection, _ct); + await CompleteMessagesAsync(messages, connection, completionMode); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + var act = () => sut.CompleteAsync(messages, connection, null, _ct); + await act.Should().ThrowAsync(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(CompletionMode completionMode) + { + var (schemaSettings, mySqlSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await mySqlFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + ICompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var messages = await GetMessagesAsync(connection, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); + await sut.RetryCompleteAsync(messages, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionIsThrown( + CompletionMode completionMode) + { + var (schemaSettings, mySqlSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await mySqlFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + ICompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + + var messages = await GetMessagesAsync(connection, _ct); + await CompleteMessagesAsync(messages, connection, completionMode); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + await sut.RetryCompleteAsync(messages, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + private async Task> GetMessagesAsync( + MySqlConnection connection, + CancellationToken ct) + => (await connection.QueryAsync( + // lang=mysql + "SELECT id, type, payload, created_at, trace_context FROM outbox_messages;", + ct)).ToArray(); + + private async Task CompleteMessagesAsync( + IReadOnlyCollection messages, + MySqlConnection connection, + CompletionMode completionMode) + { + if (completionMode == CompletionMode.Delete) + { + var result = await connection.ExecuteAsync( + new CommandDefinition( + // lang=mysql + "DELETE FROM outbox_messages WHERE id IN @Ids;", + new { Ids = messages.Select(m => m.Id) }, + cancellationToken: _ct)); + } + else + { + var result = await connection.ExecuteAsync( + new CommandDefinition( + // lang=mysql + "UPDATE outbox_messages SET processed_at = @CompletedAt WHERE id IN @Ids;", + new { CompletedAt = DateTime.UtcNow, Ids = messages.Select(m => m.Id) }, + cancellationToken: _ct)); + } + } + + private async Task AreMessagesCompletedAsync( + IReadOnlyCollection messages, + MySqlConnection connection, + CompletionMode completionMode) + { + if (completionMode == CompletionMode.Delete) + { + return await connection.ExecuteScalarAsync( + new CommandDefinition( + // lang=mysql + "SELECT NOT EXISTS(SELECT 1 FROM outbox_messages WHERE id IN @Ids);", + new { Ids = messages.Select(m => m.Id) }, + cancellationToken: _ct)); + } + else + { + return await connection.ExecuteScalarAsync( + new CommandDefinition( + // lang=mysql + "SELECT NOT EXISTS(SELECT 1 FROM outbox_messages WHERE id IN @Ids AND processed_at IS NULL);", + new { Ids = messages.Select(m => m.Id) }, + cancellationToken: _ct)); + } + } +} \ No newline at end of file diff --git a/tests/MySql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs b/tests/MySql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs index 6d6599c..cc2238e 100644 --- a/tests/MySql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs +++ b/tests/MySql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs @@ -7,7 +7,11 @@ public class SelectForUpdateBatchFetcherTests(MySqlFixture mySqlFixture) private readonly BaseBatchFetcherTests _baseTests = new( mySqlFixture, (pollingSettings, tableCfg, dataSource, timeProvider) => - new SelectForUpdateBatchFetcher(pollingSettings, tableCfg, dataSource, timeProvider)); + new SelectForUpdateBatchFetcher( + pollingSettings, + tableCfg, + dataSource, + new BatchCompleter(pollingSettings, tableCfg, dataSource, timeProvider))); [Theory] [InlineData(CompletionMode.Delete)] diff --git a/tests/MySql.Tests/Polling/BatchFetcherTestHelpers.cs b/tests/MySql.Tests/Polling/TestHelpers.cs similarity index 95% rename from tests/MySql.Tests/Polling/BatchFetcherTestHelpers.cs rename to tests/MySql.Tests/Polling/TestHelpers.cs index da35edc..508958f 100644 --- a/tests/MySql.Tests/Polling/BatchFetcherTestHelpers.cs +++ b/tests/MySql.Tests/Polling/TestHelpers.cs @@ -3,7 +3,7 @@ namespace YakShaveFx.OutboxKit.MySql.Tests.Polling; -internal static class BatchFetcherTestHelpers +internal static class TestHelpers { public record Config( DefaultSchemaSettings DefaultSchemaSettings, From 39c96ce1dc58e5033051d4a56194f391d3430d57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Fri, 20 Jun 2025 11:37:54 +0100 Subject: [PATCH 4/7] PostgreSQL implementation of ICompleteRetrier --- .../Polling/AdvisoryLockBatchFetcher.cs | 90 +--------- src/PostgreSql/Polling/BatchCompleter.cs | 163 ++++++++++++++++++ .../Polling/ConfigurationExtensions.cs | 13 +- .../Polling/SelectForUpdateBatchFetcher.cs | 89 +--------- .../Polling/AdvisoryLockBatchFetcherTests.cs | 6 +- .../Polling/BaseBatchFetcherTests.cs | 2 +- .../Polling/BatchCompleterTests.cs | 151 ++++++++++++++++ .../SelectForUpdateBatchFetcherTests.cs | 6 +- ...chFetcherTestHelpers.cs => TestHelpers.cs} | 2 +- 9 files changed, 351 insertions(+), 171 deletions(-) create mode 100644 src/PostgreSql/Polling/BatchCompleter.cs create mode 100644 tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs rename tests/PostgreSql.Tests/Polling/{BatchFetcherTestHelpers.cs => TestHelpers.cs} (95%) diff --git a/src/PostgreSql/Polling/AdvisoryLockBatchFetcher.cs b/src/PostgreSql/Polling/AdvisoryLockBatchFetcher.cs index 2ce2d5c..ffb3950 100644 --- a/src/PostgreSql/Polling/AdvisoryLockBatchFetcher.cs +++ b/src/PostgreSql/Polling/AdvisoryLockBatchFetcher.cs @@ -1,5 +1,3 @@ -using System.Security.Cryptography; -using System.Text; using Npgsql; using YakShaveFx.OutboxKit.Core; using YakShaveFx.OutboxKit.Core.Polling; @@ -13,9 +11,6 @@ internal sealed class AdvisoryLockBatchFetcher : IBatchFetcher private delegate BatchContext BatchContextFactory( IReadOnlyCollection messages, NpgsqlConnection connection); - private delegate NpgsqlCommand CompleteCommandFactory( - IReadOnlyCollection ok, NpgsqlConnection connection, NpgsqlTransaction tx); - private readonly int _batchSize; private readonly string _selectQuery; private readonly Func _messageFactory; @@ -27,13 +22,11 @@ public AdvisoryLockBatchFetcher( PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg, NpgsqlDataSource dataSource, - TimeProvider timeProvider) + BatchCompleter completer) { _dataSource = dataSource; _batchSize = pollingSettings.BatchSize; _selectQuery = SetupSelectQuery(pollingSettings, tableCfg); - var deleteQuery = SetupDeleteQuery(tableCfg); - var updateQuery = SetupUpdateQuery(tableCfg); var hasNextQuery = SetupHasNextQuery(pollingSettings, tableCfg); _messageFactory = tableCfg.MessageFactory; _batchContextFactory = @@ -41,15 +34,7 @@ public AdvisoryLockBatchFetcher( okMessages, connection, hasNextQuery, - pollingSettings.CompletionMode switch - { - CompletionMode.Delete => (ok, conn, tx) => - CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), - CompletionMode.Update => (ok, conn, tx) => - CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), - _ => throw new InvalidOperationException( - $"Invalid completion mode {pollingSettings.CompletionMode}") - }); + completer); _lockKey = SetupLockKey(dataSource); } @@ -117,7 +102,7 @@ private sealed class BatchContext( IReadOnlyCollection messages, NpgsqlConnection connection, string hasNextQuery, - CompleteCommandFactory completeCommandFactory) + BatchCompleter completer) : IBatchContext { private bool _lockReleased; @@ -126,23 +111,11 @@ private sealed class BatchContext( public async Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) { - if (ok.Count > 0) - { - var tx = await connection.BeginTransactionAsync(ct); - await using var command = completeCommandFactory(ok, connection, tx); - var completed = await command.ExecuteNonQueryAsync(ct); - - if (completed != ok.Count) - { - // think if this is the best way to handle this (considering this shouldn't happen, probably it's good enough) - await tx.RollbackAsync(ct); - throw new InvalidOperationException("Failed to complete messages"); - } + if (ok.Count <= 0) return; - await tx.CommitAsync(ct); - await ReleaseLockAsync(connection, ct); // release immediately, to allow other fetchers to proceed - _lockReleased = true; - } + await completer.CompleteAsync(ok, connection, null, ct); + await ReleaseLockAsync(connection, ct); // release immediately, to allow other fetchers to proceed + _lockReleased = true; } public async Task HasNextAsync(CancellationToken ct) @@ -169,48 +142,6 @@ public async ValueTask DisposeAsync() } } - private static NpgsqlCommand CreateDeleteCommand( - string deleteQuery, - Func idGetter, - IReadOnlyCollection ok, - NpgsqlConnection connection, - NpgsqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new NpgsqlCommand(string.Format(deleteQuery, idParams), connection, tx); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - - private static NpgsqlCommand CreateUpdateCommand( - string updateQuery, - TimeProvider timeProvider, - Func idGetter, - IReadOnlyCollection ok, - NpgsqlConnection connection, - NpgsqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new NpgsqlCommand(string.Format(updateQuery, idParams), connection, tx); - command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - private static async Task TryAcquireLockAsync( AdvisoryLockKey lockKey, NpgsqlConnection connection, @@ -244,12 +175,6 @@ private static string SetupHasNextQuery(PostgreSqlPollingSettings pollingSetting ? $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name.Quote()} LIMIT 1);" : $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name.Quote()} WHERE {tableCfg.ProcessedAtColumn.Quote()} IS NULL LIMIT 1);"; - private static string SetupUpdateQuery(TableConfiguration tableCfg) => - $"UPDATE {tableCfg.Name.Quote()} SET {tableCfg.ProcessedAtColumn.Quote()} = @processedAt WHERE {tableCfg.IdColumn.Quote()} IN ({{0}});"; - - private static string SetupDeleteQuery(TableConfiguration tableCfg) => - $"DELETE FROM {tableCfg.Name.Quote()} WHERE {tableCfg.IdColumn.Quote()} IN ({{0}});"; - private static string SetupSelectQuery(PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $""" @@ -279,4 +204,3 @@ private static string DirectionToString(SortDirection direction) => private sealed record AdvisoryLockKey(int Key1, int Key2); } - diff --git a/src/PostgreSql/Polling/BatchCompleter.cs b/src/PostgreSql/Polling/BatchCompleter.cs new file mode 100644 index 0000000..a3eb8fb --- /dev/null +++ b/src/PostgreSql/Polling/BatchCompleter.cs @@ -0,0 +1,163 @@ +using Npgsql; +using YakShaveFx.OutboxKit.Core; +using YakShaveFx.OutboxKit.Core.Polling; +using YakShaveFx.OutboxKit.PostgreSql.Shared; + +namespace YakShaveFx.OutboxKit.PostgreSql.Polling; + +// ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI +internal sealed class BatchCompleter : ICompleteRetrier +{ + private delegate NpgsqlCommand CompleteCommandFactory( + IReadOnlyCollection ok, NpgsqlConnection connection, NpgsqlTransaction? tx); + + private delegate NpgsqlCommand CheckLeftBehindCommandFactory( + IReadOnlyCollection messages, + NpgsqlConnection connection); + + private readonly NpgsqlDataSource _dataSource; + private readonly CompleteCommandFactory _completeCommandFactory; + private readonly CheckLeftBehindCommandFactory _checkLeftBehindCommandFactory; + + public BatchCompleter( + PostgreSqlPollingSettings pollingSettings, + TableConfiguration tableCfg, + NpgsqlDataSource dataSource, + TimeProvider timeProvider) + { + _dataSource = dataSource; + var deleteQuery = SetupDeleteQuery(tableCfg); + var updateQuery = SetupUpdateQuery(tableCfg); + _completeCommandFactory = pollingSettings.CompletionMode switch + { + CompletionMode.Delete => (ok, conn, tx) => + CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), + CompletionMode.Update => (ok, conn, tx) => + CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), + _ => throw new InvalidOperationException($"Invalid completion mode {pollingSettings.CompletionMode}") + }; + _checkLeftBehindCommandFactory = + (messages, connection) => CreateCheckLeftBehindCommand( + SetupCheckLeftBehindQuery(pollingSettings, tableCfg), + tableCfg.IdGetter, + messages, + connection); + } + + public async Task CompleteAsync( + IReadOnlyCollection messages, + NpgsqlConnection connection, + NpgsqlTransaction? tx, + CancellationToken ct) + { + if (messages.Count <= 0) return; + + await using var command = _completeCommandFactory(messages, connection, tx); + var completed = await command.ExecuteNonQueryAsync(ct); + + // can't think of a reason why this would happen, but checking and throwing just in case + if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); + } + + async Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + { + if (messages.Count <= 0) return; + + await using var connection = await _dataSource.OpenConnectionAsync(ct); + await using var command = _completeCommandFactory(messages, connection, null); + var completed = await command.ExecuteNonQueryAsync(ct); + + // other than something unexpected, this could happen if a concurrent process has already completed the messages + // unlike the original CompleteAsync, where concurrency should be controlled, hence the slight difference in behavior + if (completed != messages.Count) + { + await using var checkLeftBehindCommand = _checkLeftBehindCommandFactory(messages, connection); + var result = await checkLeftBehindCommand.ExecuteScalarAsync(ct); + + var anyMessagesLeftBehind = result switch + { + bool b => b, + int i => i == 1, + long l => l == 1, + _ => false + }; + + if (anyMessagesLeftBehind) + { + throw new InvalidOperationException("Failed to complete all messages"); + } + } + } + + private static NpgsqlCommand CreateDeleteCommand( + string deleteQuery, + Func idGetter, + IReadOnlyCollection ok, + NpgsqlConnection connection, + NpgsqlTransaction? tx) + { + var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); + var command = new NpgsqlCommand(string.Format(deleteQuery, idParams), connection, tx); + + var i = 0; + foreach (var m in ok) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static NpgsqlCommand CreateUpdateCommand( + string updateQuery, + TimeProvider timeProvider, + Func idGetter, + IReadOnlyCollection ok, + NpgsqlConnection connection, + NpgsqlTransaction? tx) + { + var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); + var command = new NpgsqlCommand(string.Format(updateQuery, idParams), connection, tx); + command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); + + var i = 0; + foreach (var m in ok) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static NpgsqlCommand CreateCheckLeftBehindCommand( + string checkPendingQuery, + Func idGetter, + IReadOnlyCollection messages, + NpgsqlConnection connection) + { + var idParams = string.Join(", ", Enumerable.Range(0, messages.Count).Select(i => $"@id{i}")); + var command = new NpgsqlCommand(string.Format(checkPendingQuery, idParams), connection); + + var i = 0; + foreach (var m in messages) + { + command.Parameters.AddWithValue($"id{i}", idGetter(m)); + i++; + } + + return command; + } + + private static string SetupCheckLeftBehindQuery(PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg) + => pollingSettings.CompletionMode == CompletionMode.Delete + ? $"SELECT EXISTS (SELECT 1 FROM {tableCfg.Name.Quote()} WHERE {tableCfg.IdColumn.Quote()} IN ({{0}}));" + : $"SELECT EXISTS (SELECT 1 FROM {tableCfg.Name.Quote()} WHERE {tableCfg.IdColumn.Quote()} IN ({{0}}) AND {tableCfg.ProcessedAtColumn.Quote()} IS NULL);"; + + private static string SetupUpdateQuery(TableConfiguration tableCfg) => + $"UPDATE {tableCfg.Name.Quote()} SET {tableCfg.ProcessedAtColumn.Quote()} = @processedAt WHERE {tableCfg.IdColumn.Quote()} IN ({{0}}) AND {tableCfg.ProcessedAtColumn.Quote()} IS NULL;"; + + private static string SetupDeleteQuery(TableConfiguration tableCfg) => + $"DELETE FROM {tableCfg.Name.Quote()} WHERE {tableCfg.IdColumn.Quote()} IN ({{0}});"; +} \ No newline at end of file diff --git a/src/PostgreSql/Polling/ConfigurationExtensions.cs b/src/PostgreSql/Polling/ConfigurationExtensions.cs index cf65577..f1ca5fe 100644 --- a/src/PostgreSql/Polling/ConfigurationExtensions.cs +++ b/src/PostgreSql/Polling/ConfigurationExtensions.cs @@ -232,6 +232,15 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) s.GetRequiredService())); } + services.AddKeyedSingleton(key, (s, _) => new BatchCompleter( + _settings, + tableCfg, + s.GetRequiredKeyedService(key), + s.GetRequiredService())); + + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + + services .AddNpgsqlDataSource(_connectionString, serviceKey: key) .AddKeyedSingleton( @@ -243,12 +252,12 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) _settings, tableCfg, s.GetRequiredKeyedService(key), - s.GetRequiredService()), + s.GetRequiredKeyedService(key)), ConcurrencyControl.AdvisoryLock => new AdvisoryLockBatchFetcher( _settings, tableCfg, s.GetRequiredKeyedService(key), - s.GetRequiredService()), + s.GetRequiredKeyedService(key)), _ => throw new InvalidOperationException($"Invalid concurrency control {_settings.ConcurrencyControl}") }); } diff --git a/src/PostgreSql/Polling/SelectForUpdateBatchFetcher.cs b/src/PostgreSql/Polling/SelectForUpdateBatchFetcher.cs index e6b4fc7..2a2a086 100644 --- a/src/PostgreSql/Polling/SelectForUpdateBatchFetcher.cs +++ b/src/PostgreSql/Polling/SelectForUpdateBatchFetcher.cs @@ -11,9 +11,6 @@ internal sealed class SelectForUpdateBatchFetcher : IBatchFetcher private delegate BatchContext BatchContextFactory( IReadOnlyCollection messages, NpgsqlConnection connection, NpgsqlTransaction tx); - private delegate NpgsqlCommand CompleteCommandFactory( - IReadOnlyCollection ok, NpgsqlConnection connection, NpgsqlTransaction tx); - private readonly int _batchSize; private readonly string _selectQuery; private readonly Func _messageFactory; @@ -24,13 +21,11 @@ public SelectForUpdateBatchFetcher( PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg, NpgsqlDataSource dataSource, - TimeProvider timeProvider) + BatchCompleter completer) { _dataSource = dataSource; _batchSize = pollingSettings.BatchSize; _selectQuery = SetupSelectQuery(pollingSettings, tableCfg); - var deleteQuery = SetupDeleteQuery(tableCfg); - var updateQuery = SetupUpdateQuery(tableCfg); var hasNextQuery = SetupHasNextQuery(pollingSettings, tableCfg); _messageFactory = tableCfg.MessageFactory; _batchContextFactory = @@ -39,15 +34,7 @@ public SelectForUpdateBatchFetcher( connection, transaction, hasNextQuery, - pollingSettings.CompletionMode switch - { - CompletionMode.Delete => (ok, conn, tx) => - CreateDeleteCommand(deleteQuery, tableCfg.IdGetter, ok, conn, tx), - CompletionMode.Update => (ok, conn, tx) => - CreateUpdateCommand(updateQuery, timeProvider, tableCfg.IdGetter, ok, conn, tx), - _ => throw new InvalidOperationException( - $"Invalid completion mode {pollingSettings.CompletionMode}") - }); + completer); } public async Task FetchAndHoldAsync(CancellationToken ct) @@ -102,31 +89,21 @@ private sealed class BatchContext( NpgsqlConnection connection, NpgsqlTransaction tx, string hasNextQuery, - CompleteCommandFactory completeCommandFactory) + BatchCompleter completer) : IBatchContext { public IReadOnlyCollection Messages => messages; public async Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct) { - if (ok.Count > 0) - { - await using var command = completeCommandFactory(ok, connection, tx); - var completed = await command.ExecuteNonQueryAsync(ct); - - if (completed != ok.Count) - { - // think if this is the best way to handle this (considering this shouldn't happen, probably it's good enough) - await tx.RollbackAsync(ct); - throw new InvalidOperationException("Failed to complete messages"); - } - - await tx.CommitAsync(ct); - } - else + if (ok.Count <= 0) { await tx.RollbackAsync(ct); + return; } + + await completer.CompleteAsync(ok, connection, tx, ct); + await tx.CommitAsync(ct); } public async Task HasNextAsync(CancellationToken ct) @@ -145,59 +122,11 @@ public async Task HasNextAsync(CancellationToken ct) public ValueTask DisposeAsync() => connection.DisposeAsync(); } - private static NpgsqlCommand CreateDeleteCommand( - string deleteQuery, - Func idGetter, - IReadOnlyCollection ok, - NpgsqlConnection connection, - NpgsqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new NpgsqlCommand(string.Format(deleteQuery, idParams), connection, tx); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - - private static NpgsqlCommand CreateUpdateCommand( - string updateQuery, - TimeProvider timeProvider, - Func idGetter, - IReadOnlyCollection ok, - NpgsqlConnection connection, - NpgsqlTransaction tx) - { - var idParams = string.Join(", ", Enumerable.Range(0, ok.Count).Select(i => $"@id{i}")); - var command = new NpgsqlCommand(string.Format(updateQuery, idParams), connection, tx); - command.Parameters.AddWithValue("processedAt", timeProvider.GetUtcNow().DateTime); - - var i = 0; - foreach (var m in ok) - { - command.Parameters.AddWithValue($"id{i}", idGetter(m)); - i++; - } - - return command; - } - private static string SetupHasNextQuery(PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name.Quote()} LIMIT 1);" : $"SELECT EXISTS(SELECT 1 FROM {tableCfg.Name.Quote()} WHERE {tableCfg.ProcessedAtColumn.Quote()} IS NULL LIMIT 1);"; - - private static string SetupUpdateQuery(TableConfiguration tableCfg) => - $"UPDATE {tableCfg.Name.Quote()} SET {tableCfg.ProcessedAtColumn.Quote()} = @processedAt WHERE {tableCfg.IdColumn.Quote()} IN ({{0}});"; - - private static string SetupDeleteQuery(TableConfiguration tableCfg) => - $"DELETE FROM {tableCfg.Name.Quote()} WHERE {tableCfg.IdColumn.Quote()} IN ({{0}});"; - + private static string SetupSelectQuery(PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg) => pollingSettings.CompletionMode == CompletionMode.Delete ? $""" diff --git a/tests/PostgreSql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs b/tests/PostgreSql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs index 1d72390..b3b43bc 100644 --- a/tests/PostgreSql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs +++ b/tests/PostgreSql.Tests/Polling/AdvisoryLockBatchFetcherTests.cs @@ -3,7 +3,7 @@ using YakShaveFx.OutboxKit.Core.Polling; using YakShaveFx.OutboxKit.PostgreSql.Polling; using YakShaveFx.OutboxKit.PostgreSql.Shared; -using static YakShaveFx.OutboxKit.PostgreSql.Tests.Polling.BatchFetcherTestHelpers; +using static YakShaveFx.OutboxKit.PostgreSql.Tests.Polling.TestHelpers; namespace YakShaveFx.OutboxKit.PostgreSql.Tests.Polling; @@ -95,7 +95,7 @@ public async Task WhenFetchingABatchLockShouldBeRetainedIfMessagesAreFound(Compl .WithSeed(seed ? 10 : 0) .InitAsync(); - var sut = new AdvisoryLockBatchFetcher(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var sut = new AdvisoryLockBatchFetcher(postgresSettings, tableConfig, dbCtx.DataSource, new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System)); await using var _ = await sut.FetchAndHoldAsync(CancellationToken.None); @@ -113,5 +113,5 @@ public async Task WhenFetchingABatchLockShouldBeRetainedIfMessagesAreFound(Compl } private static IBatchFetcher SutFactory(PostgreSqlPollingSettings pollingSettings, TableConfiguration tableCfg, NpgsqlDataSource dataSource, TimeProvider timeProvider) - => new AdvisoryLockBatchFetcher(pollingSettings, tableCfg, dataSource, timeProvider); + => new AdvisoryLockBatchFetcher(pollingSettings, tableCfg, dataSource, new BatchCompleter(pollingSettings, tableCfg, dataSource, timeProvider)); } \ No newline at end of file diff --git a/tests/PostgreSql.Tests/Polling/BaseBatchFetcherTests.cs b/tests/PostgreSql.Tests/Polling/BaseBatchFetcherTests.cs index 27bc819..26dce8f 100644 --- a/tests/PostgreSql.Tests/Polling/BaseBatchFetcherTests.cs +++ b/tests/PostgreSql.Tests/Polling/BaseBatchFetcherTests.cs @@ -4,7 +4,7 @@ using YakShaveFx.OutboxKit.Core.Polling; using YakShaveFx.OutboxKit.PostgreSql.Polling; using YakShaveFx.OutboxKit.PostgreSql.Shared; -using static YakShaveFx.OutboxKit.PostgreSql.Tests.Polling.BatchFetcherTestHelpers; +using static YakShaveFx.OutboxKit.PostgreSql.Tests.Polling.TestHelpers; namespace YakShaveFx.OutboxKit.PostgreSql.Tests.Polling; diff --git a/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs new file mode 100644 index 0000000..aaee2f8 --- /dev/null +++ b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs @@ -0,0 +1,151 @@ +using Dapper; +using Npgsql; +using YakShaveFx.OutboxKit.Core.Polling; +using YakShaveFx.OutboxKit.PostgreSql.Polling; +using static YakShaveFx.OutboxKit.PostgreSql.Tests.Polling.TestHelpers; + +namespace YakShaveFx.OutboxKit.PostgreSql.Tests.Polling; + +public class BatchCompleterTests(PostgreSqlFixture postgresFixture) +{ + private readonly CancellationToken _ct = TestContext.Current.CancellationToken; + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenCompletingMessagesThenTheyShouldBeCompleted(CompletionMode completionMode) + { + var (schemaSettings, postgresSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await postgresFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + var sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var messages = await GetMessagesAsync(connection, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); + await sut.CompleteAsync(messages, connection, null, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenCompletingAlreadyCompletedMessagesThenAnExceptionIsThrown(CompletionMode completionMode) + { + var (schemaSettings, postgresSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await postgresFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + var sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + + var messages = await GetMessagesAsync(connection, _ct); + await CompleteMessagesAsync(messages, connection, completionMode); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + var act = () => sut.CompleteAsync(messages, connection, null, _ct); + await act.Should().ThrowAsync(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(CompletionMode completionMode) + { + var (schemaSettings, postgresSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await postgresFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + ICompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + var messages = await GetMessagesAsync(connection, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); + await sut.RetryCompleteAsync(messages, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + [Theory] + [InlineData(CompletionMode.Delete)] + [InlineData(CompletionMode.Update)] + public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionIsThrown( + CompletionMode completionMode) + { + var (schemaSettings, postgresSettings, tableConfig) = GetConfigs(completionMode); + await using var dbCtx = await postgresFixture.DbInit + .WithDefaultSchema(schemaSettings) + .WithSeed() + .InitAsync(); + await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); + + ICompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + + var messages = await GetMessagesAsync(connection, _ct); + await CompleteMessagesAsync(messages, connection, completionMode); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + await sut.RetryCompleteAsync(messages, _ct); + (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); + } + + private async Task> GetMessagesAsync( + NpgsqlConnection connection, + CancellationToken ct) + => (await connection.QueryAsync( + // lang=mysql + """SELECT "Id", "Type", "Payload", "CreatedAt", "TraceContext" FROM "OutboxMessages";""", + ct)).ToArray(); + + private async Task CompleteMessagesAsync( + IReadOnlyCollection messages, + NpgsqlConnection connection, + CompletionMode completionMode) + { + if (completionMode == CompletionMode.Delete) + { + var result = await connection.ExecuteAsync( + new CommandDefinition( + // lang=mysql + """DELETE FROM "OutboxMessages" WHERE "Id" = ANY (@Ids);""", + new { Ids = messages.Select(m => m.Id).ToArray() }, + cancellationToken: _ct)); + } + else + { + var result = await connection.ExecuteAsync( + new CommandDefinition( + // lang=mysql + """UPDATE "OutboxMessages" SET "ProcessedAt" = @CompletedAt WHERE "Id" = ANY (@Ids);""", + new { CompletedAt = DateTime.UtcNow, Ids = messages.Select(m => m.Id).ToArray() }, + cancellationToken: _ct)); + } + } + + private async Task AreMessagesCompletedAsync( + IReadOnlyCollection messages, + NpgsqlConnection connection, + CompletionMode completionMode) + { + if (completionMode == CompletionMode.Delete) + { + return await connection.ExecuteScalarAsync( + new CommandDefinition( + // lang=mysql + """SELECT NOT EXISTS(SELECT 1 FROM "OutboxMessages" WHERE "Id" = ANY (@Ids));""", + new { Ids = messages.Select(m => m.Id).ToArray() }, + cancellationToken: _ct)); + } + else + { + return await connection.ExecuteScalarAsync( + new CommandDefinition( + // lang=mysql + """SELECT NOT EXISTS(SELECT 1 FROM "OutboxMessages" WHERE "Id" = ANY (@Ids) AND "ProcessedAt" IS NULL);""", + new { Ids = messages.Select(m => m.Id).ToArray() }, + cancellationToken: _ct)); + } + } +} \ No newline at end of file diff --git a/tests/PostgreSql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs b/tests/PostgreSql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs index 904ac1a..abc02ad 100644 --- a/tests/PostgreSql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs +++ b/tests/PostgreSql.Tests/Polling/SelectForUpdateBatchFetcherTests.cs @@ -7,7 +7,11 @@ public class SelectForUpdateBatchFetcherTests(PostgreSqlFixture postgresFixture) private readonly BaseBatchFetcherTests _baseTests = new( postgresFixture, (pollingSettings, tableCfg, dataSource, timeProvider) => - new SelectForUpdateBatchFetcher(pollingSettings, tableCfg, dataSource, timeProvider)); + new SelectForUpdateBatchFetcher( + pollingSettings, + tableCfg, + dataSource, + new BatchCompleter(pollingSettings, tableCfg, dataSource, timeProvider))); [Theory] [InlineData(CompletionMode.Delete)] diff --git a/tests/PostgreSql.Tests/Polling/BatchFetcherTestHelpers.cs b/tests/PostgreSql.Tests/Polling/TestHelpers.cs similarity index 95% rename from tests/PostgreSql.Tests/Polling/BatchFetcherTestHelpers.cs rename to tests/PostgreSql.Tests/Polling/TestHelpers.cs index b4a45aa..37ef133 100644 --- a/tests/PostgreSql.Tests/Polling/BatchFetcherTestHelpers.cs +++ b/tests/PostgreSql.Tests/Polling/TestHelpers.cs @@ -3,7 +3,7 @@ namespace YakShaveFx.OutboxKit.PostgreSql.Tests.Polling; -internal static class BatchFetcherTestHelpers +internal static class TestHelpers { public record Config( DefaultSchemaSettings DefaultSchemaSettings, From cc3079edce83175f8856b73299b3ae3f0cb6ced6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Fri, 20 Jun 2025 11:53:01 +0100 Subject: [PATCH 5/7] Small tweaks and naming adjustments --- .../Polling/CompleteProduceMessagesRetrier.cs | 27 ++++++------------- ... => IProducedMessagesCompletionRetrier.cs} | 2 +- src/Core/ServiceCollectionExtensions.cs | 5 ++-- .../Polling/ConfigurationImplementation.cs | 2 +- src/MongoDb/Polling/OutboxBatchCompleter.cs | 4 +-- src/MySql/Polling/BatchCompleter.cs | 4 +-- src/MySql/Polling/ConfigurationExtensions.cs | 2 +- src/PostgreSql/Polling/BatchCompleter.cs | 4 +-- .../Polling/ConfigurationExtensions.cs | 2 +- .../Polling/BatchCompleterTests.cs | 4 +-- .../Polling/BatchCompleterTests.cs | 4 +-- 11 files changed, 24 insertions(+), 36 deletions(-) rename src/Core/Polling/{ICompleteRetrier.cs => IProducedMessagesCompletionRetrier.cs} (92%) diff --git a/src/Core/Polling/CompleteProduceMessagesRetrier.cs b/src/Core/Polling/CompleteProduceMessagesRetrier.cs index 9bf5bd8..6ad422e 100644 --- a/src/Core/Polling/CompleteProduceMessagesRetrier.cs +++ b/src/Core/Polling/CompleteProduceMessagesRetrier.cs @@ -1,5 +1,3 @@ -using Microsoft.Extensions.Logging; - namespace YakShaveFx.OutboxKit.Core.Polling; internal interface ICollectProducedMessagesToRetryCompletion @@ -14,9 +12,8 @@ internal interface IRetryCompletionOfProducedMessages // not thread safe, as it is only used in the context of a producing flow, which has no concurrency internal sealed class CompleteProduceMessagesRetrier( - ICompleteRetrier completeRetrier, - RetrierBuilderFactory retrierBuilderFactory, - ILogger logger) + IProducedMessagesCompletionRetrier completionRetrier, + RetrierBuilderFactory retrierBuilderFactory) : ICollectProducedMessagesToRetryCompletion, IRetryCompletionOfProducedMessages { private readonly Retrier _retrier = retrierBuilderFactory.Create() @@ -40,20 +37,12 @@ public ValueTask RetryCompleteAsync(CancellationToken ct) private async Task InnerRetryCompleteAsync(CancellationToken ct) { - try - { - await _retrier.ExecuteWithRetryAsync( - () => completeRetrier.RetryCompleteAsync(_messages, ct), - ct); + await _retrier.ExecuteWithRetryAsync( + () => completionRetrier.RetryCompleteAsync(_messages, ct), + ct); - // since most of the time there are no messages to retry - // clear messages by creating a new list, so the old one can be garbage collected - // avoiding the underlying array to be kept in memory - _messages = new(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error retrying completion of previously produced messages"); - } + // since most of the time there are no messages to retry, we clear messages by creating a new list, + // so the old one can be garbage collected, avoiding the underlying array to be kept in memory + _messages = new(); } } \ No newline at end of file diff --git a/src/Core/Polling/ICompleteRetrier.cs b/src/Core/Polling/IProducedMessagesCompletionRetrier.cs similarity index 92% rename from src/Core/Polling/ICompleteRetrier.cs rename to src/Core/Polling/IProducedMessagesCompletionRetrier.cs index e824f9d..3eb5005 100644 --- a/src/Core/Polling/ICompleteRetrier.cs +++ b/src/Core/Polling/IProducedMessagesCompletionRetrier.cs @@ -3,7 +3,7 @@ namespace YakShaveFx.OutboxKit.Core.Polling; /// /// Interface to be implemented by library users, to make it possible to retry completing messages already produced. /// -public interface ICompleteRetrier +public interface IProducedMessagesCompletionRetrier { /// /// Completes the given collection of messages. diff --git a/src/Core/ServiceCollectionExtensions.cs b/src/Core/ServiceCollectionExtensions.cs index ed26455..3ab9469 100644 --- a/src/Core/ServiceCollectionExtensions.cs +++ b/src/Core/ServiceCollectionExtensions.cs @@ -81,9 +81,8 @@ private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitCo services.AddKeyedSingleton( key, (s, _) => new CompleteProduceMessagesRetrier( - s.GetRequiredKeyedService(key), - s.GetRequiredService(), - s.GetRequiredService>())); + s.GetRequiredKeyedService(key), + s.GetRequiredService())); services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); diff --git a/src/MongoDb/Polling/ConfigurationImplementation.cs b/src/MongoDb/Polling/ConfigurationImplementation.cs index 7bf07d7..5b57fbe 100644 --- a/src/MongoDb/Polling/ConfigurationImplementation.cs +++ b/src/MongoDb/Polling/ConfigurationImplementation.cs @@ -220,7 +220,7 @@ public void ConfigureCollection( collectionSettings, s.GetRequiredKeyedService>(key)(key, s))); - services.AddKeyedSingleton( + services.AddKeyedSingleton( key, (s, _) => s.GetRequiredKeyedService>(key)); diff --git a/src/MongoDb/Polling/OutboxBatchCompleter.cs b/src/MongoDb/Polling/OutboxBatchCompleter.cs index 3501ff0..53f5a02 100644 --- a/src/MongoDb/Polling/OutboxBatchCompleter.cs +++ b/src/MongoDb/Polling/OutboxBatchCompleter.cs @@ -5,7 +5,7 @@ namespace YakShaveFx.OutboxKit.MongoDb.Polling; -internal sealed class OutboxBatchCompleter : ICompleteRetrier where TMessage : IMessage +internal sealed class OutboxBatchCompleter : IProducedMessagesCompletionRetrier where TMessage : IMessage { private readonly IMongoCollection _collection; private readonly TimeProvider _timeProvider; @@ -43,7 +43,7 @@ public OutboxBatchCompleter( public Task CompleteAsync(IReadOnlyCollection messages, CancellationToken ct) => _complete(messages, ct); - Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) => _complete(messages, ct); private async Task CompleteDeleteAsync(IReadOnlyCollection messages, CancellationToken ct) diff --git a/src/MySql/Polling/BatchCompleter.cs b/src/MySql/Polling/BatchCompleter.cs index 2c76ddc..7362c78 100644 --- a/src/MySql/Polling/BatchCompleter.cs +++ b/src/MySql/Polling/BatchCompleter.cs @@ -6,7 +6,7 @@ namespace YakShaveFx.OutboxKit.MySql.Polling; // ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI -internal sealed class BatchCompleter : ICompleteRetrier +internal sealed class BatchCompleter : IProducedMessagesCompletionRetrier { private delegate MySqlCommand CompleteCommandFactory( IReadOnlyCollection ok, MySqlConnection connection, MySqlTransaction? tx); @@ -59,7 +59,7 @@ public async Task CompleteAsync( if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); } - async Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + async Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) { if (messages.Count <= 0) return; diff --git a/src/MySql/Polling/ConfigurationExtensions.cs b/src/MySql/Polling/ConfigurationExtensions.cs index c5382de..ec222c5 100644 --- a/src/MySql/Polling/ConfigurationExtensions.cs +++ b/src/MySql/Polling/ConfigurationExtensions.cs @@ -239,7 +239,7 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) s.GetRequiredKeyedService(key), s.GetRequiredService())); - services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); services .AddKeyedMySqlDataSource(key, _connectionString) diff --git a/src/PostgreSql/Polling/BatchCompleter.cs b/src/PostgreSql/Polling/BatchCompleter.cs index a3eb8fb..bf2150c 100644 --- a/src/PostgreSql/Polling/BatchCompleter.cs +++ b/src/PostgreSql/Polling/BatchCompleter.cs @@ -6,7 +6,7 @@ namespace YakShaveFx.OutboxKit.PostgreSql.Polling; // ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI -internal sealed class BatchCompleter : ICompleteRetrier +internal sealed class BatchCompleter : IProducedMessagesCompletionRetrier { private delegate NpgsqlCommand CompleteCommandFactory( IReadOnlyCollection ok, NpgsqlConnection connection, NpgsqlTransaction? tx); @@ -59,7 +59,7 @@ public async Task CompleteAsync( if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); } - async Task ICompleteRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + async Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) { if (messages.Count <= 0) return; diff --git a/src/PostgreSql/Polling/ConfigurationExtensions.cs b/src/PostgreSql/Polling/ConfigurationExtensions.cs index f1ca5fe..3c0cae5 100644 --- a/src/PostgreSql/Polling/ConfigurationExtensions.cs +++ b/src/PostgreSql/Polling/ConfigurationExtensions.cs @@ -238,7 +238,7 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) s.GetRequiredKeyedService(key), s.GetRequiredService())); - services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); services diff --git a/tests/MySql.Tests/Polling/BatchCompleterTests.cs b/tests/MySql.Tests/Polling/BatchCompleterTests.cs index c47ebfc..1a1c735 100644 --- a/tests/MySql.Tests/Polling/BatchCompleterTests.cs +++ b/tests/MySql.Tests/Polling/BatchCompleterTests.cs @@ -62,7 +62,7 @@ public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(Comple .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - ICompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IProducedMessagesCompletionRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); await sut.RetryCompleteAsync(messages, _ct); @@ -82,7 +82,7 @@ public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionI .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - ICompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IProducedMessagesCompletionRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); await CompleteMessagesAsync(messages, connection, completionMode); diff --git a/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs index aaee2f8..6204c10 100644 --- a/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs +++ b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs @@ -62,7 +62,7 @@ public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(Comple .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - ICompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IProducedMessagesCompletionRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); await sut.RetryCompleteAsync(messages, _ct); @@ -82,7 +82,7 @@ public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionI .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - ICompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IProducedMessagesCompletionRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); await CompleteMessagesAsync(messages, connection, completionMode); From 21fafd13f23432106a2585138d8e15d5c4bf4746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Fri, 20 Jun 2025 12:16:34 +0100 Subject: [PATCH 6/7] adding docs --- docs/building-a-provider/polling.md | 13 ++++++++++++- docs/building-a-provider/push.md | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/building-a-provider/polling.md b/docs/building-a-provider/polling.md index f6de812..cdce2e9 100644 --- a/docs/building-a-provider/polling.md +++ b/docs/building-a-provider/polling.md @@ -7,6 +7,7 @@ outline: deep To implement a polling provider, there are three main things you need to do: - implement the `IBatchFetcher` and `IBatchContext` interfaces +- implement the `IProducedMessagesCompletionRetrier` interface, so that OutboxKit can retry completing already produced messages in case of failures - implement the `IOutboxCleaner` interface (assuming you want to support the update processed messages feature) - implement OutboxKit setup, which includes collecting any configuration you require, and calling core's `WithPolling` method @@ -26,6 +27,16 @@ Additionally, `IBatchContext` implements the `IAsyncDisposable` interface, which In terms of lifetime, `IBatchFetcher` is created once per outbox, so should be registered in the DI container as a keyed singleton (more on the keyed part later). `IBatchContext` isn't fetched from the DI container, so it's up to you how you manage the lifetime. Normally, a new instance of `IBatchContext` would be created each time `FetchAndHoldAsync` is called, but if you have some good reasons, you could have a pool of `IBatchContext` instances, or even reuse the same instance. Just make sure there's no issues with reusing it across different fetches and outboxes. +## Implementing `IProducedMessagesCompletionRetrier` + +`IProducedMessagesCompletionRetrier` is an interface that allows OutboxKit to retry completing messages that were produced successfully, in case of failures when completing them. The goal is to minimize the amount of duplicate messages produced. + +This interface has a single method, `RetryCompleteAsync`, which takes the messages that were produced but have potentially failed to complete (no way to be sure, as an exception could occur immediately after the completion was persisted to the database). The implementation is likely similar to `IBatchContext.CompleteAsync`, though a bit of extra care has to be taken, to account for the fact that since the failure happened, the messages might have been completed by another running process. + +You don't need to implement any retry logic, just throw an exception if completion fails. The core library will handle the exception and keep calling `RetryCompleteAsync` until it returns successfully. + +Note that OutboxKit keeps the potentially failed to complete messages in memory, so this is a best effort approach to minimize duplicates. This means that if either the process restarts or another instance of the process is running, the messages will be produced again. + ## Implementing `IOutboxCleaner` `IOutboxCleaner` is a rather simple interface, with a single method, `CleanAsync`, which returns the amount of messages that were cleaned, with no arguments (other than the typical `CancellationToken`). @@ -40,6 +51,6 @@ To provide a simple way for library users to configure OutboxKit, your setup cod After you collect all the required configuration, calling `WithPolling` should include the aforementioned key, and something that implements `IPollingOutboxKitConfigurator`. This interface exposes a couple of methods: `ConfigureServices` and `GetCoreSettings`. -`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher` and `IOutboxCleaner` (you could use it for any other services you require, of course). +`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher`, `IProducedMessagesCompletionRetrier` and `IOutboxCleaner` (you could use it for any other services you require, of course). `GetCoreSettings` is invoked when the core library is setting stuff up, and requires the configuration you collected. diff --git a/docs/building-a-provider/push.md b/docs/building-a-provider/push.md index c6024c4..a242cd6 100644 --- a/docs/building-a-provider/push.md +++ b/docs/building-a-provider/push.md @@ -4,4 +4,4 @@ outline: deep # Push -🚧 coming soon +🚧 coming soon (maybe 😂) From 7221ef1c49b308a5e417873f858d88338ddb89a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Sat, 28 Jun 2025 13:22:04 +0100 Subject: [PATCH 7/7] cleaning up and simplifying names --- docs/building-a-provider/polling.md | 14 ++-- src/Core/OpenTelemetry/ActivityShared.cs | 12 +++- src/Core/OpenTelemetry/CleanerMetrics.cs | 8 +-- .../OpenTelemetry/CompletionRetrierMetrics.cs | 52 ++++++++++++++ .../Polling/CompleteProduceMessagesRetrier.cs | 48 ------------- src/Core/Polling/CompletionRetrier.cs | 70 +++++++++++++++++++ ...ionRetrier.cs => IBatchCompleteRetrier.cs} | 6 +- src/Core/Polling/PollingBackgroundService.cs | 4 +- src/Core/Polling/PollingProducer.cs | 4 +- src/Core/ServiceCollectionExtensions.cs | 21 +++--- ...boxBatchCompleter.cs => BatchCompleter.cs} | 7 +- ...{OutboxBatchFetcher.cs => BatchFetcher.cs} | 10 +-- .../Polling/ConfigurationImplementation.cs | 8 +-- src/MySql/Polling/BatchCompleter.cs | 4 +- src/MySql/Polling/ConfigurationExtensions.cs | 2 +- src/PostgreSql/Polling/BatchCompleter.cs | 4 +- .../Polling/ConfigurationExtensions.cs | 2 +- tests/Core.Tests/OpenTelemetryHelpers.cs | 2 +- .../Polling/PollingBackgroundServiceTests.cs | 28 ++++---- .../Polling/PollingProducerTests.cs | 14 ++-- .../Polling/BatchFetcherTests.cs | 12 ++-- .../Polling/BatchCompleterTests.cs | 8 +-- .../Polling/BatchCompleterTests.cs | 8 +-- 23 files changed, 215 insertions(+), 133 deletions(-) create mode 100644 src/Core/OpenTelemetry/CompletionRetrierMetrics.cs delete mode 100644 src/Core/Polling/CompleteProduceMessagesRetrier.cs create mode 100644 src/Core/Polling/CompletionRetrier.cs rename src/Core/Polling/{IProducedMessagesCompletionRetrier.cs => IBatchCompleteRetrier.cs} (69%) rename src/MongoDb/Polling/{OutboxBatchCompleter.cs => BatchCompleter.cs} (93%) rename src/MongoDb/Polling/{OutboxBatchFetcher.cs => BatchFetcher.cs} (92%) diff --git a/docs/building-a-provider/polling.md b/docs/building-a-provider/polling.md index cdce2e9..b7b9460 100644 --- a/docs/building-a-provider/polling.md +++ b/docs/building-a-provider/polling.md @@ -7,7 +7,7 @@ outline: deep To implement a polling provider, there are three main things you need to do: - implement the `IBatchFetcher` and `IBatchContext` interfaces -- implement the `IProducedMessagesCompletionRetrier` interface, so that OutboxKit can retry completing already produced messages in case of failures +- implement the `IBatchCompleteRetrier` interface, so that OutboxKit can retry completing already produced messages in case of failures - implement the `IOutboxCleaner` interface (assuming you want to support the update processed messages feature) - implement OutboxKit setup, which includes collecting any configuration you require, and calling core's `WithPolling` method @@ -27,16 +27,18 @@ Additionally, `IBatchContext` implements the `IAsyncDisposable` interface, which In terms of lifetime, `IBatchFetcher` is created once per outbox, so should be registered in the DI container as a keyed singleton (more on the keyed part later). `IBatchContext` isn't fetched from the DI container, so it's up to you how you manage the lifetime. Normally, a new instance of `IBatchContext` would be created each time `FetchAndHoldAsync` is called, but if you have some good reasons, you could have a pool of `IBatchContext` instances, or even reuse the same instance. Just make sure there's no issues with reusing it across different fetches and outboxes. -## Implementing `IProducedMessagesCompletionRetrier` +## Implementing `IBatchCompleteRetrier` -`IProducedMessagesCompletionRetrier` is an interface that allows OutboxKit to retry completing messages that were produced successfully, in case of failures when completing them. The goal is to minimize the amount of duplicate messages produced. +`IBatchCompleteRetrier` is an interface that allows OutboxKit to retry completing messages that were produced successfully, in case of failures when completing them. The goal is to minimize the amount of duplicate messages produced. -This interface has a single method, `RetryCompleteAsync`, which takes the messages that were produced but have potentially failed to complete (no way to be sure, as an exception could occur immediately after the completion was persisted to the database). The implementation is likely similar to `IBatchContext.CompleteAsync`, though a bit of extra care has to be taken, to account for the fact that since the failure happened, the messages might have been completed by another running process. +This interface has a single method, `RetryAsync`, which takes the messages that were produced but have potentially failed to complete (no way to be sure, as an exception could occur immediately after the completion was persisted to the database). The implementation is likely similar to `IBatchContext.CompleteAsync`, though a bit of extra care has to be taken, to account for the fact that since the failure happened, the messages might have been completed by another running process. -You don't need to implement any retry logic, just throw an exception if completion fails. The core library will handle the exception and keep calling `RetryCompleteAsync` until it returns successfully. +You don't need to implement any retry logic, just let exceptions bubble up or throw your own to signal completion fail. The core library will handle the exception and keep calling `RetryAsync` until it returns successfully. Note that OutboxKit keeps the potentially failed to complete messages in memory, so this is a best effort approach to minimize duplicates. This means that if either the process restarts or another instance of the process is running, the messages will be produced again. +In terms of lifetime, `IBatchCompleteRetrier`, like `IBatchFetcher`, is created once per outbox, so should be registered in the DI container as a keyed singleton. + ## Implementing `IOutboxCleaner` `IOutboxCleaner` is a rather simple interface, with a single method, `CleanAsync`, which returns the amount of messages that were cleaned, with no arguments (other than the typical `CancellationToken`). @@ -51,6 +53,6 @@ To provide a simple way for library users to configure OutboxKit, your setup cod After you collect all the required configuration, calling `WithPolling` should include the aforementioned key, and something that implements `IPollingOutboxKitConfigurator`. This interface exposes a couple of methods: `ConfigureServices` and `GetCoreSettings`. -`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher`, `IProducedMessagesCompletionRetrier` and `IOutboxCleaner` (you could use it for any other services you require, of course). +`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher`, `IBatchCompleteRetrier` and `IOutboxCleaner` (you could use it for any other services you require, of course). `GetCoreSettings` is invoked when the core library is setting stuff up, and requires the configuration you collected. diff --git a/src/Core/OpenTelemetry/ActivityShared.cs b/src/Core/OpenTelemetry/ActivityShared.cs index 0d8e674..bf85cd3 100644 --- a/src/Core/OpenTelemetry/ActivityShared.cs +++ b/src/Core/OpenTelemetry/ActivityShared.cs @@ -8,7 +8,13 @@ internal static class ActivityHelpers Constants.ActivitySourceName, typeof(ActivityHelpers).Assembly.GetName().Version!.ToString()); - public static Activity? StartActivity(string activityName, OutboxKey key) + public static Activity? StartActivity(string activityName, OutboxKey key) + => StartActivity(activityName, key, []); + + public static Activity? StartActivity( + string activityName, + OutboxKey key, + ReadOnlySpan> tags) { if (!ActivitySource.HasListeners()) { @@ -22,7 +28,8 @@ internal static class ActivityHelpers tags: [ new(ActivityConstants.OutboxProviderKeyTag, key.ProviderKey), - new(ActivityConstants.OutboxClientKeyTag, key.ClientKey) + new(ActivityConstants.OutboxClientKeyTag, key.ClientKey), + ..tags ]); } } @@ -33,4 +40,5 @@ internal static class ActivityConstants public const string OutboxClientKeyTag = "outbox.client_key"; public const string OutboxBatchSizeTag = "outbox.batch.size"; public const string OutboxCleanedCountTag = "outbox.cleaned.count"; + public const string OutboxProducedMessagesToCompleteTag = "outbox.produced_messages_to_complete"; } \ No newline at end of file diff --git a/src/Core/OpenTelemetry/CleanerMetrics.cs b/src/Core/OpenTelemetry/CleanerMetrics.cs index 0162086..17c8f1b 100644 --- a/src/Core/OpenTelemetry/CleanerMetrics.cs +++ b/src/Core/OpenTelemetry/CleanerMetrics.cs @@ -6,13 +6,13 @@ namespace YakShaveFx.OutboxKit.Core.OpenTelemetry; internal sealed class CleanerMetrics : IDisposable { private readonly Meter _meter; - private readonly Counter _producedMessagesCounter; + private readonly Counter _cleanedMessagesCounter; public CleanerMetrics(IMeterFactory meterFactory) { _meter = meterFactory.Create(Constants.MeterName); - _producedMessagesCounter = _meter.CreateCounter( + _cleanedMessagesCounter = _meter.CreateCounter( "outbox.cleaned_messages", unit: "{message}", description: "The number processed outbox messages cleaned"); @@ -20,14 +20,14 @@ public CleanerMetrics(IMeterFactory meterFactory) public void MessagesCleaned(OutboxKey key, int count) { - if (_producedMessagesCounter.Enabled && count > 0) + if (_cleanedMessagesCounter.Enabled && count > 0) { var tags = new TagList { { "provider_key", key.ProviderKey }, { "client_key", key.ClientKey } }; - _producedMessagesCounter.Add(count, tags); + _cleanedMessagesCounter.Add(count, tags); } } diff --git a/src/Core/OpenTelemetry/CompletionRetrierMetrics.cs b/src/Core/OpenTelemetry/CompletionRetrierMetrics.cs new file mode 100644 index 0000000..90049ca --- /dev/null +++ b/src/Core/OpenTelemetry/CompletionRetrierMetrics.cs @@ -0,0 +1,52 @@ +using System.Diagnostics; +using System.Diagnostics.Metrics; + +namespace YakShaveFx.OutboxKit.Core.OpenTelemetry; + +internal sealed class CompletionRetrierMetrics : IDisposable +{ + private readonly Meter _meter; + private readonly Counter _completionRetryAttemptsCounter; + private readonly Counter _completionRetriedMessagesCounter; + + public CompletionRetrierMetrics(IMeterFactory meterFactory) + { + _meter = meterFactory.Create(Constants.MeterName); + + _completionRetryAttemptsCounter = _meter.CreateCounter( + "outbox.completion_retry_attempts", + unit: "{attempt}", + description: "The number of attempts to retry completion of produced messages"); + + _completionRetriedMessagesCounter = _meter.CreateCounter( + "outbox.completion_retried_messages", + unit: "{message}", + description: "The number of messages for which completion was retried"); + + } + + public void CompletionRetryAttempted(OutboxKey key, int count) + { + if (_completionRetryAttemptsCounter.Enabled && count > 0) + { + var tags = new TagList + { + { "provider_key", key.ProviderKey }, + { "client_key", key.ClientKey } + }; + _completionRetryAttemptsCounter.Add(1, tags); + } + + if (_completionRetriedMessagesCounter.Enabled && count > 0) + { + var tags = new TagList + { + { "provider_key", key.ProviderKey }, + { "client_key", key.ClientKey } + }; + _completionRetriedMessagesCounter.Add(count, tags); + } + } + + public void Dispose() => _meter.Dispose(); +} \ No newline at end of file diff --git a/src/Core/Polling/CompleteProduceMessagesRetrier.cs b/src/Core/Polling/CompleteProduceMessagesRetrier.cs deleted file mode 100644 index 6ad422e..0000000 --- a/src/Core/Polling/CompleteProduceMessagesRetrier.cs +++ /dev/null @@ -1,48 +0,0 @@ -namespace YakShaveFx.OutboxKit.Core.Polling; - -internal interface ICollectProducedMessagesToRetryCompletion -{ - void Collect(IReadOnlyCollection messages); -} - -internal interface IRetryCompletionOfProducedMessages -{ - ValueTask RetryCompleteAsync(CancellationToken ct); -} - -// not thread safe, as it is only used in the context of a producing flow, which has no concurrency -internal sealed class CompleteProduceMessagesRetrier( - IProducedMessagesCompletionRetrier completionRetrier, - RetrierBuilderFactory retrierBuilderFactory) - : ICollectProducedMessagesToRetryCompletion, IRetryCompletionOfProducedMessages -{ - private readonly Retrier _retrier = retrierBuilderFactory.Create() - .WithMaxRetries(int.MaxValue) - .WithShouldRetryDecider(ex => - { - // retry on all exceptions except cancellation - if (ex is OperationCanceledException oce) return oce.CancellationToken == CancellationToken.None; - return true; - }) - .Build(); - - private List _messages = new(); - - public void Collect(IReadOnlyCollection messages) => _messages.AddRange(messages); - - public ValueTask RetryCompleteAsync(CancellationToken ct) - => _messages.Count == 0 - ? ValueTask.CompletedTask - : new(InnerRetryCompleteAsync(ct)); - - private async Task InnerRetryCompleteAsync(CancellationToken ct) - { - await _retrier.ExecuteWithRetryAsync( - () => completionRetrier.RetryCompleteAsync(_messages, ct), - ct); - - // since most of the time there are no messages to retry, we clear messages by creating a new list, - // so the old one can be garbage collected, avoiding the underlying array to be kept in memory - _messages = new(); - } -} \ No newline at end of file diff --git a/src/Core/Polling/CompletionRetrier.cs b/src/Core/Polling/CompletionRetrier.cs new file mode 100644 index 0000000..8f1fb1e --- /dev/null +++ b/src/Core/Polling/CompletionRetrier.cs @@ -0,0 +1,70 @@ +using System.Diagnostics; +using YakShaveFx.OutboxKit.Core.OpenTelemetry; + +namespace YakShaveFx.OutboxKit.Core.Polling; + +internal interface ICompletionRetryCollector +{ + void Collect(IReadOnlyCollection messages); +} + +internal interface ICompletionRetrier +{ + ValueTask RetryAsync(CancellationToken ct); +} + +// not thread safe, as it is only used in the context of a producing flow, which has no concurrency +internal sealed class CompletionRetrier( + OutboxKey key, + IBatchCompleteRetrier providerCompletionRetrier, + RetrierBuilderFactory retrierBuilderFactory, + CompletionRetrierMetrics metrics) + : ICompletionRetryCollector, ICompletionRetrier +{ + private readonly Retrier _retrier = retrierBuilderFactory.Create() + .WithMaxRetries(int.MaxValue) + .WithShouldRetryDecider(ex => + { + // retry on all exceptions except cancellation + if (ex is OperationCanceledException oce) return oce.CancellationToken == CancellationToken.None; + return true; + }) + .Build(); + + private List _messages = []; + + public void Collect(IReadOnlyCollection messages) => _messages.AddRange(messages); + + public ValueTask RetryAsync(CancellationToken ct) + => _messages.Count == 0 + ? ValueTask.CompletedTask + : new(InnerRetryCompleteAsync(ct)); + + private async Task InnerRetryCompleteAsync(CancellationToken ct) + { + await _retrier.ExecuteWithRetryAsync( + async () => + { + metrics.CompletionRetryAttempted(key, _messages.Count); + using var activity = ActivityHelpers.StartActivity( + "retrying produced messages completion", + key, + [new(ActivityConstants.OutboxProducedMessagesToCompleteTag, _messages.Count)]); + + try + { + await providerCompletionRetrier.RetryAsync(_messages, ct); + } + catch (Exception) + { + activity?.SetStatus(ActivityStatusCode.Error); + throw; + } + }, + ct); + + // since most of the time there are no messages to retry, we clear messages by creating a new list, + // so the old one can be garbage collected, avoiding the underlying array to be kept in memory + _messages = []; + } +} \ No newline at end of file diff --git a/src/Core/Polling/IProducedMessagesCompletionRetrier.cs b/src/Core/Polling/IBatchCompleteRetrier.cs similarity index 69% rename from src/Core/Polling/IProducedMessagesCompletionRetrier.cs rename to src/Core/Polling/IBatchCompleteRetrier.cs index 3eb5005..6bd21f7 100644 --- a/src/Core/Polling/IProducedMessagesCompletionRetrier.cs +++ b/src/Core/Polling/IBatchCompleteRetrier.cs @@ -3,13 +3,13 @@ namespace YakShaveFx.OutboxKit.Core.Polling; /// /// Interface to be implemented by library users, to make it possible to retry completing messages already produced. /// -public interface IProducedMessagesCompletionRetrier +public interface IBatchCompleteRetrier { /// - /// Completes the given collection of messages. + /// Retries completing the given collection of messages. /// /// The messages that were previously successfully produced. /// The async cancellation token. /// The task representing the asynchronous operation - Task RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct); + Task RetryAsync(IReadOnlyCollection messages, CancellationToken ct); } \ No newline at end of file diff --git a/src/Core/Polling/PollingBackgroundService.cs b/src/Core/Polling/PollingBackgroundService.cs index b35ef44..7402d86 100644 --- a/src/Core/Polling/PollingBackgroundService.cs +++ b/src/Core/Polling/PollingBackgroundService.cs @@ -9,7 +9,7 @@ internal sealed partial class PollingBackgroundService( IPollingProducer producer, TimeProvider timeProvider, CorePollingSettings settings, - IRetryCompletionOfProducedMessages completeRetrier, + ICompletionRetrier completionRetrier, ILogger logger) : BackgroundService { private readonly TimeSpan _pollingInterval = settings.PollingInterval; @@ -24,7 +24,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - await completeRetrier.RetryCompleteAsync(stoppingToken); + await completionRetrier.RetryAsync(stoppingToken); try { diff --git a/src/Core/Polling/PollingProducer.cs b/src/Core/Polling/PollingProducer.cs index a7fc069..0a519aa 100644 --- a/src/Core/Polling/PollingProducer.cs +++ b/src/Core/Polling/PollingProducer.cs @@ -13,7 +13,7 @@ internal sealed partial class PollingProducer( OutboxKey key, IBatchFetcher fetcher, IBatchProducer producer, - ICollectProducedMessagesToRetryCompletion completeRetryCollector, + ICompletionRetryCollector completionRetryCollector, ProducerMetrics metrics, ILogger logger) : IPollingProducer { @@ -53,7 +53,7 @@ private async Task ProduceBatchAsync(CancellationToken ct) catch (Exception ex) { LogCompletionUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex); - completeRetryCollector.Collect(result.Ok); + completionRetryCollector.Collect(result.Ok); // return false to break the loop, as we don't want to produce more messages until we're able to complete the batch return false; diff --git a/src/Core/ServiceCollectionExtensions.cs b/src/Core/ServiceCollectionExtensions.cs index 3ab9469..1aacf0b 100644 --- a/src/Core/ServiceCollectionExtensions.cs +++ b/src/Core/ServiceCollectionExtensions.cs @@ -39,6 +39,7 @@ public static IServiceCollection AddOutboxKit( private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitConfigurator configurator) { services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); if (configurator.PollingConfigurators.Count == 1) @@ -75,26 +76,28 @@ private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitCo s.GetRequiredKeyedService(key), s.GetRequiredService(), corePollingSettings, - s.GetRequiredKeyedService(key), + s.GetRequiredKeyedService(key), s.GetRequiredService>())); services.AddKeyedSingleton( key, - (s, _) => new CompleteProduceMessagesRetrier( - s.GetRequiredKeyedService(key), - s.GetRequiredService())); + (s, _) => new CompletionRetrier( + key, + s.GetRequiredKeyedService(key), + s.GetRequiredService(), + s.GetRequiredService())); - services.AddKeyedSingleton(key, - (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, + (s, _) => s.GetRequiredKeyedService(key)); - services.AddKeyedSingleton(key, - (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, + (s, _) => s.GetRequiredKeyedService(key)); services.AddKeyedSingleton(key, (s, _) => new PollingProducer( key, s.GetRequiredKeyedService(key), s.GetRequiredService(), - s.GetRequiredKeyedService(key), + s.GetRequiredKeyedService(key), s.GetRequiredService(), s.GetRequiredService>())); diff --git a/src/MongoDb/Polling/OutboxBatchCompleter.cs b/src/MongoDb/Polling/BatchCompleter.cs similarity index 93% rename from src/MongoDb/Polling/OutboxBatchCompleter.cs rename to src/MongoDb/Polling/BatchCompleter.cs index 53f5a02..6a7be84 100644 --- a/src/MongoDb/Polling/OutboxBatchCompleter.cs +++ b/src/MongoDb/Polling/BatchCompleter.cs @@ -5,7 +5,7 @@ namespace YakShaveFx.OutboxKit.MongoDb.Polling; -internal sealed class OutboxBatchCompleter : IProducedMessagesCompletionRetrier where TMessage : IMessage +internal sealed class BatchCompleter : IBatchCompleteRetrier where TMessage : IMessage { private readonly IMongoCollection _collection; private readonly TimeProvider _timeProvider; @@ -15,8 +15,7 @@ internal sealed class OutboxBatchCompleter : IProducedMessagesCom private readonly Expression>? _processedAtSelector; private readonly Func, CancellationToken, Task> _complete; - public OutboxBatchCompleter( - OutboxKey key, + public BatchCompleter( MongoDbPollingSettings pollingSettings, MongoDbPollingCollectionSettings collectionSettings, IMongoDatabase db, @@ -43,7 +42,7 @@ public OutboxBatchCompleter( public Task CompleteAsync(IReadOnlyCollection messages, CancellationToken ct) => _complete(messages, ct); - Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + Task IBatchCompleteRetrier.RetryAsync(IReadOnlyCollection messages, CancellationToken ct) => _complete(messages, ct); private async Task CompleteDeleteAsync(IReadOnlyCollection messages, CancellationToken ct) diff --git a/src/MongoDb/Polling/OutboxBatchFetcher.cs b/src/MongoDb/Polling/BatchFetcher.cs similarity index 92% rename from src/MongoDb/Polling/OutboxBatchFetcher.cs rename to src/MongoDb/Polling/BatchFetcher.cs index 09e9766..e3ca38e 100644 --- a/src/MongoDb/Polling/OutboxBatchFetcher.cs +++ b/src/MongoDb/Polling/BatchFetcher.cs @@ -6,7 +6,7 @@ namespace YakShaveFx.OutboxKit.MongoDb.Polling; // ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI -internal sealed class OutboxBatchFetcher : IBatchFetcher where TMessage : IMessage +internal sealed class BatchFetcher : IBatchFetcher where TMessage : IMessage { private readonly IMongoCollection _collection; private readonly DistributedLockThingy _lockThingy; @@ -18,16 +18,16 @@ internal sealed class OutboxBatchFetcher : IBatchFetcher where TM private readonly FilterDefinition _findFilter; private readonly SortDefinition _sort; private readonly Func> _hasNext; - private readonly OutboxBatchCompleter _completer; + private readonly BatchCompleter _completer; - public OutboxBatchFetcher( + public BatchFetcher( OutboxKey key, MongoDbPollingSettings pollingSettings, MongoDbPollingCollectionSettings collectionSettings, MongoDbPollingDistributedLockSettings lockSettings, IMongoDatabase db, DistributedLockThingy lockThingy, - OutboxBatchCompleter completer) + BatchCompleter completer) { _lockId = lockSettings.Id; _lockOwner = lockSettings.Owner; @@ -91,7 +91,7 @@ private async Task> FetchMessagesAsync(Cancellatio private class BatchContext( IReadOnlyCollection messages, - OutboxBatchCompleter completer, + BatchCompleter completer, Func> hasNext, IDistributedLock @lock) : IBatchContext { diff --git a/src/MongoDb/Polling/ConfigurationImplementation.cs b/src/MongoDb/Polling/ConfigurationImplementation.cs index 5b57fbe..15d1dd8 100644 --- a/src/MongoDb/Polling/ConfigurationImplementation.cs +++ b/src/MongoDb/Polling/ConfigurationImplementation.cs @@ -213,20 +213,20 @@ public void ConfigureCollection( services.AddKeyedSingleton( key, - (s, _) => ActivatorUtilities.CreateInstance>( + (s, _) => ActivatorUtilities.CreateInstance>( s, key, pollingSettings, collectionSettings, s.GetRequiredKeyedService>(key)(key, s))); - services.AddKeyedSingleton( + services.AddKeyedSingleton( key, - (s, _) => s.GetRequiredKeyedService>(key)); + (s, _) => s.GetRequiredKeyedService>(key)); services.AddKeyedSingleton( key, - (s, _) => ActivatorUtilities.CreateInstance>( + (s, _) => ActivatorUtilities.CreateInstance>( s, key, pollingSettings, diff --git a/src/MySql/Polling/BatchCompleter.cs b/src/MySql/Polling/BatchCompleter.cs index 7362c78..3addea9 100644 --- a/src/MySql/Polling/BatchCompleter.cs +++ b/src/MySql/Polling/BatchCompleter.cs @@ -6,7 +6,7 @@ namespace YakShaveFx.OutboxKit.MySql.Polling; // ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI -internal sealed class BatchCompleter : IProducedMessagesCompletionRetrier +internal sealed class BatchCompleter : IBatchCompleteRetrier { private delegate MySqlCommand CompleteCommandFactory( IReadOnlyCollection ok, MySqlConnection connection, MySqlTransaction? tx); @@ -59,7 +59,7 @@ public async Task CompleteAsync( if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); } - async Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + async Task IBatchCompleteRetrier.RetryAsync(IReadOnlyCollection messages, CancellationToken ct) { if (messages.Count <= 0) return; diff --git a/src/MySql/Polling/ConfigurationExtensions.cs b/src/MySql/Polling/ConfigurationExtensions.cs index ec222c5..45e1541 100644 --- a/src/MySql/Polling/ConfigurationExtensions.cs +++ b/src/MySql/Polling/ConfigurationExtensions.cs @@ -239,7 +239,7 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) s.GetRequiredKeyedService(key), s.GetRequiredService())); - services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); services .AddKeyedMySqlDataSource(key, _connectionString) diff --git a/src/PostgreSql/Polling/BatchCompleter.cs b/src/PostgreSql/Polling/BatchCompleter.cs index bf2150c..7d69a6a 100644 --- a/src/PostgreSql/Polling/BatchCompleter.cs +++ b/src/PostgreSql/Polling/BatchCompleter.cs @@ -6,7 +6,7 @@ namespace YakShaveFx.OutboxKit.PostgreSql.Polling; // ReSharper disable once ClassNeverInstantiated.Global - automagically instantiated by DI -internal sealed class BatchCompleter : IProducedMessagesCompletionRetrier +internal sealed class BatchCompleter : IBatchCompleteRetrier { private delegate NpgsqlCommand CompleteCommandFactory( IReadOnlyCollection ok, NpgsqlConnection connection, NpgsqlTransaction? tx); @@ -59,7 +59,7 @@ public async Task CompleteAsync( if (completed != messages.Count) throw new InvalidOperationException("Failed to complete messages"); } - async Task IProducedMessagesCompletionRetrier.RetryCompleteAsync(IReadOnlyCollection messages, CancellationToken ct) + async Task IBatchCompleteRetrier.RetryAsync(IReadOnlyCollection messages, CancellationToken ct) { if (messages.Count <= 0) return; diff --git a/src/PostgreSql/Polling/ConfigurationExtensions.cs b/src/PostgreSql/Polling/ConfigurationExtensions.cs index 3c0cae5..20e96ab 100644 --- a/src/PostgreSql/Polling/ConfigurationExtensions.cs +++ b/src/PostgreSql/Polling/ConfigurationExtensions.cs @@ -238,7 +238,7 @@ public void ConfigureServices(OutboxKey key, IServiceCollection services) s.GetRequiredKeyedService(key), s.GetRequiredService())); - services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); + services.AddKeyedSingleton(key, (s, _) => s.GetRequiredKeyedService(key)); services diff --git a/tests/Core.Tests/OpenTelemetryHelpers.cs b/tests/Core.Tests/OpenTelemetryHelpers.cs index 9b6b462..119b8fa 100644 --- a/tests/Core.Tests/OpenTelemetryHelpers.cs +++ b/tests/Core.Tests/OpenTelemetryHelpers.cs @@ -8,7 +8,7 @@ public static class OpenTelemetryHelpers public static IMeterFactory CreateMeterFactoryStub() { var factory = Substitute.For(); - factory.Create(default!).ReturnsForAnyArgs(m => new Meter(m.ArgAt(0))); + factory.Create(Arg.Any()).ReturnsForAnyArgs(m => new Meter(m.ArgAt(0))); return factory; } } \ No newline at end of file diff --git a/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs b/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs index 2bad20a..ebd19a6 100644 --- a/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs +++ b/tests/Core.Tests/Polling/PollingBackgroundServiceTests.cs @@ -13,7 +13,7 @@ public class PollingBackgroundServiceTests private readonly Listener _listener = new(); private readonly FakeTimeProvider _timeProvider = new(); private readonly CorePollingSettings _settings = new(); - private readonly IRetryCompletionOfProducedMessages _completeRetrierStub = new CompleteRetrierStub(); + private readonly ICompletionRetrier _completionRetrierStub = new CompleteRetrierStub(); private readonly CancellationToken _ct = TestContext.Current.CancellationToken; [Fact] @@ -21,7 +21,7 @@ public async Task WhenServiceStartsTheProducerIsInvoked() { var producerSpy = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -34,7 +34,7 @@ public async Task UntilPollingIntervalIsReachedTheProducerIsNotInvokedAgain() { var producerSpy = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -51,7 +51,7 @@ public async Task WhenPollingIntervalIsReachedThenTheProducerIsInvokedAgain() { var producerSpy = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -69,7 +69,7 @@ public async Task WhenListenerIsTriggeredThenTheProducerIsInvokedAgain() { var producerSpy = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run and block @@ -86,7 +86,7 @@ public async Task WhenCancellationTokenIsSignaledThenTheServiceStops() { var producerStub = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerStub, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); var cts = new CancellationTokenSource(); @@ -108,7 +108,7 @@ public async Task WhenTheProducerThrowsTheServiceRemainsRunning() .Throw(new InvalidOperationException("test")); var sut = new PollingBackgroundService(Key, _listener, producerMock, _timeProvider, _settings, - _completeRetrierStub, Logger); + _completionRetrierStub, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); // give it a bit to run and block @@ -120,14 +120,14 @@ public async Task WhenTheProducerThrowsTheServiceRemainsRunning() public async Task WhenThereAreMessagesToRetryCompletingThenTheRetrierIsInvoked() { var producerStub = Substitute.For(); - var retrierSpy = Substitute.For(); + var retrierSpy = Substitute.For(); var sut = new PollingBackgroundService(Key, _listener, producerStub, _timeProvider, _settings, retrierSpy, Logger); await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run - await retrierSpy.Received(1).RetryCompleteAsync(Arg.Any()); + await retrierSpy.Received(1).RetryAsync(Arg.Any()); } [Fact] @@ -135,9 +135,9 @@ public async Task WhenThereAreMessagesToRetryCompletingThenTheProducerIsNotInvok { var retryCompletionSource = new TaskCompletionSource(); var producerSpy = Substitute.For(); - var retrierMock = Substitute.For(); + var retrierMock = Substitute.For(); #pragma warning disable CA2012 - mocking, not actually calling the method - retrierMock.RetryCompleteAsync(Arg.Any()).Returns(new ValueTask(retryCompletionSource.Task)); + retrierMock.RetryAsync(Arg.Any()).Returns(new ValueTask(retryCompletionSource.Task)); #pragma warning restore CA2012 var sut = new PollingBackgroundService(Key, _listener, producerSpy, _timeProvider, _settings, @@ -146,7 +146,7 @@ public async Task WhenThereAreMessagesToRetryCompletingThenTheProducerIsNotInvok await sut.StartAsync(CancellationToken.None); await Task.Delay(TimeSpan.FromMilliseconds(100), _ct); // give it a bit to run - await retrierMock.Received(1).RetryCompleteAsync(Arg.Any()); + await retrierMock.Received(1).RetryAsync(Arg.Any()); await producerSpy.DidNotReceive().ProducePendingAsync(Arg.Any()); retryCompletionSource.SetResult(); @@ -155,7 +155,7 @@ public async Task WhenThereAreMessagesToRetryCompletingThenTheProducerIsNotInvok } } -file sealed class CompleteRetrierStub : IRetryCompletionOfProducedMessages +file sealed class CompleteRetrierStub : ICompletionRetrier { - public ValueTask RetryCompleteAsync(CancellationToken ct) => ValueTask.CompletedTask; + public ValueTask RetryAsync(CancellationToken ct) => ValueTask.CompletedTask; } \ No newline at end of file diff --git a/tests/Core.Tests/Polling/PollingProducerTests.cs b/tests/Core.Tests/Polling/PollingProducerTests.cs index cdc180e..09fabf8 100644 --- a/tests/Core.Tests/Polling/PollingProducerTests.cs +++ b/tests/Core.Tests/Polling/PollingProducerTests.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; +using NSubstitute.Core.Arguments; using YakShaveFx.OutboxKit.Core.OpenTelemetry; using YakShaveFx.OutboxKit.Core.Polling; using static YakShaveFx.OutboxKit.Core.Tests.OpenTelemetryHelpers; @@ -12,15 +13,14 @@ public class PollingProducerTests private static readonly ProducerMetrics Metrics = new(CreateMeterFactoryStub()); private static readonly NullLogger Logger = NullLogger.Instance; - private static readonly ICollectProducedMessagesToRetryCompletion CompleteRetrierStub = - new CompleteRetryCollectorStub(); + private static readonly ICompletionRetryCollector CompleteRetryStub = new CompleteRetryRetryCollectorStub(); [Fact] public async Task WhenBatchIsEmptyThenProducerIsNotInvoked() { var producerSpy = CreateProducer(); var fetcherStub = new BatchFetcherStub([new BatchContextStub([], false)]); - var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetrierStub, Metrics, Logger); + var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetryStub, Metrics, Logger); await sut.ProducePendingAsync(CancellationToken.None); @@ -37,7 +37,7 @@ public async Task WhileThereAreAvailableBatchesProducerIsInvoked(int numberOfBat { var producerSpy = CreateProducer(); var fetcherStub = new BatchFetcherStub(CreateBatchContexts(numberOfBatches)); - var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetrierStub, Metrics, Logger); + var sut = new PollingProducer(Key, fetcherStub, producerSpy, CompleteRetryStub, Metrics, Logger); await sut.ProducePendingAsync(CancellationToken.None); @@ -51,7 +51,7 @@ public async Task WhenCompletingBatchThenMessagesAreCollectedForRetry() { var producerSpy = CreateProducer(); var fetcherStub = new BatchFetcherStub([new BatchContextStub([new MessageStub()], false, true)]); - var collectorSpy = Substitute.For(); + var collectorSpy = Substitute.For(); var sut = new PollingProducer(Key, fetcherStub, producerSpy, collectorSpy, Metrics, Logger); await sut.ProducePendingAsync(CancellationToken.None); @@ -72,7 +72,7 @@ private static IBatchProducer CreateProducer() { var producerSpy = Substitute.For(); producerSpy - .ProduceAsync(default!, default!, default) + .ProduceAsync(Arg.Any(), Arg.Any>(), Arg.Any()) .ReturnsForAnyArgs(args => Task.FromResult(new BatchProduceResult { Ok = (IReadOnlyCollection)args[1] })); return producerSpy; @@ -104,7 +104,7 @@ public Task CompleteAsync(IReadOnlyCollection ok, CancellationToken ct public ValueTask DisposeAsync() => ValueTask.CompletedTask; } -file sealed class CompleteRetryCollectorStub : ICollectProducedMessagesToRetryCompletion +file sealed class CompleteRetryRetryCollectorStub : ICompletionRetryCollector { public void Collect(IReadOnlyCollection messages) { diff --git a/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs b/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs index 2238746..7d72b8a 100644 --- a/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs +++ b/tests/MongoDb.Tests/Polling/BatchFetcherTests.cs @@ -151,7 +151,7 @@ private IBatchFetcher CreateSut( TimeProvider timeProvider) => completionMode switch { - CompletionMode.Delete => new OutboxBatchFetcher( + CompletionMode.Delete => new BatchFetcher( MongoDbPollingProvider.CreateKey("test"), Defaults.Delete.MongoDbPollingSettings, Defaults.Delete.CollectionConfig, @@ -168,13 +168,11 @@ private IBatchFetcher CreateSut( _db, timeProvider, NullLogger.Instance), - new OutboxBatchCompleter( - MongoDbPollingProvider.CreateKey("test"), - Defaults.Delete.MongoDbPollingSettings, + new BatchCompleter(Defaults.Delete.MongoDbPollingSettings, Defaults.Delete.CollectionConfig, _db, timeProvider)), - CompletionMode.Update => new OutboxBatchFetcher( + CompletionMode.Update => new BatchFetcher( MongoDbPollingProvider.CreateKey("test"), Defaults.Update.MongoDbPollingSettings, Defaults.Update.CollectionConfigWithProcessedAt, @@ -191,9 +189,7 @@ private IBatchFetcher CreateSut( _db, timeProvider, NullLogger.Instance), - new OutboxBatchCompleter( - MongoDbPollingProvider.CreateKey("test"), - Defaults.Update.MongoDbPollingSettings, + new BatchCompleter(Defaults.Update.MongoDbPollingSettings, Defaults.Update.CollectionConfigWithProcessedAt, _db, timeProvider)), diff --git a/tests/MySql.Tests/Polling/BatchCompleterTests.cs b/tests/MySql.Tests/Polling/BatchCompleterTests.cs index 1a1c735..9af2243 100644 --- a/tests/MySql.Tests/Polling/BatchCompleterTests.cs +++ b/tests/MySql.Tests/Polling/BatchCompleterTests.cs @@ -62,10 +62,10 @@ public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(Comple .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - IProducedMessagesCompletionRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IBatchCompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); - await sut.RetryCompleteAsync(messages, _ct); + await sut.RetryAsync(messages, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); } @@ -82,12 +82,12 @@ public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionI .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - IProducedMessagesCompletionRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IBatchCompleteRetrier sut = new BatchCompleter(mySqlSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); await CompleteMessagesAsync(messages, connection, completionMode); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); - await sut.RetryCompleteAsync(messages, _ct); + await sut.RetryAsync(messages, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); } diff --git a/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs index 6204c10..dba2eb4 100644 --- a/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs +++ b/tests/PostgreSql.Tests/Polling/BatchCompleterTests.cs @@ -62,10 +62,10 @@ public async Task WhenRetryingCompletingMessagesThenTheyShouldBeCompleted(Comple .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - IProducedMessagesCompletionRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IBatchCompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeFalse(); - await sut.RetryCompleteAsync(messages, _ct); + await sut.RetryAsync(messages, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); } @@ -82,12 +82,12 @@ public async Task WhenRetryingCompletingAlreadyCompletedMessagesThenNoExceptionI .InitAsync(); await using var connection = await dbCtx.DataSource.OpenConnectionAsync(_ct); - IProducedMessagesCompletionRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); + IBatchCompleteRetrier sut = new BatchCompleter(postgresSettings, tableConfig, dbCtx.DataSource, TimeProvider.System); var messages = await GetMessagesAsync(connection, _ct); await CompleteMessagesAsync(messages, connection, completionMode); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); - await sut.RetryCompleteAsync(messages, _ct); + await sut.RetryAsync(messages, _ct); (await AreMessagesCompletedAsync(messages, connection, completionMode)).Should().BeTrue(); }