From 55122b7e78c1d2b071069657f708129ff0bf867b Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 19 Sep 2024 16:13:25 -0700 Subject: [PATCH 1/2] Track `basic.return` Just an idea --- .../Events/BasicAckEventArgs.cs | 10 ++- projects/RabbitMQ.Client/Framing/Channel.cs | 11 +++- projects/RabbitMQ.Client/Impl/ChannelBase.cs | 4 +- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 1 - .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 2 + .../PublisherConfirms/PublisherConfirms.cs | 61 ++++++++++++++++--- 6 files changed, 76 insertions(+), 13 deletions(-) diff --git a/projects/RabbitMQ.Client/Events/BasicAckEventArgs.cs b/projects/RabbitMQ.Client/Events/BasicAckEventArgs.cs index 09b3b39860..d94c79b487 100644 --- a/projects/RabbitMQ.Client/Events/BasicAckEventArgs.cs +++ b/projects/RabbitMQ.Client/Events/BasicAckEventArgs.cs @@ -29,7 +29,6 @@ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- -using System; using System.Threading; namespace RabbitMQ.Client.Events @@ -38,11 +37,13 @@ namespace RabbitMQ.Client.Events ///from an AMQP broker within the Basic content-class. public class BasicAckEventArgs : AsyncEventArgs { - public BasicAckEventArgs(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) + public BasicAckEventArgs(ulong deliveryTag, bool multiple, bool returned, + CancellationToken cancellationToken = default) : base(cancellationToken) { DeliveryTag = deliveryTag; Multiple = multiple; + Returned = returned; } ///The sequence number of the acknowledged message, or @@ -53,5 +54,10 @@ public BasicAckEventArgs(ulong deliveryTag, bool multiple, CancellationToken can ///Whether this acknowledgement applies to one message ///or multiple messages. public readonly bool Multiple; + + /// + ///Whether this acknowledgement was due to a basic.return + /// + public readonly bool Returned; } } diff --git a/projects/RabbitMQ.Client/Framing/Channel.cs b/projects/RabbitMQ.Client/Framing/Channel.cs index ddb1394bdd..b8ccf1f368 100644 --- a/projects/RabbitMQ.Client/Framing/Channel.cs +++ b/projects/RabbitMQ.Client/Framing/Channel.cs @@ -37,6 +37,8 @@ namespace RabbitMQ.Client.Framing { internal class Channel : ChannelBase { + private bool _processedBasicReturn = false; + public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) : base(config, session, consumerDispatchConcurrency) { @@ -88,7 +90,13 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicAck: { - return HandleBasicAck(cmd, cancellationToken); + bool returned = false; + if (_processedBasicReturn) + { + returned = true; + _processedBasicReturn = false; + } + return HandleBasicAck(cmd, returned, cancellationToken); } case ProtocolCommandId.BasicNack: { @@ -96,6 +104,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicReturn: { + _processedBasicReturn = true; // Note: always returns true return HandleBasicReturn(cmd, cancellationToken); } diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 4cd9118d31..9768607570 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -594,12 +594,12 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart return ModelSendAsync(in method, cancellationToken).AsTask(); } - protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) + protected async Task HandleBasicAck(IncomingCommand cmd, bool returned, CancellationToken cancellationToken) { var ack = new BasicAck(cmd.MethodSpan); if (!_basicAcksAsyncWrapper.IsEmpty) { - var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); + var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, returned, cancellationToken); await _basicAcksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 9b4e290f26..17a217fc4a 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -842,7 +842,6 @@ RabbitMQ.Client.Events.AsyncEventArgs RabbitMQ.Client.Events.AsyncEventArgs.AsyncEventArgs(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Events.AsyncEventArgs.CancellationToken.get -> System.Threading.CancellationToken RabbitMQ.Client.Events.BaseExceptionEventArgs.BaseExceptionEventArgs(System.Collections.Generic.IDictionary! detail, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void -RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Events.BasicDeliverEventArgs.BasicDeliverEventArgs(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Events.BasicNackEventArgs.BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Events.BasicReturnEventArgs.BasicReturnEventArgs(ushort replyCode, string! replyText, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb2..9bff1fb2e3 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1,2 @@ +RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple, bool returned, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +readonly RabbitMQ.Client.Events.BasicAckEventArgs.Returned -> bool \ No newline at end of file diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 16b597459d..6442cd9141 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -6,11 +6,12 @@ using System.Threading.Tasks; using RabbitMQ.Client; -const int MESSAGE_COUNT = 50_000; +// const int MESSAGE_COUNT = 50_000; +const int MESSAGE_COUNT = 21; bool debug = false; -await PublishMessagesIndividuallyAsync(); -await PublishMessagesInBatchAsync(); +// await PublishMessagesIndividuallyAsync(); +// await PublishMessagesInBatchAsync(); await HandlePublishConfirmsAsynchronously(); static Task CreateConnectionAsync() @@ -19,6 +20,7 @@ static Task CreateConnectionAsync() return factory.CreateConnectionAsync(); } +#if FOO static async Task PublishMessagesIndividuallyAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once"); @@ -92,6 +94,7 @@ static async Task PublishMessagesInBatchAsync() sw.Stop(); Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); } +#endif async Task HandlePublishConfirmsAsynchronously() { @@ -158,7 +161,28 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) } } - channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + channel.BasicReturnAsync += (sender, ea) => + { + ulong publishSeqNo = 0; + if (ea.BasicProperties.Headers is not null) + { + object? val = ea.BasicProperties.Headers["x-publish-seq-no"]; + if (val is not null) + { + publishSeqNo = ulong.Parse(Encoding.ASCII.GetString((byte[])val)); + } + } + + Console.WriteLine($"{DateTime.Now} [WARNING] message has been basic.return-ed, seq no: {publishSeqNo}"); + return Task.CompletedTask; + }; + + channel.BasicAcksAsync += (sender, ea) => + { + Console.WriteLine($"{DateTime.Now} [INFO] message sequence number: {ea.DeliveryTag} has been acked (multiple: {ea.Multiple}, returned: {ea.Returned})"); + return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + }; + channel.BasicNacksAsync += (sender, ea) => { Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})"); @@ -168,7 +192,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) var sw = new Stopwatch(); sw.Start(); - var publishTasks = new List(); + var publishTasks = new List(); for (int i = 0; i < MESSAGE_COUNT; i++) { string msg = i.ToString(); @@ -187,11 +211,34 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { semaphore.Release(); } - publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask()); + + var headers = new Dictionary + { + ["x-publish-seq-no"] = nextPublishSeqNo.ToString() + }; + + var props = new BasicProperties + { + Headers = headers + }; + + string routingKey = queueName; + if (i % 2 == 0) + { + Console.WriteLine($"{DateTime.Now} [INFO] publishing message {nextPublishSeqNo} with random routing key so it should be returned"); + routingKey = Guid.NewGuid().ToString(); + } + ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey, + mandatory: true, basicProperties: props, body: body); + publishTasks.Add(publishTask); } using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + // await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + foreach (ValueTask vt in publishTasks) + { + await vt; + } publishingCompleted = true; try From ea51574a90827096126b01bae8acd26d3793a663 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 19 Sep 2024 16:17:43 -0700 Subject: [PATCH 2/2] Seems to be working as expected... --- .../Test/Applications/PublisherConfirms/PublisherConfirms.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 6442cd9141..ae943be744 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -7,7 +7,7 @@ using RabbitMQ.Client; // const int MESSAGE_COUNT = 50_000; -const int MESSAGE_COUNT = 21; +const int MESSAGE_COUNT = 50; bool debug = false; // await PublishMessagesIndividuallyAsync(); @@ -223,7 +223,8 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) }; string routingKey = queueName; - if (i % 2 == 0) + int modulo = Random.Shared.Next(1, 5); + if (i % modulo == 0) { Console.WriteLine($"{DateTime.Now} [INFO] publishing message {nextPublishSeqNo} with random routing key so it should be returned"); routingKey = Guid.NewGuid().ToString();