Replies: 3 comments
-
@punxrok Hey, can you please use Discord for questions rather than GH issues? We try to keep the issue count low, and questions like this can't really be closed. Short story: no, that's not supported, and that's why it's not documented :) Happy to take a PR for that if you want to. We're admittedly leaning hard into a database-backed DLQ, but will use native DLQ capabilities where they exist. You might be the first person to use the Pulsar transport. Someone wanted it years ago and it was hanging around, so I released it just in case.
-
Hi, sorry. I'll use Discord next time. I played around a bit with Pulsar and resiliency (NuGet: DotPulsar.Extensions.Resiliency) and put together a test case. Note that the retry mechanism is only supported on SubscriptionType.Shared or SubscriptionType.KeyShared:
using DotPulsar;
using DotPulsar.Extensions;
using DotPulsar.Internal;
using DotPulsar.Schemas;
using Polly;
using System.Text;
var cts = new CancellationTokenSource();
var token = cts.Token;
Console.CancelKeyPress += (sender, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};
await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
await using var producer = client.NewProducer(Schema.String).Topic("persistent://public/default/mytopic").Create();
var metadataForScheduledMsg = new MessageMetadata
{
    DeliverAtTimeAsDateTime = DateTime.UtcNow.AddSeconds(10)
};
var rnd = new Random();
Console.WriteLine("Sending message....");
var messageId = await producer.Send($"Hello World msg {rnd.Next(100)}", token);
var maxRedeliveryCount = 3;
await using var dlq = new DeadLetterPolicy(
    client.NewProducer().Topic("persistent://public/default/mytopic-DLQ"),
    client.NewProducer().Topic("persistent://public/default/mytopic-RETRY"),
    maxRedeliveryCount,
    TimeSpan.FromSeconds(15)
    //, new ResiliencePipelineBuilder().AddResilientProducerDefaults(options => { }, options => { }).Build()
);
await using var consumer = client.NewConsumer(Schema.String)
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .SubscriptionType(SubscriptionType.Shared) // or key shared - others are not supported - https://github.com/apache/pulsar-dotpulsar/issues/169
    .InitialPosition(SubscriptionInitialPosition.Latest)
    .Create();
var c1 = Task.Run(async () =>
{
    try
    {
        await foreach (var message in consumer.Messages(token))
        {
            try
            {
                Console.WriteLine($"{DateTime.Now}: Received: {message.Value()}");
                // Always fail so the retry path is exercised; the Acknowledge below is intentionally unreachable.
                throw new Exception("Simulated failure in message processing.");
                await consumer.Acknowledge(message);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                await dlq.ReconsumeLater(message, delayTime: TimeSpan.FromSeconds(5)); // the delay can be overridden per message, e.g. based on the message properties
                await consumer.Acknowledge(message);
            }
        }
    }
    catch (OperationCanceledException) // base type, so TaskCanceledException is caught as well
    {
        Console.WriteLine("Task was canceled.");
    }
}, token);
await using var consumerRetry = client.NewConsumer(Schema.String)
    .SubscriptionName("MySubscription")
    .SubscriptionType(SubscriptionType.Shared) // or key shared - others are not supported
    .Topic("persistent://public/default/mytopic-RETRY")
    .InitialPosition(SubscriptionInitialPosition.Latest)
    .Create();
var c2 = Task.Run(async () =>
{
    try
    {
        await foreach (var message in consumerRetry.Messages(token))
        {
            try
            {
                Console.WriteLine($"{DateTime.Now}: Received RETRY topic: {message.Value()}. Attempt count: {int.Parse(message.Properties["RECONSUMETIMES"])}/{maxRedeliveryCount}");
                // Always fail so the retry path is exercised; the Acknowledge below is intentionally unreachable.
                throw new Exception("Simulated failure in message processing.");
                await consumerRetry.Acknowledge(message); // was consumer.Acknowledge, which is the wrong consumer for this topic
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                await dlq.ReconsumeLater(message, delayTime: TimeSpan.FromSeconds(5));
                await consumerRetry.Acknowledge(message);
            }
            //finally
            //{
            //    var reconsumedCount = int.Parse(message.Properties["RECONSUMETIMES"]);
            //    if (reconsumedCount >= maxRedeliveryCount)
            //        await consumerRetry.Acknowledge(message);
            //}
        }
    }
    catch (OperationCanceledException) // base type, so TaskCanceledException is caught as well
    {
        Console.WriteLine("Task was canceled.");
    }
}, token);
await using var consumerDlq = client.NewConsumer(Schema.String)
    .SubscriptionName("MySubscription")
    .SubscriptionType(SubscriptionType.Shared) // or key shared - others are not supported
    .Topic("persistent://public/default/mytopic-DLQ")
    .InitialPosition(SubscriptionInitialPosition.Latest)
    .Create();
var c3 = Task.Run(async () =>
{
    try
    {
        await foreach (var message in consumerDlq.Messages(token))
        {
            try
            {
                Console.WriteLine($"{DateTime.Now}: Received DLQ dead-lettered message: {message.Value()}");
                await consumerDlq.Acknowledge(message);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }
    }
    catch (OperationCanceledException) // base type, so TaskCanceledException is caught as well
    {
        Console.WriteLine("Task was canceled.");
    }
}, token);
//await using var producer2 = client.NewProducer(Schema.String)
//    .Topic("...")
//    .CreateResilient(static pipeline => {
//        pipeline.AddResilientProducerDefaults(configureRetry: static options => {
//            options.MaxRetryAttempts = 3;
//        });
//    });
await Task.WhenAll([c1, c2, c3]);
Console.WriteLine("Stopping...");
I hope to eventually be able to contribute this functionality as a PR.
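The routing decision that the test case above exercises can be sketched in isolation, without a broker. This is only an illustration under assumptions: `NextDestination` is a hypothetical helper, not part of DotPulsar or Wolverine. It assumes that a message which has never been retried carries no `RECONSUMETIMES` property, and it uses the same `>=` comparison as the commented-out `finally` block in the test case. The library's actual threshold check may differ.

```csharp
using System;
using System.Collections.Generic;

// First delivery: no retry counter yet, so the message is routed to the retry topic.
Console.WriteLine(NextDestination(new Dictionary<string, string>(), maxRedeliveryCount: 3));

// The counter has reached the limit: the message is routed to the dead-letter topic.
var exhausted = new Dictionary<string, string> { ["RECONSUMETIMES"] = "3" };
Console.WriteLine(NextDestination(exhausted, maxRedeliveryCount: 3));

// Hypothetical helper (an assumption for illustration, not a library API):
// decides where a failed message should go next based on its retry counter.
static string NextDestination(IReadOnlyDictionary<string, string> properties, int maxRedeliveryCount)
{
    // A missing or unparsable counter is treated as zero previous attempts.
    var attempts = properties.TryGetValue("RECONSUMETIMES", out var raw) && int.TryParse(raw, out var parsed)
        ? parsed
        : 0;

    // Same comparison as the commented-out check in the test case.
    return attempts >= maxRedeliveryCount ? "dlq" : "retry";
}
```

Keeping this decision in a pure function makes the threshold easy to unit test before wiring it to real retry and dead-letter producers.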
-
Hey @jeremydmiller
-
I'm playing around with Wolverine and Pulsar.
I have made a test app that publishes a message, and the message is pushed to Pulsar.
A background service listens to a Pulsar topic, and let's say the handler throws an exception.
I have set this inside the UsePulsar method:
opts.OnException(exception => { return true; }).MoveToErrorQueue();
When the exception was thrown inside the handler for a specific message, Wolverine logged that the message was moved to the error queue:
AspireApp1.ApiService to pulsar://persistent/public/default/message was moved to the error queue
I expected Wolverine to create a new error/dead-letter topic internally and send the message there, but no new topic appears in Pulsar.
Is that still unsupported when using Pulsar, or am I missing something? (I don't know Wolverine well; I only started working with it yesterday.)
Log:
2025-03-21T16:04:31 Service ID: 7a8348fe-5a91-4d4d-94cb-4cabdb558c17 - Received Message with payload: Body9
2025-03-21T16:04:31 info: AspireApp1.Contracts.Message1[104]
2025-03-21T16:04:31 Successfully processed message AspireApp1.Contracts.Message1#08dd6889-af11-ce8b-fc34-9748fcfa0000 from pulsar://persistent/public/default/message
2025-03-21T16:04:31 info: AspireApp1.WorkerServiceTest.TaskRequestHandler[0]
2025-03-21T16:04:31 Service ID: 7a8348fe-5a91-4d4d-94cb-4cabdb558c17 - Received Message with payload: Body10
2025-03-21T16:04:31 Service ID: 7a8348fe-5a91-4d4d-94cb-4cabdb558c17 - Received Message with payload: Body10
2025-03-21T16:04:31 info: AspireApp1.Contracts.Message1[104]
2025-03-21T16:04:31 Successfully processed message AspireApp1.Contracts.Message1#08dd6889-af12-af4a-fc34-9748fcfa0000 from pulsar://persistent/public/default/message
2025-03-21T16:04:31 info: AspireApp1.WorkerServiceTest.TaskRequestHandler[0]
2025-03-21T16:04:31 Service ID: 7a8348fe-5a91-4d4d-94cb-4cabdb558c17 - Received Message with payload: Body11
2025-03-21T16:04:31 Service ID: 7a8348fe-5a91-4d4d-94cb-4cabdb558c17 - Received Message with payload: Body11
2025-03-21T16:04:36 fail: AspireApp1.Contracts.Message1[105]
2025-03-21T16:04:36 Failed to process message AspireApp1.Contracts.Message1#08dd6889-af13-af88-fc34-9748fcfa0000 from pulsar://persistent/public/default/message
2025-03-21T16:04:36 System.Exception: error
2025-03-21T16:04:36 at AspireApp1.WorkerServiceTest.TaskRequestHandler.Handle(Message1 message)
2025-03-21T16:04:36 at Internal.Generated.WolverineHandlers.Message1Handler2067614017.HandleAsync(MessageContext context, CancellationToken cancellation)
2025-03-21T16:04:36 at Wolverine.Runtime.Handlers.Executor.ExecuteAsync(MessageContext context, CancellationToken cancellation) in /home/runner/work/wolverine/wolverine/src/Wolverine/Runtime/Handlers/Executor.cs:line 166
2025-03-21T16:04:37 fail: Wolverine.Runtime.WolverineRuntime[108]
2025-03-21T16:04:37 Envelope Envelope #08dd6889-af13-af88-fc34-9748fcfa0000/CorrelationId=9085f6de5d3d18da2af2f221ed028881 (AspireApp1.Contracts.Message1) from AspireApp1.ApiService to pulsar://persistent/public/default/message was moved to the error queue
2025-03-21T16:04:37 System.Exception: error
2025-03-21T16:04:37 at AspireApp1.WorkerServiceTest.TaskRequestHandler.Handle(Message1 message)
2025-03-21T16:04:37 at Internal.Generated.WolverineHandlers.Message1Handler2067614017.HandleAsync(MessageContext context, CancellationToken cancellation)
2025-03-21T16:04:37 at Wolverine.Runtime.Handlers.Executor.ExecuteAsync(MessageContext context, CancellationToken cancellation) in /home/runner/work/wolverine/wolverine/src/Wolverine/Runtime/Handlers/Executor.cs:line 166