Commit 8145002

Introduce mechanism to collect and retry completing successfully produced messages (#45)
1 parent b53afdb commit 8145002

38 files changed, +1356 -474 lines changed

docs/building-a-provider/polling.md

Lines changed: 14 additions & 1 deletion
@@ -7,6 +7,7 @@ outline: deep
 To implement a polling provider, there are three main things you need to do:

 - implement the `IBatchFetcher` and `IBatchContext` interfaces
+- implement the `IBatchCompleteRetrier` interface, so that OutboxKit can retry completing already produced messages in case of failures
 - implement the `IOutboxCleaner` interface (assuming you want to support the update processed messages feature)
 - implement OutboxKit setup, which includes collecting any configuration you require, and calling core's `WithPolling` method

@@ -26,6 +27,18 @@ Additionally, `IBatchContext` implements the `IAsyncDisposable` interface, which

 In terms of lifetime, `IBatchFetcher` is created once per outbox, so should be registered in the DI container as a keyed singleton (more on the keyed part later). `IBatchContext` isn't fetched from the DI container, so it's up to you how you manage the lifetime. Normally, a new instance of `IBatchContext` would be created each time `FetchAndHoldAsync` is called, but if you have some good reasons, you could have a pool of `IBatchContext` instances, or even reuse the same instance. Just make sure there's no issues with reusing it across different fetches and outboxes.

+## Implementing `IBatchCompleteRetrier`
+
+`IBatchCompleteRetrier` is an interface that allows OutboxKit to retry completing messages that were produced successfully, in case of failures when completing them. The goal is to minimize the number of duplicate messages produced.
+
+This interface has a single method, `RetryAsync`, which takes the messages that were produced but have potentially failed to complete (no way to be sure, as an exception could occur immediately after the completion was persisted to the database). The implementation is likely similar to `IBatchContext.CompleteAsync`, though a bit of extra care has to be taken, to account for the fact that since the failure happened, the messages might have been completed by another running process.
+
+You don't need to implement any retry logic, just let exceptions bubble up or throw your own to signal that completion failed. The core library will handle the exception and keep calling `RetryAsync` until it returns successfully.
+
+Note that OutboxKit keeps the messages that potentially failed to complete in memory, so this is a best effort approach to minimize duplicates. This means that if either the process restarts or another instance of the process is running, the messages will be produced again.
+
+In terms of lifetime, `IBatchCompleteRetrier`, like `IBatchFetcher`, is created once per outbox, so should be registered in the DI container as a keyed singleton.
+
 ## Implementing `IOutboxCleaner`

 `IOutboxCleaner` is a rather simple interface, with a single method, `CleanAsync`, which returns the amount of messages that were cleaned, with no arguments (other than the typical `CancellationToken`).
@@ -40,6 +53,6 @@ To provide a simple way for library users to configure OutboxKit, your setup cod

 After you collect all the required configuration, calling `WithPolling` should include the aforementioned key, and something that implements `IPollingOutboxKitConfigurator`. This interface exposes a couple of methods: `ConfigureServices` and `GetCoreSettings`.

-`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher` and `IOutboxCleaner` (you could use it for any other services you require, of course).
+`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher`, `IBatchCompleteRetrier` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher`, `IBatchCompleteRetrier` and `IOutboxCleaner` (you could use it for any other services you require, of course).

 `GetCoreSettings` is invoked when the core library is setting stuff up, and requires the configuration you collected.
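
The `RetryAsync` contract described in the new `IBatchCompleteRetrier` section of this doc change can be illustrated with a minimal in-memory sketch. It is not taken from any real provider: `IMessage` and `IBatchCompleteRetrier` are redeclared locally as stand-ins so the snippet compiles on its own, and `StoredMessage`, `InMemoryOutbox` and `InMemoryBatchCompleteRetrier` are hypothetical names. The point is only that completion should tolerate messages that are already gone (for example, completed by another process), and that failures are simply allowed to bubble up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local stand-ins so the sketch compiles on its own; a real provider would
// reference the OutboxKit types instead of redeclaring them.
public interface IMessage { }

public interface IBatchCompleteRetrier
{
    Task RetryAsync(IReadOnlyCollection<IMessage> messages, CancellationToken ct);
}

// Hypothetical message type carrying an identifier (an assumption for illustration).
public sealed record StoredMessage(long Id) : IMessage;

// Hypothetical in-memory "table" standing in for the outbox storage.
public sealed class InMemoryOutbox
{
    private readonly HashSet<long> _pendingIds = new();
    private readonly object _lock = new();

    public void Add(long id)
    {
        lock (_lock) { _pendingIds.Add(id); }
    }

    public int PendingCount
    {
        get { lock (_lock) { return _pendingIds.Count; } }
    }

    // Removing an id that is no longer pending is a no-op, which keeps retries safe.
    public void Remove(IEnumerable<long> ids)
    {
        lock (_lock)
        {
            foreach (var id in ids) _pendingIds.Remove(id);
        }
    }
}

public sealed class InMemoryBatchCompleteRetrier : IBatchCompleteRetrier
{
    private readonly InMemoryOutbox _outbox;

    public InMemoryBatchCompleteRetrier(InMemoryOutbox outbox) => _outbox = outbox;

    public Task RetryAsync(IReadOnlyCollection<IMessage> messages, CancellationToken ct)
    {
        // No retry loop here: any exception is left to bubble up to the caller.
        ct.ThrowIfCancellationRequested();

        var ids = new List<long>(messages.Count);
        foreach (var message in messages)
        {
            if (message is StoredMessage stored) ids.Add(stored.Id);
        }

        _outbox.Remove(ids);
        return Task.CompletedTask;
    }
}
```

In a database-backed provider the equivalent would presumably be a delete or update keyed on message identifiers that does not fail when zero rows are affected.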

docs/building-a-provider/push.md

Lines changed: 1 addition & 1 deletion
@@ -4,4 +4,4 @@ outline: deep

 # Push

-🚧 coming soon
+🚧 coming soon (maybe 😂)

src/Core/OpenTelemetry/ActivityShared.cs

Lines changed: 10 additions & 2 deletions
@@ -8,7 +8,13 @@ internal static class ActivityHelpers
         Constants.ActivitySourceName,
         typeof(ActivityHelpers).Assembly.GetName().Version!.ToString());

-    public static Activity? StartActivity(string activityName, OutboxKey key)
+    public static Activity? StartActivity(string activityName, OutboxKey key)
+        => StartActivity(activityName, key, []);
+
+    public static Activity? StartActivity(
+        string activityName,
+        OutboxKey key,
+        ReadOnlySpan<KeyValuePair<string, object?>> tags)
     {
         if (!ActivitySource.HasListeners())
         {
@@ -22,7 +28,8 @@ internal static class ActivityHelpers
             tags:
             [
                 new(ActivityConstants.OutboxProviderKeyTag, key.ProviderKey),
-                new(ActivityConstants.OutboxClientKeyTag, key.ClientKey)
+                new(ActivityConstants.OutboxClientKeyTag, key.ClientKey),
+                ..tags
             ]);
     }
 }
@@ -33,4 +40,5 @@ internal static class ActivityConstants
     public const string OutboxClientKeyTag = "outbox.client_key";
     public const string OutboxBatchSizeTag = "outbox.batch.size";
     public const string OutboxCleanedCountTag = "outbox.cleaned.count";
+    public const string OutboxProducedMessagesToCompleteTag = "outbox.produced_messages_to_complete";
 }

src/Core/OpenTelemetry/CleanerMetrics.cs

Lines changed: 4 additions & 4 deletions
@@ -6,28 +6,28 @@ namespace YakShaveFx.OutboxKit.Core.OpenTelemetry;
 internal sealed class CleanerMetrics : IDisposable
 {
     private readonly Meter _meter;
-    private readonly Counter<long> _producedMessagesCounter;
+    private readonly Counter<long> _cleanedMessagesCounter;

     public CleanerMetrics(IMeterFactory meterFactory)
     {
         _meter = meterFactory.Create(Constants.MeterName);

-        _producedMessagesCounter = _meter.CreateCounter<long>(
+        _cleanedMessagesCounter = _meter.CreateCounter<long>(
             "outbox.cleaned_messages",
             unit: "{message}",
             description: "The number processed outbox messages cleaned");
     }

     public void MessagesCleaned(OutboxKey key, int count)
     {
-        if (_producedMessagesCounter.Enabled && count > 0)
+        if (_cleanedMessagesCounter.Enabled && count > 0)
         {
             var tags = new TagList
             {
                 { "provider_key", key.ProviderKey },
                 { "client_key", key.ClientKey }
             };
-            _producedMessagesCounter.Add(count, tags);
+            _cleanedMessagesCounter.Add(count, tags);
         }
     }


Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+
+namespace YakShaveFx.OutboxKit.Core.OpenTelemetry;
+
+internal sealed class CompletionRetrierMetrics : IDisposable
+{
+    private readonly Meter _meter;
+    private readonly Counter<long> _completionRetryAttemptsCounter;
+    private readonly Counter<long> _completionRetriedMessagesCounter;
+
+    public CompletionRetrierMetrics(IMeterFactory meterFactory)
+    {
+        _meter = meterFactory.Create(Constants.MeterName);
+
+        _completionRetryAttemptsCounter = _meter.CreateCounter<long>(
+            "outbox.completion_retry_attempts",
+            unit: "{attempt}",
+            description: "The number of attempts to retry completion of produced messages");
+
+        _completionRetriedMessagesCounter = _meter.CreateCounter<long>(
+            "outbox.completion_retried_messages",
+            unit: "{message}",
+            description: "The number of messages for which completion was retried");
+
+    }
+
+    public void CompletionRetryAttempted(OutboxKey key, int count)
+    {
+        if (_completionRetryAttemptsCounter.Enabled && count > 0)
+        {
+            var tags = new TagList
+            {
+                { "provider_key", key.ProviderKey },
+                { "client_key", key.ClientKey }
+            };
+            _completionRetryAttemptsCounter.Add(1, tags);
+        }
+
+        if (_completionRetriedMessagesCounter.Enabled && count > 0)
+        {
+            var tags = new TagList
+            {
+                { "provider_key", key.ProviderKey },
+                { "client_key", key.ClientKey }
+            };
+            _completionRetriedMessagesCounter.Add(count, tags);
+        }
+    }
+
+    public void Dispose() => _meter.Dispose();
+}
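
The `Enabled` checks in the metrics class above mean the counters only record when something is listening. As a rough, standalone way to see what the two counters report, the sketch below recreates them on a throwaway `Meter` and collects measurements with a `MeterListener`. The meter name `Sketch.OutboxKit` and the `CounterObservationSketch` class are assumptions for illustration; the instrument names and units are copied from the diff, and tags are left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class CounterObservationSketch
{
    // Records one retry attempt covering `messageCount` messages and returns
    // the totals observed per instrument name.
    public static Dictionary<string, long> Record(int messageCount)
    {
        var totals = new Dictionary<string, long>();

        using var meter = new Meter("Sketch.OutboxKit"); // hypothetical meter name
        var attempts = meter.CreateCounter<long>("outbox.completion_retry_attempts", unit: "{attempt}");
        var messages = meter.CreateCounter<long>("outbox.completion_retried_messages", unit: "{message}");

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, meterListener) =>
        {
            // Only subscribe to instruments created by this sketch's meter.
            if (ReferenceEquals(instrument.Meter, meter))
            {
                meterListener.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
        {
            totals.TryGetValue(instrument.Name, out var current);
            totals[instrument.Name] = current + value;
        });
        listener.Start();

        // Mirrors the shape of CompletionRetryAttempted: one attempt, N messages,
        // and nothing at all when there are no messages.
        if (messageCount > 0)
        {
            attempts.Add(1);
            messages.Add(messageCount);
        }

        return totals;
    }
}
```

Calling `CounterObservationSketch.Record(3)` yields one attempt and three retried messages, while `Record(0)` yields no measurements, matching the `count > 0` guards in the diff.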
Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+using System.Diagnostics;
+using YakShaveFx.OutboxKit.Core.OpenTelemetry;
+
+namespace YakShaveFx.OutboxKit.Core.Polling;
+
+internal interface ICompletionRetryCollector
+{
+    void Collect(IReadOnlyCollection<IMessage> messages);
+}
+
+internal interface ICompletionRetrier
+{
+    ValueTask RetryAsync(CancellationToken ct);
+}
+
+// not thread safe, as it is only used in the context of a producing flow, which has no concurrency
+internal sealed class CompletionRetrier(
+    OutboxKey key,
+    IBatchCompleteRetrier providerCompletionRetrier,
+    RetrierBuilderFactory retrierBuilderFactory,
+    CompletionRetrierMetrics metrics)
+    : ICompletionRetryCollector, ICompletionRetrier
+{
+    private readonly Retrier _retrier = retrierBuilderFactory.Create()
+        .WithMaxRetries(int.MaxValue)
+        .WithShouldRetryDecider(ex =>
+        {
+            // retry on all exceptions except cancellation
+            if (ex is OperationCanceledException oce) return oce.CancellationToken == CancellationToken.None;
+            return true;
+        })
+        .Build();
+
+    private List<IMessage> _messages = [];
+
+    public void Collect(IReadOnlyCollection<IMessage> messages) => _messages.AddRange(messages);
+
+    public ValueTask RetryAsync(CancellationToken ct)
+        => _messages.Count == 0
+            ? ValueTask.CompletedTask
+            : new(InnerRetryCompleteAsync(ct));
+
+    private async Task InnerRetryCompleteAsync(CancellationToken ct)
+    {
+        await _retrier.ExecuteWithRetryAsync(
+            async () =>
+            {
+                metrics.CompletionRetryAttempted(key, _messages.Count);
+                using var activity = ActivityHelpers.StartActivity(
+                    "retrying produced messages completion",
+                    key,
+                    [new(ActivityConstants.OutboxProducedMessagesToCompleteTag, _messages.Count)]);
+
+                try
+                {
+                    await providerCompletionRetrier.RetryAsync(_messages, ct);
+                }
+                catch (Exception)
+                {
+                    activity?.SetStatus(ActivityStatusCode.Error);
+                    throw;
+                }
+            },
+            ct);
+
+        // since most of the time there are no messages to retry, we clear messages by creating a new list,
+        // so the old one can be garbage collected, avoiding the underlying array to be kept in memory
+        _messages = [];
+    }
+}
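
The collect-then-retry flow of the class above can be sketched in isolation. This is a simplified stand-in, not the library's implementation: `CollectAndRetrySketch<T>` is a hypothetical name, the completion call is passed in as a delegate, and a plain loop with a fixed delay replaces the library's `Retrier`, whose policy is not shown in this diff. Cancellation handling is also simplified to "stop when the caller's token is cancelled".

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Not thread safe, mirroring the original, which is only used from a single producing flow.
public sealed class CollectAndRetrySketch<T>
{
    private readonly Func<IReadOnlyCollection<T>, CancellationToken, Task> _complete;
    private readonly TimeSpan _delayBetweenAttempts;
    private List<T> _pending = new();

    public CollectAndRetrySketch(
        Func<IReadOnlyCollection<T>, CancellationToken, Task> complete,
        TimeSpan delayBetweenAttempts)
    {
        _complete = complete;
        _delayBetweenAttempts = delayBetweenAttempts;
    }

    public int PendingCount => _pending.Count;

    // Called when completing a produced batch failed.
    public void Collect(IReadOnlyCollection<T> items) => _pending.AddRange(items);

    // Called before producing again: keeps trying until completion succeeds
    // or the caller cancels.
    public async Task RetryAsync(CancellationToken ct)
    {
        if (_pending.Count == 0) return;

        while (true)
        {
            try
            {
                await _complete(_pending, ct);
                break;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                throw; // shutting down, stop retrying
            }
            catch (Exception)
            {
                // Fixed delay is an assumption; the real policy lives in the library's Retrier.
                await Task.Delay(_delayBetweenAttempts, ct);
            }
        }

        // Replace the list rather than clearing it, so a large backing array can be collected.
        _pending = new List<T>();
    }
}
```

Because the pending messages live only in this object's memory, a process restart loses them, which is why the docs describe the mechanism as best effort.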
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+namespace YakShaveFx.OutboxKit.Core.Polling;
+
+/// <summary>
+/// Interface to be implemented by library users, to make it possible to retry completing messages already produced.
+/// </summary>
+public interface IBatchCompleteRetrier
+{
+    /// <summary>
+    /// Retries completing the given collection of messages.
+    /// </summary>
+    /// <param name="messages">The messages that were previously successfully produced.</param>
+    /// <param name="ct">The async cancellation token.</param>
+    /// <returns>The task representing the asynchronous operation</returns>
+    Task RetryAsync(IReadOnlyCollection<IMessage> messages, CancellationToken ct);
+}

src/Core/Polling/PollingBackgroundService.cs

Lines changed: 31 additions & 20 deletions
@@ -9,6 +9,7 @@ internal sealed partial class PollingBackgroundService(
     IPollingProducer producer,
     TimeProvider timeProvider,
     CorePollingSettings settings,
+    ICompletionRetrier completionRetrier,
     ILogger<PollingBackgroundService> logger) : BackgroundService
 {
     private readonly TimeSpan _pollingInterval = settings.PollingInterval;
@@ -23,6 +24,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
         {
             try
             {
+                await completionRetrier.RetryAsync(stoppingToken);
+
                 try
                 {
                     await producer.ProducePendingAsync(stoppingToken);
@@ -38,26 +41,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
                     LogUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
                 }

-                // to avoid letting the delays running in the background, wasting resources
-                // we create a linked token, to cancel them
-                using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
-
-                var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token);
-                var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token);
-
-                // wait for whatever occurs first:
-                // - being notified of new messages added to the outbox
-                // - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted
-                // something but didn't produce it, or some error occurred when producing and there are pending messages
-                await Task.WhenAny(listenerTask, delayTask);
-
-                LogWakeUp(
-                    logger,
-                    key.ProviderKey,
-                    key.ClientKey,
-                    listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed");
-
-                await linkedTokenSource.CancelAsync();
+                await WaitBeforeNextIteration(stoppingToken);
             }
             catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
             {
@@ -68,6 +52,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
         LogStopping(logger, key.ProviderKey, key.ClientKey);
     }

+    private async Task WaitBeforeNextIteration(CancellationToken ct)
+    {
+        // no need to even try to wait if the service is stopping
+        if (ct.IsCancellationRequested) return;
+
+        // to avoid letting the delays running in the background, wasting resources
+        // we create a linked token, to cancel them
+        using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct);
+
+        var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token);
+        var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token);
+
+        // wait for whatever occurs first:
+        // - being notified of new messages added to the outbox
+        // - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted
+        // something but didn't produce it, or some error occurred when producing and there are pending messages
+        await Task.WhenAny(listenerTask, delayTask);
+
+        LogWakeUp(
+            logger,
+            key.ProviderKey,
+            key.ClientKey,
+            listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed");
+
+        await linkedTokenSource.CancelAsync();
+    }
+
     [LoggerMessage(LogLevel.Debug,
         Message =
             "Starting outbox polling background service for provider key \"{providerKey}\" and client key \"{clientKey}\", with polling interval {pollingInterval}")]

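The wait extracted into `WaitBeforeNextIteration` above follows a "first of two tasks wins, then cancel the loser" pattern. A minimal standalone sketch is shown below, under a few assumptions: `WaitSketch` and `WaitForFirstAsync` are hypothetical names, the listener is replaced by a delegate, `TimeProvider` is left out, and `Cancel()` is used where the original awaits `CancelAsync()`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitSketch
{
    // Waits for either a signal or the polling interval, whichever comes first,
    // and returns a label describing why the wait ended.
    public static async Task<string> WaitForFirstAsync(
        Func<CancellationToken, Task> waitForSignal,
        TimeSpan pollingInterval,
        CancellationToken ct)
    {
        // No need to wait at all if the caller is already stopping.
        if (ct.IsCancellationRequested) return "stopping";

        // Linked source: cancelling it stops whichever task did not finish first,
        // so no timer or listener wait is left running in the background.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct);

        var signalTask = waitForSignal(linked.Token);
        var delayTask = Task.Delay(pollingInterval, linked.Token);

        await Task.WhenAny(signalTask, delayTask);

        var reason = signalTask.IsCompleted ? "listener triggered" : "polling interval elapsed";

        linked.Cancel();
        return reason;
    }
}
```

`Task.WhenAny` does not throw when one of its inputs is cancelled or faulted, so the method returns normally in both cases and the caller decides what to do next.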
src/Core/Polling/Producer.cs renamed to src/Core/Polling/PollingProducer.cs

Lines changed: 24 additions & 5 deletions
@@ -1,3 +1,4 @@
+using Microsoft.Extensions.Logging;
 using YakShaveFx.OutboxKit.Core.OpenTelemetry;

 namespace YakShaveFx.OutboxKit.Core.Polling;
@@ -8,11 +9,13 @@ internal interface IPollingProducer
     Task ProducePendingAsync(CancellationToken ct);
 }

-internal sealed class PollingProducer(
+internal sealed partial class PollingProducer(
     OutboxKey key,
     IBatchFetcher fetcher,
     IBatchProducer producer,
-    ProducerMetrics metrics) : IPollingProducer
+    ICompletionRetryCollector completionRetryCollector,
+    ProducerMetrics metrics,
+    ILogger<PollingProducer> logger) : IPollingProducer
 {
     public async Task ProducePendingAsync(CancellationToken ct)
     {
@@ -41,10 +44,26 @@ private async Task<bool> ProduceBatchAsync(CancellationToken ct)
         metrics.BatchProduced(key, messages.Count == result.Ok.Count);
         metrics.MessagesProduced(key, result.Ok.Count);

-        // messages already produced, try to ack them
-        // not passing the actual cancellation token to try to complete the batch even if the application is shutting down
-        await batchContext.CompleteAsync(result.Ok, CancellationToken.None);
+        try
+        {
+            // messages already produced, try to ack them
+            // not passing the actual cancellation token to try to complete the batch even if the application is shutting down
+            await batchContext.CompleteAsync(result.Ok, CancellationToken.None);
+        }
+        catch (Exception ex)
+        {
+            LogCompletionUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
+            completionRetryCollector.Collect(result.Ok);
+
+            // return false to break the loop, as we don't want to produce more messages until we're able to complete the batch
+            return false;
+        }

         return await batchContext.HasNextAsync(ct);
     }
+
+    [LoggerMessage(LogLevel.Error,
+        Message =
+            "Unexpected error while completing produced outbox messages for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
+    private static partial void LogCompletionUnexpectedError(ILogger logger, string providerKey, string clientKey, Exception ex);
 }
