Skip to content

Pulsar support for retry and DLQ #1333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
55733ef
Merge pull request #1 from JasperFx/main
punxrok Mar 25, 2025
4014879
#1326 Pulsar support for native retry and DLQ queues - beware of ugly…
Mar 26, 2025
e846db9
#1326 Pulsar support for native retry and DLQ queues - beware of ugly…
Mar 26, 2025
0d14f9b
#1326 Pulsar support for native retry and DLQ queues - beware of ugly…
Mar 26, 2025
9caf90d
#1326 Additional constructors for DeadLetterTopic and RetryLetterTopic
Mar 27, 2025
8dd8786
#1326 Additional constructors for DeadLetterTopic and RetryLetterTopic
Mar 27, 2025
42890d6
#1326 Additional constructors for DeadLetterTopic and RetryLetterTopic
Mar 27, 2025
90ad042
#1326 refactoring
Mar 27, 2025
ab74d4f
#1326 refactoring
Mar 27, 2025
d0d1d8c
#1326 fix
Mar 27, 2025
656daf0
#1326 refactoring for MoveToErrorsAsync
Mar 27, 2025
f5f844e
#1326 fix
Mar 27, 2025
c163b41
#1326 refactoring
Mar 27, 2025
30def92
#1326 added support for ISupportRetryLetterQueue
Mar 28, 2025
93baf89
#1326 refactoring
Mar 28, 2025
5619c31
#1326 refactoring and some dirty fixes for envelope's Attempts sync w…
Mar 29, 2025
3d33828
#1326 refactoring
Mar 30, 2025
971d51e
#1326 Pulsar support for native retry and DLQ queues in buffered endp…
Mar 30, 2025
8373a44
#1326 bug fixes - better unit test asserts and proper condition waiti…
Mar 31, 2025
a6e8204
#1326 bug fixes - added unit test for only DLQ (without retry queue f…
Mar 31, 2025
844c8b2
#1326 refactoring - as Jeremy suggested, I removed the action MoveTo…
Apr 1, 2025
b50c751
#1326 refactoring - substituted ISupportRetryLetterQueue (to be delet…
Apr 14, 2025
d4c81ac
#1326 refactoring - removed most of not needed constructs from the pr…
Apr 14, 2025
ac343d9
#1326 refactoring - removed not needed construct from the previously …
Apr 16, 2025
908b4de
#1326 refactoring - removed not needed construct from the previously …
Apr 24, 2025
58e2100
#1326 refactoring
Apr 24, 2025
4462d5a
#1326 clean-up
Apr 25, 2025
f7fd39b
#1326 clean-up
Apr 25, 2025
707f40a
#1326 clean-up
Apr 25, 2025
a19f610
#1326 clean-up
Apr 25, 2025
70c4686
#1326 clean-up
Apr 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public static async Task configure()

// And all the normal Wolverine options...
.Sequential();


// Listen for incoming messages from a Pulsar topic with a shared subscription and using RETRY and DLQ queues
opts.ListenToPulsarTopic("persistent://public/default/three")
.WithSharedSubscriptionType()
.DeadLetterQueueing(new DeadLetterTopic(DeadLetterTopicMode.Native))
.RetryLetterQueueing(new RetryLetterTopic([TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5)]))
.Sequential();
});

#endregion
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Oakton;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.ComplianceTests.Scheduling;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.Pulsar.Tests;

public class PulsarNativeReliabilityTests : /*TransportComplianceFixture,*/ IAsyncLifetime
{
public IHost WolverineHost;

public PulsarNativeReliabilityTests()
{
}

private IHostBuilder ConfigureBuilder()
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{

var topic = Guid.NewGuid().ToString();
var topicPath = $"persistent://public/default/compliance{topic}";

opts.UsePulsar(b => { });

opts.IncludeType<SRMessageHandlers>();

opts.PublishMessage<SRMessage1>()
.ToPulsarTopic(topicPath);

opts.ListenToPulsarTopic(topicPath)
.WithSharedSubscriptionType()
.DeadLetterQueueing(DeadLetterTopic.DefaultNative)
.RetryLetterQueueing(new RetryLetterTopic([TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3)]))
//.ProcessInline();
.BufferedInMemory();


var topicPath2 = $"persistent://public/default/no-retry-{topic}";
opts.IncludeType<SRMessageHandlers>();

opts.PublishMessage<SRMessage2>()
.ToPulsarTopic(topicPath2);

opts.ListenToPulsarTopic(topicPath2)
.WithSharedSubscriptionType()
.DeadLetterQueueing(DeadLetterTopic.DefaultNative)
.DisableRetryLetterQueueing()
.ProcessInline();

});
}

public async Task InitializeAsync()
{
WolverineHost = ConfigureBuilder().Build();
await WolverineHost.StartAsync();
}

[Fact]
public async Task run_setup_with_simulated_exception_in_handler()
{
var session = await WolverineHost.TrackActivity(TimeSpan.FromSeconds(1000))
//.WaitForMessageToBeReceivedAt<SRMessage1>(WolverineHost)
.DoNotAssertOnExceptionsDetected()
.IncludeExternalTransports()
.WaitForCondition(new WaitForDeadLetteredMessage<SRMessage1>())
.SendMessageAndWaitAsync(new SRMessage1());


session.Sent.AllMessages();
session.MovedToErrorQueue
.MessagesOf<SRMessage1>()
.Count()
.ShouldBe(1);

session.Received
.MessagesOf<SRMessage1>()
.Count()
.ShouldBe(4);

session.Requeued
.MessagesOf<SRMessage1>()
.Count()
.ShouldBe(3);


// TODO: I Guess the capture of the envelope headers occurs before we manipulate it
var firstRequeuedEnvelope = session.Requeued.Envelopes().First();
firstRequeuedEnvelope.ShouldSatisfyAllConditions(
() => firstRequeuedEnvelope.Attempts.ShouldBe(1),
() => firstRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeFalse()
);
var secondRequeuedEnvelope = session.Requeued.Envelopes().Skip(1).First();
secondRequeuedEnvelope.ShouldSatisfyAllConditions(
() => secondRequeuedEnvelope.Attempts.ShouldBe(2),
() => secondRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeTrue(),
() => secondRequeuedEnvelope.Headers["DELAY_TIME"].ShouldBe(TimeSpan.FromSeconds(4).TotalMilliseconds.ToString())
);

var thirdRequeuedEnvelope = session.Requeued.Envelopes().Skip(2).First();
thirdRequeuedEnvelope.ShouldSatisfyAllConditions(
() => thirdRequeuedEnvelope.Attempts.ShouldBe(3),
() => thirdRequeuedEnvelope.Headers.ContainsKey("DELAY_TIME").ShouldBeTrue(),
() => thirdRequeuedEnvelope.Headers["DELAY_TIME"].ShouldBe(TimeSpan.FromSeconds(2).TotalMilliseconds.ToString())
);


var dlqEnvelope = session.MovedToErrorQueue.Envelopes().First();
dlqEnvelope.ShouldSatisfyAllConditions(
() => dlqEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.Exception).ShouldBeTrue(),
() => dlqEnvelope.Headers[PulsarEnvelopeConstants.ReconsumeTimes].ShouldBe("3")
);

}

[Fact]
public async Task run_setup_with_simulated_exception_in_handler_only_native_dead_lettered_queue()
{
var session = await WolverineHost.TrackActivity(TimeSpan.FromSeconds(100))
.DoNotAssertOnExceptionsDetected()
.IncludeExternalTransports()
.WaitForCondition(new WaitForDeadLetteredMessage<SRMessage2>())
.SendMessageAndWaitAsync(new SRMessage2());


session.Sent.AllMessages();
session.MovedToErrorQueue
.MessagesOf<SRMessage2>()
.Count()
.ShouldBe(1);

session.Received
.MessagesOf<SRMessage2>()
.Count()
.ShouldBe(1);

session.Requeued
.MessagesOf<SRMessage2>()
.Count()
.ShouldBe(0);




var firstEnvelope = session.MovedToErrorQueue.Envelopes().First();
firstEnvelope.ShouldSatisfyAllConditions(
() => firstEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.Exception).ShouldBeTrue(),
() => firstEnvelope.Headers.ContainsKey(PulsarEnvelopeConstants.ReconsumeTimes).ShouldBeFalse()
);

}



public async Task DisposeAsync()
{
await WolverineHost.StopAsync();
WolverineHost.Dispose();
}


}

public class SRMessage1;
public class SRMessage2;


public class SRMessageHandlers
{
public Task Handle(SRMessage1 message, IMessageContext context)
{
throw new InvalidOperationException("Simulated exception");
}

public Task Handle(SRMessage2 message, IMessageContext context)
{
throw new InvalidOperationException("Simulated exception");
}

}



public class WaitForDeadLetteredMessage<T> : ITrackedCondition
{

private bool _found;

public WaitForDeadLetteredMessage()
{

}

public void Record(EnvelopeRecord record)
{
if (record.Envelope.Message is T && record.MessageEventType == MessageEventType.MovedToErrorQueue )
// && record.Envelope.Destination?.ToString().Contains(_dlqTopic) == true)
{
_found = true;
}
}

public bool IsCompleted() => _found;
}

73 changes: 73 additions & 0 deletions src/Transports/Pulsar/Wolverine.Pulsar/DeadLetterTopic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@


namespace Wolverine.Pulsar;

public enum DeadLetterTopicMode
{
/// <summary>
/// Opt into using Pulsar's native dead letter topic approach. This is the default and recommended
/// </summary>
Native,

/// <summary>
/// Completely ignore Pulsar native dead letter topic in favor of Wolverine persistent dead letter queueing
/// </summary>
WolverineStorage
}

public class DeadLetterTopic
{

public static DeadLetterTopic DefaultNative => new(DeadLetterTopicMode.Native);

private string? _topicName;

public DeadLetterTopicMode Mode { get; set; }

public DeadLetterTopic(DeadLetterTopicMode mode)
{
Mode = mode;
}

public DeadLetterTopic(string topicName, DeadLetterTopicMode mode)
{
_topicName = topicName;
Mode = mode;
}

public string TopicName
{
get => _topicName;
set => _topicName = value?? throw new ArgumentNullException(nameof(TopicName));
}

protected bool Equals(DeadLetterTopic other)
{
return _topicName == other._topicName;
}

public override bool Equals(object? obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}

if (ReferenceEquals(this, obj))
{
return true;
}

if (obj.GetType() != this.GetType())
{
return false;
}

return Equals((DeadLetterTopic)obj);
}

public override int GetHashCode()
{
return _topicName.GetHashCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Wolverine.ErrorHandling;
using Wolverine.Runtime;

namespace Wolverine.Pulsar.ErrorHandling;

public class PulsarNativeContinuationSource : IContinuationSource
{
public string Description { get; }

public IContinuation Build(Exception ex, Envelope envelope)
{
// Only handle Pulsar envelopes/listeners
if (envelope.Listener is PulsarListener)
{
return new PulsarNativeResiliencyContinuation(ex);
}

// Return null to let the next continuation source handle it
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Diagnostics;
using Wolverine.ErrorHandling;
using Wolverine.Runtime;

namespace Wolverine.Pulsar.ErrorHandling;

public class PulsarNativeResiliencyContinuation : IContinuation
{
private readonly Exception _exception;

public PulsarNativeResiliencyContinuation(Exception exception)
{
_exception = exception;
}

public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, Activity? activity)
{
var context = lifecycle as MessageContext;
var envelope = context?.Envelope;

if (envelope?.Listener is PulsarListener listener)
{
if (listener.NativeRetryLetterQueueEnabled && listener.RetryLetterTopic!.Retry.Count >= envelope.Attempts)
{
// Use native retry if enabled and attempts are within bounds
//await listener.MoveToScheduledUntilAsync(envelope, now);
await new ScheduledRetryContinuation(listener.RetryLetterTopic!.Retry[envelope.Attempts - 1])
.ExecuteAsync(lifecycle, runtime, now, activity);
return;
}

if (listener.NativeDeadLetterQueueEnabled)
{
await new MoveToErrorQueue(_exception)
.ExecuteAsync(lifecycle, runtime, now, activity);
//await listener.MoveToErrorsAsync(envelope, _exception);
}

return;
}

// Fall back to MoveToErrorQueue for other listener types
await new MoveToErrorQueue(_exception).ExecuteAsync(lifecycle, runtime, now, activity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Wolverine.Configuration;
using Wolverine.ErrorHandling;
using Wolverine.ErrorHandling.Matches;

namespace Wolverine.Pulsar.ErrorHandling;

public class PulsarNativeResiliencyPolicy : IWolverinePolicy
{
public void Apply(WolverineOptions options)
{
var rule = new FailureRule(new AlwaysMatches());

rule.AddSlot(new PulsarNativeContinuationSource());

// Set the same source as the InfiniteSource to handle all other attempts
rule.InfiniteSource = new PulsarNativeContinuationSource();

// Add this rule to the global failure collection
// This ensures it will be checked before other rules
options.Policies.Failures.Add(rule);
}
}
Loading