Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/building-a-provider/polling.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ outline: deep
To implement a polling provider, there are three main things you need to do:

- implement the `IBatchFetcher` and `IBatchContext` interfaces
- implement the `IBatchCompleteRetrier` interface, so that OutboxKit can retry completing already produced messages in case of failures
- implement the `IOutboxCleaner` interface (assuming you want to support the update processed messages feature)
- implement OutboxKit setup, which includes collecting any configuration you require, and calling core's `WithPolling` method

Expand All @@ -26,6 +27,18 @@ Additionally, `IBatchContext` implements the `IAsyncDisposable` interface, which

In terms of lifetime, `IBatchFetcher` is created once per outbox, so should be registered in the DI container as a keyed singleton (more on the keyed part later). `IBatchContext` isn't fetched from the DI container, so it's up to you how you manage the lifetime. Normally, a new instance of `IBatchContext` would be created each time `FetchAndHoldAsync` is called, but if you have some good reasons, you could have a pool of `IBatchContext` instances, or even reuse the same instance. Just make sure there's no issues with reusing it across different fetches and outboxes.

## Implementing `IBatchCompleteRetrier`

`IBatchCompleteRetrier` is an interface that allows OutboxKit to retry completing messages that were produced successfully, in case of failures when completing them. The goal is to minimize the amount of duplicate messages produced.

This interface has a single method, `RetryAsync`, which takes the messages that were produced but have potentially failed to complete (no way to be sure, as an exception could occur immediately after the completion was persisted to the database). The implementation is likely similar to `IBatchContext.CompleteAsync`, though a bit of extra care has to be taken, to account for the fact that since the failure happened, the messages might have been completed by another running process.

You don't need to implement any retry logic, just let exceptions bubble up or throw your own to signal completion fail. The core library will handle the exception and keep calling `RetryAsync` until it returns successfully.

Note that OutboxKit keeps the potentially failed to complete messages in memory, so this is a best effort approach to minimize duplicates. This means that if either the process restarts or another instance of the process is running, the messages will be produced again.

In terms of lifetime, `IBatchCompleteRetrier`, like `IBatchFetcher`, is created once per outbox, so should be registered in the DI container as a keyed singleton.

## Implementing `IOutboxCleaner`

`IOutboxCleaner` is a rather simple interface, with a single method, `CleanAsync`, which returns the amount of messages that were cleaned, with no arguments (other than the typical `CancellationToken`).
Expand All @@ -40,6 +53,6 @@ To provide a simple way for library users to configure OutboxKit, your setup cod

After you collect all the required configuration, calling `WithPolling` should include the aforementioned key, and something that implements `IPollingOutboxKitConfigurator`. This interface exposes a couple of methods: `ConfigureServices` and `GetCoreSettings`.

`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher` and `IOutboxCleaner` (you could use it for any other services you require, of course).
`ConfigureServices` is invoked when the core library is setting up services, and it's where you should register your `IBatchFetcher` and `IOutboxCleaner` implementations, plus any other services you require. You get the `OutboxKey` as an argument, and you should use that at least when registering `IBatchFetcher`, `IBatchCompleteRetrier` and `IOutboxCleaner` (you could use it for any other services you require, of course).

`GetCoreSettings` is invoked when the core library is setting stuff up, and requires the configuration you collected.
2 changes: 1 addition & 1 deletion docs/building-a-provider/push.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ outline: deep

# Push

🚧 coming soon
🚧 coming soon (maybe 😂)
12 changes: 10 additions & 2 deletions src/Core/OpenTelemetry/ActivityShared.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ internal static class ActivityHelpers
Constants.ActivitySourceName,
typeof(ActivityHelpers).Assembly.GetName().Version!.ToString());

public static Activity? StartActivity(string activityName, OutboxKey key)
public static Activity? StartActivity(string activityName, OutboxKey key)
=> StartActivity(activityName, key, []);

public static Activity? StartActivity(
string activityName,
OutboxKey key,
ReadOnlySpan<KeyValuePair<string, object?>> tags)
{
if (!ActivitySource.HasListeners())
{
Expand All @@ -22,7 +28,8 @@ internal static class ActivityHelpers
tags:
[
new(ActivityConstants.OutboxProviderKeyTag, key.ProviderKey),
new(ActivityConstants.OutboxClientKeyTag, key.ClientKey)
new(ActivityConstants.OutboxClientKeyTag, key.ClientKey),
..tags
]);
}
}
Expand All @@ -33,4 +40,5 @@ internal static class ActivityConstants
public const string OutboxClientKeyTag = "outbox.client_key";
public const string OutboxBatchSizeTag = "outbox.batch.size";
public const string OutboxCleanedCountTag = "outbox.cleaned.count";
public const string OutboxProducedMessagesToCompleteTag = "outbox.produced_messages_to_complete";
}
8 changes: 4 additions & 4 deletions src/Core/OpenTelemetry/CleanerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@ namespace YakShaveFx.OutboxKit.Core.OpenTelemetry;
internal sealed class CleanerMetrics : IDisposable
{
private readonly Meter _meter;
private readonly Counter<long> _producedMessagesCounter;
private readonly Counter<long> _cleanedMessagesCounter;

public CleanerMetrics(IMeterFactory meterFactory)
{
_meter = meterFactory.Create(Constants.MeterName);

_producedMessagesCounter = _meter.CreateCounter<long>(
_cleanedMessagesCounter = _meter.CreateCounter<long>(
"outbox.cleaned_messages",
unit: "{message}",
description: "The number processed outbox messages cleaned");
}

public void MessagesCleaned(OutboxKey key, int count)
{
if (_producedMessagesCounter.Enabled && count > 0)
if (_cleanedMessagesCounter.Enabled && count > 0)
{
var tags = new TagList
{
{ "provider_key", key.ProviderKey },
{ "client_key", key.ClientKey }
};
_producedMessagesCounter.Add(count, tags);
_cleanedMessagesCounter.Add(count, tags);
}
}

Expand Down
52 changes: 52 additions & 0 deletions src/Core/OpenTelemetry/CompletionRetrierMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;

namespace YakShaveFx.OutboxKit.Core.OpenTelemetry;

internal sealed class CompletionRetrierMetrics : IDisposable
{
private readonly Meter _meter;
private readonly Counter<long> _completionRetryAttemptsCounter;
private readonly Counter<long> _completionRetriedMessagesCounter;

public CompletionRetrierMetrics(IMeterFactory meterFactory)
{
_meter = meterFactory.Create(Constants.MeterName);

_completionRetryAttemptsCounter = _meter.CreateCounter<long>(
"outbox.completion_retry_attempts",
unit: "{attempt}",
description: "The number of attempts to retry completion of produced messages");

_completionRetriedMessagesCounter = _meter.CreateCounter<long>(
"outbox.completion_retried_messages",
unit: "{message}",
description: "The number of messages for which completion was retried");

}

public void CompletionRetryAttempted(OutboxKey key, int count)
{
if (_completionRetryAttemptsCounter.Enabled && count > 0)
{
var tags = new TagList
{
{ "provider_key", key.ProviderKey },
{ "client_key", key.ClientKey }
};
_completionRetryAttemptsCounter.Add(1, tags);
}

if (_completionRetriedMessagesCounter.Enabled && count > 0)
{
var tags = new TagList
{
{ "provider_key", key.ProviderKey },
{ "client_key", key.ClientKey }
};
_completionRetriedMessagesCounter.Add(count, tags);
}
}

public void Dispose() => _meter.Dispose();
}
70 changes: 70 additions & 0 deletions src/Core/Polling/CompletionRetrier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Diagnostics;
using YakShaveFx.OutboxKit.Core.OpenTelemetry;

namespace YakShaveFx.OutboxKit.Core.Polling;

internal interface ICompletionRetryCollector
{
void Collect(IReadOnlyCollection<IMessage> messages);
}

internal interface ICompletionRetrier
{
ValueTask RetryAsync(CancellationToken ct);
}

// not thread safe, as it is only used in the context of a producing flow, which has no concurrency
internal sealed class CompletionRetrier(
OutboxKey key,
IBatchCompleteRetrier providerCompletionRetrier,
RetrierBuilderFactory retrierBuilderFactory,
CompletionRetrierMetrics metrics)
: ICompletionRetryCollector, ICompletionRetrier
{
private readonly Retrier _retrier = retrierBuilderFactory.Create()
.WithMaxRetries(int.MaxValue)
.WithShouldRetryDecider(ex =>
{
// retry on all exceptions except cancellation
if (ex is OperationCanceledException oce) return oce.CancellationToken == CancellationToken.None;
return true;
})
.Build();

private List<IMessage> _messages = [];

public void Collect(IReadOnlyCollection<IMessage> messages) => _messages.AddRange(messages);

public ValueTask RetryAsync(CancellationToken ct)
=> _messages.Count == 0
? ValueTask.CompletedTask
: new(InnerRetryCompleteAsync(ct));

private async Task InnerRetryCompleteAsync(CancellationToken ct)
{
await _retrier.ExecuteWithRetryAsync(
async () =>
{
metrics.CompletionRetryAttempted(key, _messages.Count);
using var activity = ActivityHelpers.StartActivity(
"retrying produced messages completion",
key,
[new(ActivityConstants.OutboxProducedMessagesToCompleteTag, _messages.Count)]);

try
{
await providerCompletionRetrier.RetryAsync(_messages, ct);
}
catch (Exception)
{
activity?.SetStatus(ActivityStatusCode.Error);
throw;
}
},
ct);

// since most of the time there are no messages to retry, we clear messages by creating a new list,
// so the old one can be garbage collected, avoiding the underlying array to be kept in memory
_messages = [];
}
}
15 changes: 15 additions & 0 deletions src/Core/Polling/IBatchCompleteRetrier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace YakShaveFx.OutboxKit.Core.Polling;

/// <summary>
/// Interface to be implemented by library users, to make it possible to retry completing messages already produced.
/// </summary>
public interface IBatchCompleteRetrier
{
/// <summary>
/// Retries completing the given collection of messages.
/// </summary>
/// <param name="messages">The messages that were previously successfully produced.</param>
/// <param name="ct">The async cancellation token.</param>
/// <returns>The task representing the asynchronous operation</returns>
Task RetryAsync(IReadOnlyCollection<IMessage> messages, CancellationToken ct);
}
51 changes: 31 additions & 20 deletions src/Core/Polling/PollingBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ internal sealed partial class PollingBackgroundService(
IPollingProducer producer,
TimeProvider timeProvider,
CorePollingSettings settings,
ICompletionRetrier completionRetrier,
ILogger<PollingBackgroundService> logger) : BackgroundService
{
private readonly TimeSpan _pollingInterval = settings.PollingInterval;
Expand All @@ -23,6 +24,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await completionRetrier.RetryAsync(stoppingToken);

try
{
await producer.ProducePendingAsync(stoppingToken);
Expand All @@ -38,26 +41,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
LogUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
}

// to avoid letting the delays running in the background, wasting resources
// we create a linked token, to cancel them
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token);
var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token);

// wait for whatever occurs first:
// - being notified of new messages added to the outbox
// - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted
// something but didn't produce it, or some error occurred when producing and there are pending messages
await Task.WhenAny(listenerTask, delayTask);

LogWakeUp(
logger,
key.ProviderKey,
key.ClientKey,
listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed");

await linkedTokenSource.CancelAsync();
await WaitBeforeNextIteration(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
Expand All @@ -68,6 +52,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
LogStopping(logger, key.ProviderKey, key.ClientKey);
}

private async Task WaitBeforeNextIteration(CancellationToken ct)
{
// no need to even try to wait if the service is stopping
if (ct.IsCancellationRequested) return;

// to avoid letting the delays running in the background, wasting resources
// we create a linked token, to cancel them
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct);

var listenerTask = listener.WaitForMessagesAsync(key, linkedTokenSource.Token);
var delayTask = Task.Delay(_pollingInterval, timeProvider, linkedTokenSource.Token);

// wait for whatever occurs first:
// - being notified of new messages added to the outbox
// - poll the outbox every x amount of time, for example, in cases where another instance of the service persisted
// something but didn't produce it, or some error occurred when producing and there are pending messages
await Task.WhenAny(listenerTask, delayTask);

LogWakeUp(
logger,
key.ProviderKey,
key.ClientKey,
listenerTask.IsCompleted ? "listener triggered" : "polling interval elapsed");

await linkedTokenSource.CancelAsync();
}

[LoggerMessage(LogLevel.Debug,
Message =
"Starting outbox polling background service for provider key \"{providerKey}\" and client key \"{clientKey}\", with polling interval {pollingInterval}")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.Extensions.Logging;
using YakShaveFx.OutboxKit.Core.OpenTelemetry;

namespace YakShaveFx.OutboxKit.Core.Polling;
Expand All @@ -8,11 +9,13 @@ internal interface IPollingProducer
Task ProducePendingAsync(CancellationToken ct);
}

internal sealed class PollingProducer(
internal sealed partial class PollingProducer(
OutboxKey key,
IBatchFetcher fetcher,
IBatchProducer producer,
ProducerMetrics metrics) : IPollingProducer
ICompletionRetryCollector completionRetryCollector,
ProducerMetrics metrics,
ILogger<PollingProducer> logger) : IPollingProducer
{
public async Task ProducePendingAsync(CancellationToken ct)
{
Expand Down Expand Up @@ -41,10 +44,26 @@ private async Task<bool> ProduceBatchAsync(CancellationToken ct)
metrics.BatchProduced(key, messages.Count == result.Ok.Count);
metrics.MessagesProduced(key, result.Ok.Count);

// messages already produced, try to ack them
// not passing the actual cancellation token to try to complete the batch even if the application is shutting down
await batchContext.CompleteAsync(result.Ok, CancellationToken.None);
try
{
// messages already produced, try to ack them
// not passing the actual cancellation token to try to complete the batch even if the application is shutting down
await batchContext.CompleteAsync(result.Ok, CancellationToken.None);
}
catch (Exception ex)
{
LogCompletionUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
completionRetryCollector.Collect(result.Ok);

// return false to break the loop, as we don't want to produce more messages until we're able to complete the batch
return false;
}

return await batchContext.HasNextAsync(ct);
}

[LoggerMessage(LogLevel.Error,
Message =
"Unexpected error while completing produced outbox messages for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
private static partial void LogCompletionUnexpectedError(ILogger logger, string providerKey, string clientKey, Exception ex);
}
Loading