Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/Core/CleanUp/CleanUpBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using YakShaveFx.OutboxKit.Core.OpenTelemetry;
Expand All @@ -24,22 +25,30 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
using var activity = ActivityHelpers.StartActivity("clean processed outbox messages", key);
try
using (var activity = ActivityHelpers.StartActivity("clean processed outbox messages", key))
{
var cleaned = await cleaner.CleanAsync(stoppingToken);
metrics.MessagesCleaned(key, cleaned);
activity?.SetTag(ActivityConstants.OutboxCleanedCountTag, cleaned);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// expected when the service is stopping, let it stop gracefully
continue;
}
catch (Exception ex)
{
// we don't want the background service to stop while the application continues, so catching and logging
LogUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
try
{
var cleaned = await cleaner.CleanAsync(stoppingToken);
metrics.MessagesCleaned(key, cleaned);
activity?.SetTag(ActivityConstants.OutboxCleanedCountTag, cleaned);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// expected when the service is stopping, let it stop gracefully
continue;
}
catch (Exception ex)
{
// we don't want the background service to stop while the application continues, so catching and logging
LogUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
activity?.SetStatus(ActivityStatusCode.Error);
activity?.RecordException(ex, new TagList
{
{ ActivityConstants.OutboxProviderKeyTag, key.ProviderKey },
{ ActivityConstants.OutboxClientKeyTag, key.ClientKey }
});
}
}

await Task.Delay(_cleanUpInterval, timeProvider, stoppingToken);
Expand All @@ -57,13 +66,16 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
LogLevel.Debug,
Message =
"Starting outbox clean up service for provider key \"{providerKey}\" and client key \"{clientKey}\", with clean up interval {cleanUpInterval}")]
private static partial void LogStarting(ILogger logger, string providerKey, string clientKey, TimeSpan cleanUpInterval);
private static partial void LogStarting(ILogger logger, string providerKey, string clientKey,
TimeSpan cleanUpInterval);

[LoggerMessage(LogLevel.Debug,
Message = "Shutting down outbox clean up service for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
Message =
"Shutting down outbox clean up service for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
private static partial void LogStopping(ILogger logger, string providerKey, string clientKey);

[LoggerMessage(LogLevel.Error,
Message = "Unexpected error while cleaning outbox messages for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
Message =
"Unexpected error while cleaning outbox messages for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
private static partial void LogUnexpectedError(ILogger logger, string providerKey, string clientKey, Exception ex);
}
53 changes: 52 additions & 1 deletion src/Core/OpenTelemetry/ActivityShared.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Globalization;

namespace YakShaveFx.OutboxKit.Core.OpenTelemetry;

Expand All @@ -8,7 +9,7 @@ internal static class ActivityHelpers
Constants.ActivitySourceName,
typeof(ActivityHelpers).Assembly.GetName().Version!.ToString());

public static Activity? StartActivity(string activityName, OutboxKey key)
public static Activity? StartActivity(string activityName, OutboxKey key)
=> StartActivity(activityName, key, []);

public static Activity? StartActivity(
Expand Down Expand Up @@ -41,4 +42,54 @@ internal static class ActivityConstants
public const string OutboxBatchSizeTag = "outbox.batch.size";
public const string OutboxCleanedCountTag = "outbox.cleaned.count";
public const string OutboxProducedMessagesToCompleteTag = "outbox.produced_messages_to_complete";
}

internal static class ActivityExtensions
{
// copied and adapted from an older version of the OpenTelemetry.Api package
// an equivalent AddException was added in .NET 9, but this library targets .NET 8 right now
// to remove when we upgrade to targeting .NET 10
public static Activity RecordException(this Activity activity, Exception ex, in TagList tags)
{
const string exceptionEventName = "exception";
const string exceptionMessageTag = "exception.message";
const string exceptionStackTraceTag = "exception.stacktrace";
const string exceptionTypeTag = "exception.type";


var tagsCollection = new ActivityTagsCollection
{
{ exceptionTypeTag, ex.GetType().FullName },
{ exceptionStackTraceTag, ex.ToInvariantString() },
};

if (!string.IsNullOrWhiteSpace(ex.Message))
{
tagsCollection.Add(exceptionMessageTag, ex.Message);
}

foreach (var tag in tags)
{
tagsCollection[tag.Key] = tag.Value;
}

activity.AddEvent(new ActivityEvent(exceptionEventName, tags: tagsCollection));

return activity;
}

private static string ToInvariantString(this Exception exception)
{
var originalUICulture = Thread.CurrentThread.CurrentUICulture;

try
{
Thread.CurrentThread.CurrentUICulture = CultureInfo.InvariantCulture;
return exception.ToString();
}
finally
{
Thread.CurrentThread.CurrentUICulture = originalUICulture;
}
}
}
33 changes: 32 additions & 1 deletion src/Core/OpenTelemetry/CompletionRetrierMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ internal sealed class CompletionRetrierMetrics : IDisposable
private readonly Meter _meter;
private readonly Counter<long> _completionRetryAttemptsCounter;
private readonly Counter<long> _completionRetriedMessagesCounter;
private readonly UpDownCounter<int> _pendingRetryCounter;

public CompletionRetrierMetrics(IMeterFactory meterFactory)
{
Expand All @@ -22,7 +23,11 @@ public CompletionRetrierMetrics(IMeterFactory meterFactory)
"outbox.completion_retried_messages",
unit: "{message}",
description: "The number of messages for which completion was retried");


_pendingRetryCounter = _meter.CreateUpDownCounter<int>(
"outbox.messages_pending_completion_retry",
unit: "{message}",
description: "The number of messages pending completion retry");
}

public void CompletionRetryAttempted(OutboxKey key, int count)
Expand All @@ -48,5 +53,31 @@ public void CompletionRetryAttempted(OutboxKey key, int count)
}
}

public void NewMessagesPendingRetry(OutboxKey key, int count)
{
if (_pendingRetryCounter.Enabled)
{
var tags = new TagList
{
{ "provider_key", key.ProviderKey },
{ "client_key", key.ClientKey }
};
_pendingRetryCounter.Add(count, tags);
}
}

public void MessagesCompleted(OutboxKey key, int count)
{
if (_pendingRetryCounter.Enabled && count > 0)
{
var tags = new TagList
{
{ "provider_key", key.ProviderKey },
{ "client_key", key.ClientKey }
};
_pendingRetryCounter.Add(-count, tags);
}
}

public void Dispose() => _meter.Dispose();
}
46 changes: 32 additions & 14 deletions src/Core/Polling/CompletionRetrier.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using YakShaveFx.OutboxKit.Core.OpenTelemetry;

namespace YakShaveFx.OutboxKit.Core.Polling;
Expand All @@ -14,11 +15,12 @@ internal interface ICompletionRetrier
}

// not thread safe, as it is only used in the context of a producing flow, which has no concurrency
internal sealed class CompletionRetrier(
internal sealed partial class CompletionRetrier(
OutboxKey key,
IBatchCompleteRetrier providerCompletionRetrier,
RetrierBuilderFactory retrierBuilderFactory,
CompletionRetrierMetrics metrics)
CompletionRetrierMetrics metrics,
ILogger<CompletionRetrier> logger)
: ICompletionRetryCollector, ICompletionRetrier
{
private readonly Retrier _retrier = retrierBuilderFactory.Create()
Expand All @@ -30,41 +32,57 @@ internal sealed class CompletionRetrier(
return true;
})
.Build();

private List<IMessage> _messages = [];

public void Collect(IReadOnlyCollection<IMessage> messages) => _messages.AddRange(messages);
public void Collect(IReadOnlyCollection<IMessage> messages)
{
_messages.AddRange(messages);
metrics.NewMessagesPendingRetry(key, messages.Count);
}

public ValueTask RetryAsync(CancellationToken ct)
=> _messages.Count == 0
? ValueTask.CompletedTask
: new(InnerRetryCompleteAsync(ct));
: new(InnerRetryAsync(ct));

private async Task InnerRetryCompleteAsync(CancellationToken ct)
{
await _retrier.ExecuteWithRetryAsync(
private async Task InnerRetryAsync(CancellationToken ct)
=> await _retrier.ExecuteWithRetryAsync(
async () =>
{
metrics.CompletionRetryAttempted(key, _messages.Count);
using var activity = ActivityHelpers.StartActivity(
"retrying produced messages completion",
key,
[new(ActivityConstants.OutboxProducedMessagesToCompleteTag, _messages.Count)]);

try
{
await providerCompletionRetrier.RetryAsync(_messages, ct);

metrics.MessagesCompleted(key, _messages.Count);

// since most of the time there are no messages to retry, we clear messages by creating a new list,
// so the old one can be garbage collected, avoiding the underlying array to be kept in memory
_messages = [];
}
catch (Exception)
catch (Exception ex)
{
LogRetryFailed(logger, key.ProviderKey, key.ClientKey, ex);
activity?.SetStatus(ActivityStatusCode.Error);
activity?.RecordException(ex, new TagList
{
{ ActivityConstants.OutboxProviderKeyTag, key.ProviderKey },
{ ActivityConstants.OutboxClientKeyTag, key.ClientKey }
});
throw;
}
},
ct);

// since most of the time there are no messages to retry, we clear messages by creating a new list,
// so the old one can be garbage collected, avoiding the underlying array to be kept in memory
_messages = [];
}
// logging as warning instead of error, as this is a retry, it's kind of expected that something might be wrong
[LoggerMessage(LogLevel.Warning,
Message =
"Error while retrying message completion for provider key \"{providerKey}\" and client key \"{clientKey}\"")]
private static partial void LogRetryFailed(ILogger logger, string providerKey, string clientKey, Exception ex);
}
7 changes: 7 additions & 0 deletions src/Core/Polling/PollingProducer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using YakShaveFx.OutboxKit.Core.OpenTelemetry;

Expand Down Expand Up @@ -53,6 +54,12 @@ private async Task<bool> ProduceBatchAsync(CancellationToken ct)
catch (Exception ex)
{
LogCompletionUnexpectedError(logger, key.ProviderKey, key.ClientKey, ex);
activity?.SetStatus(ActivityStatusCode.Error);
activity?.RecordException(ex, new TagList
{
{ ActivityConstants.OutboxProviderKeyTag, key.ProviderKey },
{ ActivityConstants.OutboxClientKeyTag, key.ClientKey }
});
completionRetryCollector.Collect(result.Ok);

// return false to break the loop, as we don't want to produce more messages until we're able to complete the batch
Expand Down
3 changes: 2 additions & 1 deletion src/Core/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ private static void AddOutboxKitPolling(IServiceCollection services, OutboxKitCo
key,
s.GetRequiredKeyedService<IBatchCompleteRetrier>(key),
s.GetRequiredService<RetrierBuilderFactory>(),
s.GetRequiredService<CompletionRetrierMetrics>()));
s.GetRequiredService<CompletionRetrierMetrics>(),
s.GetRequiredService<ILogger<CompletionRetrier>>()));

services.AddKeyedSingleton<ICompletionRetryCollector>(key,
(s, _) => s.GetRequiredKeyedService<CompletionRetrier>(key));
Expand Down
Loading