diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/DocumentationSamples.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/DocumentationSamples.cs index 6895b36c2..f90cb08f1 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/DocumentationSamples.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/DocumentationSamples.cs @@ -37,6 +37,14 @@ public static async Task configure() // And all the normal Wolverine options... .Sequential(); + + + // Listen for incoming messages from a Pulsar topic with a shared subscription and using RETRY and DLQ queues + opts.ListenToPulsarTopic("persistent://public/default/three") + .WithSharedSubscriptionType() + .DeadLetterQueueing(new DeadLetterTopic(DeadLetterTopicMode.Native)) + .RetryLetterQueueing(new RetryLetterTopic([TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5)])) + .Sequential(); }); #endregion diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarNativeReliabilityTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarNativeReliabilityTests.cs new file mode 100644 index 000000000..6f202d3f2 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarNativeReliabilityTests.cs @@ -0,0 +1,212 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.ComplianceTests.Scheduling; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.Pulsar.Tests; + +public class PulsarNativeReliabilityTests : /*TransportComplianceFixture,*/ IAsyncLifetime +{ + public IHost WolverineHost; + + public PulsarNativeReliabilityTests() + { + } + + private IHostBuilder ConfigureBuilder() + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + + var topic = Guid.NewGuid().ToString(); + var topicPath = $"persistent://public/default/compliance{topic}"; + + opts.UsePulsar(b => { }); + + opts.IncludeType(); + + opts.PublishMessage() + .ToPulsarTopic(topicPath); + + opts.ListenToPulsarTopic(topicPath) + .WithSharedSubscriptionType() + .DeadLetterQueueing(DeadLetterTopic.DefaultNative) + .RetryLetterQueueing(new RetryLetterTopic([TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3)])) + //.ProcessInline(); + .BufferedInMemory(); + + + var topicPath2 = $"persistent://public/default/no-retry-{topic}"; + opts.IncludeType(); + + opts.PublishMessage() + .ToPulsarTopic(topicPath2); + + opts.ListenToPulsarTopic(topicPath2) + .WithSharedSubscriptionType() + .DeadLetterQueueing(DeadLetterTopic.DefaultNative) + .DisableRetryLetterQueueing() + .ProcessInline(); + + }); + } + + public async Task InitializeAsync() + { + WolverineHost = ConfigureBuilder().Build(); + await WolverineHost.StartAsync(); + } + + [Fact] + public async Task run_setup_with_simulated_exception_in_handler() + { + var session = await WolverineHost.TrackActivity(TimeSpan.FromSeconds(1000)) + //.WaitForMessageToBeReceivedAt(WolverineHost) + .DoNotAssertOnExceptionsDetected() + .IncludeExternalTransports() + .WaitForCondition(new WaitForDeadLetteredMessage()) + .SendMessageAndWaitAsync(new SRMessage1()); + + + session.Sent.AllMessages(); + session.MovedToErrorQueue + .MessagesOf() + .Count() + .ShouldBe(1); + + session.Received + .MessagesOf() + .Count() + .ShouldBe(4); + + session.Requeued + .MessagesOf() + .Count() + .ShouldBe(3); + + + // TODO: I Guess the capture of the envelope headers occurs before we manipulate it + var firstRequeuedEnvelope = session.Requeued.Envelopes().First(); + firstRequeuedEnvelope.ShouldSatisfyAllConditions( + () => firstRequeuedEnvelope.Attempts.ShouldBe(1), + () => firstRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeFalse() + ); + var secondRequeuedEnvelope = session.Requeued.Envelopes().Skip(1).First(); + secondRequeuedEnvelope.ShouldSatisfyAllConditions( + () => secondRequeuedEnvelope.Attempts.ShouldBe(2), + () => secondRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeTrue(), + () => secondRequeuedEnvelope.Headers["DELAY_TIME"].ShouldBe(TimeSpan.FromSeconds(4).TotalMilliseconds.ToString()) + ); + + var thirdRequeuedEnvelope = session.Requeued.Envelopes().Skip(2).First(); + thirdRequeuedEnvelope.ShouldSatisfyAllConditions( + () => thirdRequeuedEnvelope.Attempts.ShouldBe(3), + () => thirdRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeTrue(), + () => thirdRequeuedEnvelope.Headers["DELAY_TIME"].ShouldBe(TimeSpan.FromSeconds(2).TotalMilliseconds.ToString()) + ); + + + var dlqEnvelope = session.MovedToErrorQueue.Envelopes().First(); + dlqEnvelope.ShouldSatisfyAllConditions( + () => dlqEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.Exception).ShouldBeTrue(), + () => dlqEnvelope.Headers[PulsarEnvelopeConstants.ReconsumeTimes].ShouldBe("3") + ); + + } + + [Fact] + public async Task run_setup_with_simulated_exception_in_handler_only_native_dead_lettered_queue() + { + var session = await WolverineHost.TrackActivity(TimeSpan.FromSeconds(100)) + .DoNotAssertOnExceptionsDetected() + .IncludeExternalTransports() + .WaitForCondition(new WaitForDeadLetteredMessage()) + .SendMessageAndWaitAsync(new SRMessage2()); + + + session.Sent.AllMessages(); + session.MovedToErrorQueue + .MessagesOf() + .Count() + .ShouldBe(1); + + session.Received + .MessagesOf() + .Count() + .ShouldBe(1); + + session.Requeued + .MessagesOf() + .Count() + .ShouldBe(0); + + + + + var firstEnvelope = session.MovedToErrorQueue.Envelopes().First(); + firstEnvelope.ShouldSatisfyAllConditions( + () => firstEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.Exception).ShouldBeTrue(), + () => firstEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.ReconsumeTimes).ShouldBeFalse() + ); + + } + + + + public async Task DisposeAsync() + { + await WolverineHost.StopAsync(); + WolverineHost.Dispose(); + } + + +} + +public class SRMessage1; +public class SRMessage2; + + +public class SRMessageHandlers +{ + public Task Handle(SRMessage1 message, IMessageContext context) + { + throw new InvalidOperationException("Simulated exception"); + } + + public Task Handle(SRMessage2 message, IMessageContext context) + { + throw new InvalidOperationException("Simulated exception"); + } + +} + + + +public class WaitForDeadLetteredMessage : ITrackedCondition +{ + + private bool _found; + + public WaitForDeadLetteredMessage() + { + + } + + public void Record(EnvelopeRecord record) + { + if (record.Envelope.Message is T && record.MessageEventType == MessageEventType.MovedToErrorQueue ) + // && record.Envelope.Destination?.ToString().Contains(_dlqTopic) == true) + { + _found = true; + } + } + + public bool IsCompleted() => _found; +} + diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/DeadLetterTopic.cs b/src/Transports/Pulsar/Wolverine.Pulsar/DeadLetterTopic.cs new file mode 100644 index 000000000..1805d473e --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/DeadLetterTopic.cs @@ -0,0 +1,73 @@ + + +namespace Wolverine.Pulsar; + +public enum DeadLetterTopicMode +{ + /// + /// Opt into using Pulsar's native dead letter topic approach. This is the default and recommended + /// + Native, + + /// + /// Completely ignore Pulsar native dead letter topic in favor of Wolverine persistent dead letter queueing + /// + WolverineStorage +} + +public class DeadLetterTopic +{ + + public static DeadLetterTopic DefaultNative => new(DeadLetterTopicMode.Native); + + private string? _topicName; + + public DeadLetterTopicMode Mode { get; set; } + + public DeadLetterTopic(DeadLetterTopicMode mode) + { + Mode = mode; + } + + public DeadLetterTopic(string topicName, DeadLetterTopicMode mode) + { + _topicName = topicName; + Mode = mode; + } + + public string TopicName + { + get => _topicName; + set => _topicName = value?? throw new ArgumentNullException(nameof(TopicName)); + } + + protected bool Equals(DeadLetterTopic other) + { + return _topicName == other._topicName; + } + + public override bool Equals(object? obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != this.GetType()) + { + return false; + } + + return Equals((DeadLetterTopic)obj); + } + + public override int GetHashCode() + { + return _topicName.GetHashCode(); + } +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeContinuationSource.cs b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeContinuationSource.cs new file mode 100644 index 000000000..e37168120 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeContinuationSource.cs @@ -0,0 +1,21 @@ +using Wolverine.ErrorHandling; +using Wolverine.Runtime; + +namespace Wolverine.Pulsar.ErrorHandling; + +public class PulsarNativeContinuationSource : IContinuationSource +{ + public string Description { get; } + + public IContinuation Build(Exception ex, Envelope envelope) + { + // Only handle Pulsar envelopes/listeners + if (envelope.Listener is PulsarListener) + { + return new PulsarNativeResiliencyContinuation(ex); + } + + // Return null to let the next continuation source handle it + return null; + } +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyContinuation.cs b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyContinuation.cs new file mode 100644 index 000000000..eeb5fc2d9 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyContinuation.cs @@ -0,0 +1,45 @@ +using System.Diagnostics; +using Wolverine.ErrorHandling; +using Wolverine.Runtime; + +namespace Wolverine.Pulsar.ErrorHandling; + +public class PulsarNativeResiliencyContinuation : IContinuation +{ + private readonly Exception _exception; + + public PulsarNativeResiliencyContinuation(Exception exception) + { + _exception = exception; + } + + public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, Activity? activity) + { + var context = lifecycle as MessageContext; + var envelope = context?.Envelope; + + if (envelope?.Listener is PulsarListener listener) + { + if (listener.NativeRetryLetterQueueEnabled && listener.RetryLetterTopic!.Retry.Count >= envelope.Attempts) + { + // Use native retry if enabled and attempts are within bounds + //await listener.MoveToScheduledUntilAsync(envelope, now); + await new ScheduledRetryContinuation(listener.RetryLetterTopic!.Retry[envelope.Attempts - 1]) + .ExecuteAsync(lifecycle, runtime, now, activity); + return; + } + + if (listener.NativeDeadLetterQueueEnabled) + { + await new MoveToErrorQueue(_exception) + .ExecuteAsync(lifecycle, runtime, now, activity); + //await listener.MoveToErrorsAsync(envelope, _exception); + } + + return; + } + + // Fall back to MoveToErrorQueue for other listener types + await new MoveToErrorQueue(_exception).ExecuteAsync(lifecycle, runtime, now, activity); + } +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyPolicy.cs b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyPolicy.cs new file mode 100644 index 000000000..5416a0672 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/ErrorHandling/PulsarNativeResiliencyPolicy.cs @@ -0,0 +1,22 @@ +using Wolverine.Configuration; +using Wolverine.ErrorHandling; +using Wolverine.ErrorHandling.Matches; + +namespace Wolverine.Pulsar.ErrorHandling; + +public class PulsarNativeResiliencyPolicy : IWolverinePolicy +{ + public void Apply(WolverineOptions options) + { + var rule = new FailureRule(new AlwaysMatches()); + + rule.AddSlot(new PulsarNativeContinuationSource()); + + // Set the same source as the InfiniteSource to handle all other attempts + rule.InfiniteSource = new PulsarNativeContinuationSource(); + + // Add this rule to the global failure collection + // This ensures it will be checked before other rules + options.Policies.Failures.Add(rule); + } +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs index d1c0cadce..139d6d5da 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs @@ -1,5 +1,6 @@ using DotPulsar; using JasperFx.Core; +using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Transports; @@ -28,6 +29,18 @@ public PulsarEndpoint(Uri uri, PulsarTransport parent) : base(uri, EndpointRole. public string SubscriptionName { get; internal set; } = "Wolverine"; public SubscriptionType SubscriptionType { get; internal set; } = SubscriptionType.Exclusive; + /// + /// Use to override the dead letter topic for this endpoint + /// + public DeadLetterTopic? DeadLetterTopic { get; set; } + + /// + /// Use to override the retry letter topic for this endpoint + /// + public RetryLetterTopic? RetryLetterTopic { get; set; } + + public bool IsPersistent => Persistence.Equals(Persistent); + public static Uri UriFor(bool persistent, string tenant, string @namespace, string topicName) { var scheme = persistent ? "persistent" : "non-persistent"; @@ -94,4 +107,16 @@ protected override ISender CreateSender(IWolverineRuntime runtime) { return new PulsarSender(runtime, this, _parent, runtime.Cancellation); } + + public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISender? deadLetterSender) + { + return base.TryBuildDeadLetterSender(runtime, out deadLetterSender); + + + // TODO: ? + //var queueName = this.DeadLetterTopic?.TopicName ?? _parent.DeadLetterTopic.TopicName; + //var dlq = _parent[UriFor(queueName)]; + //deadLetterSender = dlq.CreateSender(runtime); + //return true; + } } \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeConstants.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeConstants.cs new file mode 100644 index 000000000..b2dd4c99a --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeConstants.cs @@ -0,0 +1,13 @@ +namespace Wolverine.Pulsar; + +public static class PulsarEnvelopeConstants +{ + public const string ReconsumeTimes = "RECONSUMETIMES"; + public const string DelayTimeMetadataKey = "DELAY_TIME"; + public const string RealTopicMetadataKey = "REAL_TOPIC"; + public const string TopicMetadataKey = "TOPIC_NAME"; + public const string MessageIdMetadataKey = "MESSAGE_ID"; + public const string RealSubscriptionMetadataKey = "REAL_SUBSCRIPTION"; + public const string OriginMessageIdMetadataKey = "ORIGIN_MESSAGE_ID"; + public const string Exception = "EXCEPTION"; +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs index 13539ff4f..394053304 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs @@ -21,11 +21,25 @@ protected override void writeOutgoingHeader(MessageMetadata outgoing, string key protected override bool tryReadIncomingHeader(IMessage> incoming, string key, out string? value) { + if (key == EnvelopeConstants.AttemptsKey && incoming.Properties.TryGetValue(PulsarEnvelopeConstants.ReconsumeTimes, out value)) + { + return true; + } return incoming.Properties.TryGetValue(key, out value); } protected override void writeIncomingHeaders(IMessage> incoming, Envelope envelope) { - foreach (var pair in incoming.Properties) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Properties) + { + envelope.Headers[pair.Key] = pair.Value; + + // doesn't work, it gets overwritten in next step - fix in tryReadIncomingHeader + //if (pair.Key == PulsarEnvelopeConstants.ReconsumeTimes) + //{ + // envelope.Headers[EnvelopeConstants.AttemptsKey] = pair.Value; + // envelope.Attempts = int.Parse(pair.Value); + //} + } } } \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs index 9ba45d69a..c4e321232 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs @@ -1,28 +1,34 @@ using System.Buffers; +using System.Globalization; +using DotPulsar; using DotPulsar.Abstractions; using DotPulsar.Extensions; +using DotPulsar.Internal; using Wolverine.Runtime; using Wolverine.Transports; namespace Wolverine.Pulsar; -internal class PulsarListener : IListener +internal class PulsarListener : IListener, ISupportDeadLetterQueue, ISupportNativeScheduling { private readonly CancellationToken _cancellation; private readonly IConsumer>? _consumer; + private readonly IConsumer>? _retryConsumer; private readonly CancellationTokenSource _localCancellation; private readonly Task? _receivingLoop; + private readonly Task? _receivingRetryLoop; private readonly PulsarSender _sender; + private IReceiver _receiver; + private PulsarEndpoint _endpoint; + private IProducer>? _retryLetterQueueProducer; + private IProducer>? _dlqProducer; public PulsarListener(IWolverineRuntime runtime, PulsarEndpoint endpoint, IReceiver receiver, PulsarTransport transport, CancellationToken cancellation) { - if (receiver == null) - { - throw new ArgumentNullException(nameof(receiver)); - } - + _endpoint = endpoint; + _receiver = receiver ?? throw new ArgumentNullException(nameof(receiver)); _cancellation = cancellation; Address = endpoint.Uri; @@ -40,8 +46,20 @@ public PulsarListener(IWolverineRuntime runtime, PulsarEndpoint endpoint, IRecei .Topic(endpoint.PulsarTopic()) .Create(); + // TODO: check + NativeDeadLetterQueueEnabled = transport.DeadLetterTopic is not null && + transport.DeadLetterTopic.Mode != DeadLetterTopicMode.WolverineStorage || + endpoint.DeadLetterTopic is not null && endpoint.DeadLetterTopic.Mode != + DeadLetterTopicMode.WolverineStorage; + + NativeRetryLetterQueueEnabled = endpoint.RetryLetterTopic is not null && + RetryLetterTopic.SupportedSubscriptionTypes.Contains(endpoint.SubscriptionType); + + trySetupNativeResiliency(endpoint, transport); + _receivingLoop = Task.Run(async () => { + await foreach (var message in _consumer.Messages(combined.Token)) { var envelope = new PulsarEnvelope(message) @@ -53,7 +71,82 @@ public PulsarListener(IWolverineRuntime runtime, PulsarEndpoint endpoint, IRecei await receiver.ReceivedAsync(this, envelope); } + + }, combined.Token); + + + if (NativeRetryLetterQueueEnabled) + { + _retryConsumer = createRetryConsumer(endpoint, transport); + _receivingRetryLoop = Task.Run(async () => + { + await foreach (var message in _retryConsumer.Messages(combined.Token)) + { + var envelope = new PulsarEnvelope(message) + { + Data = message.Data.ToArray() + }; + + mapper.MapIncomingToEnvelope(envelope, message); + + await receiver.ReceivedAsync(this, envelope); + + + } + + }, combined.Token); + } + } + + private void trySetupNativeResiliency(PulsarEndpoint endpoint, PulsarTransport transport) + { + if (!NativeRetryLetterQueueEnabled && !NativeDeadLetterQueueEnabled) + { + return; + } + + if (endpoint.RetryLetterTopic is not null) + { + + _retryLetterQueueProducer = transport.Client!.NewProducer() + .Topic(getRetryLetterTopicUri(endpoint)!.ToString()) + .Create(); + } + + _dlqProducer = transport.Client!.NewProducer() + .Topic(getDeadLetteredTopicUri(endpoint).ToString()) + .Create(); + + } + + + + private IConsumer> createRetryConsumer(PulsarEndpoint endpoint, PulsarTransport transport) + { + var topicRetry = getRetryLetterTopicUri(endpoint); + + return transport.Client!.NewConsumer() + .SubscriptionName(endpoint.SubscriptionName) + .SubscriptionType(endpoint.SubscriptionType) + .Topic(topicRetry!.ToString()) + .Create(); + } + + private Uri? getRetryLetterTopicUri(PulsarEndpoint endpoint) + { + return NativeDeadLetterQueueEnabled + ? PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace, + endpoint.RetryLetterTopic?.TopicName ?? $"{endpoint.TopicName}-RETRY") + : null; + } + + private Uri getDeadLetteredTopicUri(PulsarEndpoint endpoint) + { + var topicDql = PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace, + endpoint.DeadLetterTopic?.TopicName ?? $"{endpoint.TopicName}-DLQ"); + + return topicDql; } public ValueTask CompleteAsync(Envelope envelope) @@ -87,9 +180,26 @@ public async ValueTask DisposeAsync() await _consumer.DisposeAsync(); } + if (_retryConsumer != null) + { + await _retryConsumer.DisposeAsync(); + } + + if (_retryLetterQueueProducer != null) + { + await _retryLetterQueueProducer.DisposeAsync(); + } + + if (_dlqProducer != null) + { + await _dlqProducer.DisposeAsync(); + } + + await _sender.DisposeAsync(); _receivingLoop!.Dispose(); + _receivingRetryLoop?.Dispose(); } public Uri Address { get; } @@ -103,6 +213,13 @@ public async ValueTask StopAsync() await _consumer.Unsubscribe(_cancellation); await _consumer.RedeliverUnacknowledgedMessages(_cancellation); + + + if (_retryConsumer != null) + { + await _retryConsumer.Unsubscribe(_cancellation); + await _retryConsumer.RedeliverUnacknowledgedMessages(_cancellation); + } } public async Task TryRequeueAsync(Envelope envelope) @@ -115,4 +232,113 @@ public async Task TryRequeueAsync(Envelope envelope) return false; } -} \ No newline at end of file + + public bool NativeDeadLetterQueueEnabled { get; } + public RetryLetterTopic? RetryLetterTopic => _endpoint.RetryLetterTopic; + public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) + { + if (NativeDeadLetterQueueEnabled && envelope is PulsarEnvelope e) + { + await moveToQueueAsync(envelope, exception, true); + } + } + + public bool NativeRetryLetterQueueEnabled { get; } + + public async Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) + { + if (NativeRetryLetterQueueEnabled && envelope is PulsarEnvelope e) + { + await moveToQueueAsync(e, e.Failure, false); + } + } + + + private async Task moveToQueueAsync(Envelope envelope, Exception exception, bool isDeadLettered = false) + { + if (envelope is PulsarEnvelope e) + { + var messageMetadata = BuildMessageMetadata(envelope, e, exception, isDeadLettered); + + IConsumer>? associatedConsumer; + IProducer> associatedProducer; + + if (NativeRetryLetterQueueEnabled && !isDeadLettered) + { + associatedConsumer = _retryConsumer!; + associatedProducer = _retryLetterQueueProducer!; + } + else + { + associatedConsumer = _consumer!; + associatedProducer = _dlqProducer!; + } + + await associatedConsumer.Acknowledge(e.MessageData, + _cancellation); // TODO: check: original message should be acked and copy is sent to retry/DLQ + // TODO: check: what to do with the original message on Wolverine side? I Guess it should be acked? or we could use some kind of RequeueContinuation in FailureRuleCollection. If I understand correctly, Wolverine is/should handle original Wolverine message and its copies across Pulsar's topics as same identity? + + await associatedProducer.Send(messageMetadata, e.MessageData.Data, _cancellation) + .ConfigureAwait(false); + } + } + + private MessageMetadata BuildMessageMetadata(Envelope envelope, PulsarEnvelope e, Exception exception, + bool isDeadLettered) + { + var messageMetadata = new MessageMetadata(); + + foreach (var property in e.Headers) + { + messageMetadata[property.Key] = property.Value; + } + + //reconsumeTimesValue = GetReconsumeHeader(messageMetadata); + + if (!e.Headers.TryGetValue(PulsarEnvelopeConstants.RealTopicMetadataKey, out var originTopicNameStr)) + { + originTopicNameStr = envelope.Headers[EnvelopeConstants.ReplyUriKey]; + + } + + messageMetadata[PulsarEnvelopeConstants.RealTopicMetadataKey] = originTopicNameStr; + + var eid = e.Headers.GetValueOrDefault(PulsarEnvelopeConstants.OriginMessageIdMetadataKey, + e.MessageData.MessageId.ToString()); + + if (!e.Headers.ContainsKey(PulsarEnvelopeConstants.OriginMessageIdMetadataKey)) + { + messageMetadata[PulsarEnvelopeConstants.OriginMessageIdMetadataKey] = eid; + } + + + if (!isDeadLettered) + { + messageMetadata[PulsarEnvelopeConstants.ReconsumeTimes] = envelope.Attempts.ToString(); + var delayTime = _endpoint.RetryLetterTopic!.Retry[envelope.Attempts - 1]; + messageMetadata[PulsarEnvelopeConstants.DelayTimeMetadataKey] = + delayTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture); + messageMetadata.DeliverAtTimeAsDateTimeOffset = DateTimeOffset.UtcNow.Add(delayTime); + } + else + { + //messageMetadata[PulsarEnvelopeConstants.DelayTimeMetadataKey] = null; + messageMetadata.DeliverAtTimeAsDateTimeOffset = DateTimeOffset.UtcNow; + e.Headers[PulsarEnvelopeConstants.Exception] = exception.ToString(); + } + + + + return messageMetadata; + } +} + + +public static class MessageExtensions +{ + public static bool TryGetMessageProperty(this DotPulsar.Abstractions.IMessage message, string key, out string val) + { + return message.Properties.TryGetValue(key , out val); + } +} + diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs index d5fdf4dd9..5717f5bc2 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs @@ -1,6 +1,7 @@ using DotPulsar; using DotPulsar.Abstractions; using JasperFx.Core; +using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Transports; @@ -25,6 +26,41 @@ public PulsarTransport() : base(ProtocolName, "Pulsar") public IPulsarClientBuilder Builder { get; } internal IPulsarClient? Client { get; private set; } + public DeadLetterTopic? DeadLetterTopic { get; internal set; } // TODO: should we even have a default or just per endpoint based? + public RetryLetterTopic? RetryLetterTopic { get; internal set; } // TODO: should we even have a default or just per endpoint based? + + + //private IEnumerable enabledDeadLetterTopics() + //{ + // if (DeadLetterTopic.Mode != DeadLetterTopicMode.WolverineStorage) + // { + // yield return DeadLetterTopic; + // } + + // foreach (var queue in endpoints()) + // { + // if (queue.IsPersistent && queue.Role == EndpointRole.Application && queue.DeadLetterTopic != null && + // queue.DeadLetterTopic.Mode != DeadLetterTopicMode.WolverineStorage) + // { + // yield return queue.DeadLetterTopic; + // } + // } + //} + + //public IEnumerable enabledRetryLetterTopics() + //{ + // if (RetryLetterTopic != null) + // { + // yield return RetryLetterTopic; + // } + // foreach (var queue in endpoints()) + // { + // if (queue.IsPersistent && queue.Role == EndpointRole.Application && queue.RetryLetterTopic != null) + // { + // yield return queue.RetryLetterTopic; + // } + // } + //} public ValueTask DisposeAsync() { @@ -52,6 +88,8 @@ public override ValueTask InitializeAsync(IWolverineRuntime runtime) return ValueTask.CompletedTask; } + + public PulsarEndpoint EndpointFor(string topicPath) { var uri = PulsarEndpoint.UriFor(topicPath); diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs index bbcf640c9..154867028 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs @@ -1,8 +1,10 @@ +using System.Net; using DotPulsar; using DotPulsar.Abstractions; using JasperFx.Core.Reflection; using Wolverine.Configuration; using Wolverine.ErrorHandling; +using Wolverine.Pulsar.ErrorHandling; namespace Wolverine.Pulsar; @@ -29,6 +31,12 @@ internal static PulsarTransport PulsarTransport(this WolverineOptions endpoints) /// public static void UsePulsar(this WolverineOptions endpoints, Action configure) { + // doesn't apply the policy?!?: + //endpoints.Policies.Add(); + //endpoints.Policies.Add(new PulsarNativeResiliencyPolicy()); + + new PulsarNativeResiliencyPolicy().Apply(endpoints); + configure(endpoints.PulsarTransport().Builder); } @@ -73,6 +81,7 @@ public static PulsarListenerConfiguration ListenToPulsarTopic(this WolverineOpti endpoint.IsListener = true; return new PulsarListenerConfiguration(endpoint); } + } public class PulsarListenerConfiguration : ListenerConfiguration @@ -108,8 +117,62 @@ public PulsarListenerConfiguration SubscriptionType(SubscriptionType subscriptio e.SubscriptionType = subscriptionType; }); + // TODO: check how to restrict it properly + //if (subscriptionType is DotPulsar.SubscriptionType.Shared or DotPulsar.SubscriptionType.KeyShared) + // return new PulsarSharedListenerConfiguration(this._endpoint); + + return this; + } + + /// + /// Override the Pulsar subscription type to for just this topic + /// + /// + /// + public PulsarListenerConfiguration WithFailoverSubscriptionType() + { + add(e => { e.SubscriptionType = DotPulsar.SubscriptionType.Failover; }); + + return this; + } + + /// + /// Override the Pulsar subscription type to for just this topic + /// + /// + /// + public PulsarListenerConfiguration WithExclusiveSubscriptionType() + { + add(e => { e.SubscriptionType = DotPulsar.SubscriptionType.Exclusive; }); + return this; } + + /// + /// Override the Pulsar subscription type to for just this topic + /// + /// + /// + public PulsarNativeResiliencyDeadLetterConfiguration WithSharedSubscriptionType() + { + add(e => { e.SubscriptionType = DotPulsar.SubscriptionType.Shared; }); + + return new PulsarNativeResiliencyDeadLetterConfiguration(new PulsarListenerConfiguration(_endpoint)); + } + + + /// + /// Override the Pulsar subscription type to for just this topic + /// + /// + /// + public PulsarNativeResiliencyDeadLetterConfiguration WithKeySharedSubscriptionType() + { + add(e => { e.SubscriptionType = DotPulsar.SubscriptionType.KeyShared; }); + + return new PulsarNativeResiliencyDeadLetterConfiguration(new PulsarListenerConfiguration(_endpoint)); + } + /// /// Add circuit breaker exception handling to this listener /// @@ -127,6 +190,28 @@ public PulsarListenerConfiguration CircuitBreaker(Action? return this; } + + /// + /// Customize the dead letter queueing for this specific endpoint + /// + /// Optional configuration + /// + public PulsarListenerConfiguration DeadLetterQueueing(DeadLetterTopic dlq) + { + add(e => + { + e.DeadLetterTopic = dlq; + e.Runtime.Options.Policies.OnAnyException().MoveToErrorQueue(); + }); + + return this; + } + + internal void Apply(Action action) + { + add(action); + } + // /// // /// To optimize the message listener throughput, // /// start up multiple listening endpoints. This is @@ -143,6 +228,161 @@ public PulsarListenerConfiguration CircuitBreaker(Action? // } } +public class PulsarNativeResiliencyConfig +{ + public DeadLetterTopic DeadLetterTopic { get; set; } + public RetryLetterTopic? RetryLetterTopic { get; set; } + + + public Action Apply() + { + return endpoint => + { + if (RetryLetterTopic is null && DeadLetterTopic is null) + { + endpoint.DeadLetterTopic = null; + endpoint.RetryLetterTopic = null; + return; + } + + // Set the DLQ configuration regardless + if (DeadLetterTopic is not null) + { + endpoint.DeadLetterTopic = DeadLetterTopic; + } + + if (RetryLetterTopic is not null) + { + // Validate subscription type + if (endpoint.SubscriptionType is SubscriptionType.Failover or SubscriptionType.Exclusive) + { + throw new InvalidOperationException( + "Pulsar does not support Retry letter queueing with Failover or Exclusive subscription types. Please use Shared or KeyShared subscription types."); + } + + // Set retry configuration + endpoint.RetryLetterTopic = RetryLetterTopic; + + endpoint.Runtime.Options.EnableAutomaticFailureAcks = false; + } + + //if (RetryLetterTopic is null) + //{ + // // Just move to error queue with no retry + // endpoint.Runtime.Options.Policies.OnAnyException().MoveToErrorQueue(); + //} + //else + //{ + + // // Set retry configuration + // endpoint.RetryLetterTopic = RetryLetterTopic; + + // // Configure retry policy + + // //endpoint.IncomingRules + // endpoint.Runtime.Options.Policies.OnAnyException() + // .ScheduleRetry(RetryLetterTopic.Retry.ToArray()) + // .Then + // .MoveToErrorQueue(); + + // endpoint.Runtime.Options.EnableAutomaticFailureAcks = false; + //} + }; + } + +} + +public abstract class PulsarNativeResiliencyConfiguration +{ + protected readonly PulsarListenerConfiguration Endpoint; + protected PulsarNativeResiliencyConfig NativeResiliencyConfig; + + protected PulsarNativeResiliencyConfiguration(PulsarListenerConfiguration endpoint) + { + Endpoint = endpoint; + NativeResiliencyConfig = new PulsarNativeResiliencyConfig(); + + } + + protected PulsarNativeResiliencyConfiguration(PulsarListenerConfiguration endpoint, PulsarNativeResiliencyConfig config) + { + Endpoint = endpoint; + NativeResiliencyConfig = config; + + } + +} + + +public class PulsarNativeResiliencyDeadLetterConfiguration : PulsarNativeResiliencyConfiguration +{ + + + public PulsarNativeResiliencyDeadLetterConfiguration(PulsarListenerConfiguration endpoint) + : base(endpoint) + { + + + } + + /// + /// Customize the dead letter queueing for this specific endpoint + /// + /// DLQ configuration + /// + public PulsarNativeResiliencyRetryLetterConfiguration DeadLetterQueueing(DeadLetterTopic dlq) + { + NativeResiliencyConfig.DeadLetterTopic = dlq; + + return new PulsarNativeResiliencyRetryLetterConfiguration(Endpoint, NativeResiliencyConfig); + } + + /// + /// Disable native DLQ functionality for this queue + /// + /// + public PulsarListenerConfiguration DisableDeadLetterQueueing() + { + return this.Endpoint; + } +} + +public class PulsarNativeResiliencyRetryLetterConfiguration : PulsarNativeResiliencyConfiguration +{ + + public PulsarNativeResiliencyRetryLetterConfiguration(PulsarListenerConfiguration endpoint, PulsarNativeResiliencyConfig config) + : base(endpoint, config) + { + + + } + + /// + /// Customize the retry letter queueing for this specific endpoint + /// + /// Optional configuration + /// + public PulsarListenerConfiguration RetryLetterQueueing(RetryLetterTopic rt) + { + NativeResiliencyConfig.RetryLetterTopic = rt; + Endpoint.Apply(NativeResiliencyConfig.Apply()); + + return Endpoint; + } + + /// + /// Disable native Retry letter functionality for this queue + /// + /// + public PulsarListenerConfiguration DisableRetryLetterQueueing() + { + NativeResiliencyConfig.RetryLetterTopic = null; + Endpoint.Apply(NativeResiliencyConfig.Apply()); + + return Endpoint; + } +} + public class PulsarSubscriberConfiguration : SubscriberConfiguration { public PulsarSubscriberConfiguration(PulsarEndpoint endpoint) : base(endpoint) diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/RetryLetterTopic.cs b/src/Transports/Pulsar/Wolverine.Pulsar/RetryLetterTopic.cs new file mode 100644 index 000000000..c3000ea7f --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/RetryLetterTopic.cs @@ -0,0 +1,73 @@ +using DotPulsar; + +namespace Wolverine.Pulsar; + +/// +/// TODO: how to handle retries internally in Wolverine? +/// +public class RetryLetterTopic +{ + public static RetryLetterTopic DefaultNative => new([ + TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(2) + ]); + + + /// + /// Message delaying does not work with Pulsar if the subscription type is not shared or key shared + /// + public static IReadOnlySet SupportedSubscriptionTypes = new HashSet() + { + SubscriptionType.Shared, SubscriptionType.KeyShared + }; + + private string? _topicName; + private readonly List _retries; + + public RetryLetterTopic(List retries) + { + _retries = retries; + } + public RetryLetterTopic(string topicName, List retries) + { + _topicName = topicName; + _retries = retries; + } + + public string? TopicName + { + get => _topicName; + set => _topicName = value ?? throw new ArgumentNullException(nameof(TopicName)); + } + + public List Retry => _retries.ToList(); + + protected bool Equals(RetryLetterTopic other) + { + return _topicName == other._topicName; + } + + public override bool Equals(object? obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != this.GetType()) + { + return false; + } + + return Equals((RetryLetterTopic)obj); + } + + public override int GetHashCode() + { + return _topicName.GetHashCode(); + } +} \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/Wolverine.Pulsar.csproj b/src/Transports/Pulsar/Wolverine.Pulsar/Wolverine.Pulsar.csproj index 28c81e20c..5a8765c9e 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/Wolverine.Pulsar.csproj +++ b/src/Transports/Pulsar/Wolverine.Pulsar/Wolverine.Pulsar.csproj @@ -9,12 +9,12 @@ false - + - + - + diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index fcea4c9c9..55474c6c2 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -124,8 +124,9 @@ public async Task ReScheduleAsync(DateTimeOffset scheduledTime) throw new InvalidOperationException("No Envelope is active for this context"); } + Runtime.MessageTracking.Requeued(Envelope); Envelope.ScheduledTime = scheduledTime; - if (_channel is ISupportNativeScheduling c) + if (tryGetRescheduler(_channel, Envelope) is ISupportNativeScheduling c) { await c.MoveToScheduledUntilAsync(Envelope, Envelope.ScheduledTime.Value); } @@ -135,6 +136,36 @@ public async Task ReScheduleAsync(DateTimeOffset scheduledTime) } } + private ISupportNativeScheduling? tryGetRescheduler(IChannelCallback? channel, Envelope e) + { + // TODO: is that ok, or should we modify Task ISupportNativeScheduling.MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) in DurableReceiver and BufferedReceiver? + if (e.Listener is ISupportNativeScheduling c2) + { + return c2; + } + + if (channel is ISupportNativeScheduling c) + { + return c; + } + + return default; + } + private ISupportDeadLetterQueue? tryGetDeadLetterQueue(IChannelCallback? channel, Envelope e) + { + if (_channel is ISupportDeadLetterQueue { NativeDeadLetterQueueEnabled: true } c) + { + return c; + } + + if (e.Listener is ISupportDeadLetterQueue { NativeDeadLetterQueueEnabled: true } c2) + { + return c2; + } + + return default; + } + public async Task MoveToDeadLetterQueueAsync(Exception exception) { // Don't bother with agent commands @@ -145,18 +176,19 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception) throw new InvalidOperationException("No Envelope is active for this context"); } - if (_channel is ISupportDeadLetterQueue c && c.NativeDeadLetterQueueEnabled) + var deadLetterQueue = tryGetDeadLetterQueue(_channel, Envelope); + if (deadLetterQueue is not null) { if (Envelope.Batch != null) { foreach (var envelope in Envelope.Batch) { - await c.MoveToErrorsAsync(envelope, exception); + await deadLetterQueue.MoveToErrorsAsync(envelope, exception); } } else { - await c.MoveToErrorsAsync(Envelope, exception); + await deadLetterQueue.MoveToErrorsAsync(Envelope, exception); } return; diff --git a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs index f56d4edb6..c2c179d9d 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs @@ -13,8 +13,10 @@ public sealed partial class WolverineRuntime : IMessageTracker public const int NoRoutesEventId = 107; public const int MovedToErrorQueueId = 108; public const int UndeliverableEventId = 108; + public const int RescheduledEventId = 109; private static readonly Action _movedToErrorQueue; + private static readonly Action _rescheduled; private static readonly Action _noHandler; private static readonly Action _noRoutes; private static readonly Action _received; @@ -41,6 +43,9 @@ static WolverineRuntime() _noRoutes = LoggerMessage.Define(LogLevel.Information, NoRoutesEventId, "No routes can be determined for {envelope}"); + _rescheduled = LoggerMessage.Define(LogLevel.Error, RescheduledEventId, + "Envelope {envelope} was rescheduled to queue"); + _movedToErrorQueue = LoggerMessage.Define(LogLevel.Error, MovedToErrorQueueId, "Envelope {envelope} was moved to the error queue"); @@ -143,6 +148,7 @@ public void Requeued(Envelope envelope) { Logger.LogInformation("Requeue for message {Id} of message type {MessageType}", envelope.Id, envelope.MessageType); ActiveSession?.Record(MessageEventType.Requeued, envelope, _serviceName, _uniqueNodeId); + _rescheduled(Logger, envelope, null); } [Obsolete("Try to eliminate this")] diff --git a/src/Wolverine/Runtime/WolverineTracing.cs b/src/Wolverine/Runtime/WolverineTracing.cs index 0f2e03f8a..94de07af7 100644 --- a/src/Wolverine/Runtime/WolverineTracing.cs +++ b/src/Wolverine/Runtime/WolverineTracing.cs @@ -30,7 +30,7 @@ public const string /// ActivityEvent marking when an incoming envelope is discarded /// public const string EnvelopeDiscarded = "wolverine.envelope.discarded"; - + /// /// ActivityEvent marking when an incoming envelope is being moved to the error queue ///