Skip to content

Track basic.return #1686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions projects/RabbitMQ.Client/Events/BasicAckEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading;

namespace RabbitMQ.Client.Events
Expand All @@ -38,11 +37,13 @@ namespace RabbitMQ.Client.Events
///from an AMQP broker within the Basic content-class.</summary>
public class BasicAckEventArgs : AsyncEventArgs
{
public BasicAckEventArgs(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
public BasicAckEventArgs(ulong deliveryTag, bool multiple, bool returned,
CancellationToken cancellationToken = default)
: base(cancellationToken)
{
DeliveryTag = deliveryTag;
Multiple = multiple;
Returned = returned;
}

///<summary>The sequence number of the acknowledged message, or
Expand All @@ -53,5 +54,10 @@ public BasicAckEventArgs(ulong deliveryTag, bool multiple, CancellationToken can
///<summary>Whether this acknowledgement applies to one message
///or multiple messages.</summary>
public readonly bool Multiple;

///<summary>
///Whether this acknowledgement was due to a <c>basic.return</c>
///</summary>
public readonly bool Returned;
}
}
11 changes: 10 additions & 1 deletion projects/RabbitMQ.Client/Framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace RabbitMQ.Client.Framing
{
internal class Channel : ChannelBase
{
private bool _processedBasicReturn = false;

public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
{
Expand Down Expand Up @@ -88,14 +90,21 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.BasicAck:
{
return HandleBasicAck(cmd, cancellationToken);
bool returned = false;
if (_processedBasicReturn)
{
returned = true;
_processedBasicReturn = false;
}
return HandleBasicAck(cmd, returned, cancellationToken);
}
case ProtocolCommandId.BasicNack:
{
return HandleBasicNack(cmd, cancellationToken);
}
case ProtocolCommandId.BasicReturn:
{
_processedBasicReturn = true;
// Note: always returns true
return HandleBasicReturn(cmd, cancellationToken);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,12 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
return ModelSendAsync(in method, cancellationToken).AsTask();
}

protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken)
protected async Task<bool> HandleBasicAck(IncomingCommand cmd, bool returned, CancellationToken cancellationToken)
{
var ack = new BasicAck(cmd.MethodSpan);
if (!_basicAcksAsyncWrapper.IsEmpty)
{
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken);
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, returned, cancellationToken);
await _basicAcksAsyncWrapper.InvokeAsync(this, args)
.ConfigureAwait(false);
}
Expand Down
1 change: 0 additions & 1 deletion projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ RabbitMQ.Client.Events.AsyncEventArgs
RabbitMQ.Client.Events.AsyncEventArgs.AsyncEventArgs(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.AsyncEventArgs.CancellationToken.get -> System.Threading.CancellationToken
RabbitMQ.Client.Events.BaseExceptionEventArgs.BaseExceptionEventArgs(System.Collections.Generic.IDictionary<string!, object!>! detail, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.BasicDeliverEventArgs.BasicDeliverEventArgs(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.BasicNackEventArgs.BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.BasicReturnEventArgs.BasicReturnEventArgs(ushort replyCode, string! replyText, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple, bool returned, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
readonly RabbitMQ.Client.Events.BasicAckEventArgs.Returned -> bool
62 changes: 55 additions & 7 deletions projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
using System.Threading.Tasks;
using RabbitMQ.Client;

const int MESSAGE_COUNT = 50_000;
// const int MESSAGE_COUNT = 50_000;
const int MESSAGE_COUNT = 50;
bool debug = false;

await PublishMessagesIndividuallyAsync();
await PublishMessagesInBatchAsync();
// await PublishMessagesIndividuallyAsync();
// await PublishMessagesInBatchAsync();
await HandlePublishConfirmsAsynchronously();

static Task<IConnection> CreateConnectionAsync()
Expand All @@ -19,6 +20,7 @@ static Task<IConnection> CreateConnectionAsync()
return factory.CreateConnectionAsync();
}

#if FOO
static async Task PublishMessagesIndividuallyAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");
Expand Down Expand Up @@ -92,6 +94,7 @@ static async Task PublishMessagesInBatchAsync()
sw.Stop();
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
}
#endif

async Task HandlePublishConfirmsAsynchronously()
{
Expand Down Expand Up @@ -158,7 +161,28 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
}
}

channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicReturnAsync += (sender, ea) =>
{
ulong publishSeqNo = 0;
if (ea.BasicProperties.Headers is not null)
{
object? val = ea.BasicProperties.Headers["x-publish-seq-no"];
if (val is not null)
{
publishSeqNo = ulong.Parse(Encoding.ASCII.GetString((byte[])val));
}
}

Console.WriteLine($"{DateTime.Now} [WARNING] message has been basic.return-ed, seq no: {publishSeqNo}");
return Task.CompletedTask;
};

channel.BasicAcksAsync += (sender, ea) =>
{
Console.WriteLine($"{DateTime.Now} [INFO] message sequence number: {ea.DeliveryTag} has been acked (multiple: {ea.Multiple}, returned: {ea.Returned})");
return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

channel.BasicNacksAsync += (sender, ea) =>
{
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
Expand All @@ -168,7 +192,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
var sw = new Stopwatch();
sw.Start();

var publishTasks = new List<Task>();
var publishTasks = new List<ValueTask>();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
string msg = i.ToString();
Expand All @@ -187,11 +211,35 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
semaphore.Release();
}
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());

var headers = new Dictionary<string, object?>
{
["x-publish-seq-no"] = nextPublishSeqNo.ToString()
};

var props = new BasicProperties
{
Headers = headers
};

string routingKey = queueName;
int modulo = Random.Shared.Next(1, 5);
if (i % modulo == 0)
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing message {nextPublishSeqNo} with random routing key so it should be returned");
routingKey = Guid.NewGuid().ToString();
}
ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey,
mandatory: true, basicProperties: props, body: body);
publishTasks.Add(publishTask);
}

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
foreach (ValueTask vt in publishTasks)
{
await vt;
}
publishingCompleted = true;

try
Expand Down
Loading