Skip to content

Commit 9dab7bc

Browse files
authored
Merge pull request #1683 from danielmarbach/event-args-cancellation
Event args cancellation
2 parents 317945c + 66f0e41 commit 9dab7bc

File tree

53 files changed

+433
-297
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+433
-297
lines changed

projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
1717
_autoResetEvent = autoResetEvent;
1818
}
1919

20-
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
21-
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
20+
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
21+
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
22+
CancellationToken cancellationToken = default)
2223
{
2324
if (Interlocked.Increment(ref _current) == Count)
2425
{
@@ -28,11 +29,11 @@ public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool
2829
return Task.CompletedTask;
2930
}
3031

31-
public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask;
32+
public Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;
3233

33-
public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask;
34+
public Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;
3435

35-
public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask;
36+
public Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;
3637

3738
public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask;
3839

projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ public CountingConsumer(IChannel channel, uint messageCount) : base(channel)
4242
}
4343

4444
/// <inheritdoc />
45-
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
45+
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
46+
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
47+
CancellationToken cancellationToken = default)
4648
{
4749
if (Interlocked.Decrement(ref _remainingCount) == 0)
4850
{

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 42 additions & 37 deletions
Large diffs are not rendered by default.

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace RabbitMQ.Client
@@ -45,33 +46,36 @@ public string[] ConsumerTags
4546
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
4647
/// for use in acknowledging received messages, for instance.
4748
/// </summary>
48-
public IChannel Channel { get; private set; }
49+
public IChannel Channel { get; }
4950

5051
/// <summary>
5152
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
5253
/// e.g. the queue has been deleted (either by this channel or by any other channel).
5354
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
5455
/// </summary>
5556
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
56-
public virtual Task HandleBasicCancelAsync(string consumerTag)
57+
/// <param name="cancellationToken">The cancellation token.</param>
58+
public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default)
5759
{
58-
return OnCancel(consumerTag);
60+
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
5961
}
6062

6163
/// <summary>
6264
/// Called upon successful deregistration of the consumer from the broker.
6365
/// </summary>
6466
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
65-
public virtual Task HandleBasicCancelOkAsync(string consumerTag)
67+
/// <param name="cancellationToken">The cancellation token.</param>
68+
public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default)
6669
{
67-
return OnCancel(consumerTag);
70+
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
6871
}
6972

7073
/// <summary>
7174
/// Called upon successful registration of the consumer with the broker.
7275
/// </summary>
7376
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
74-
public virtual Task HandleBasicConsumeOkAsync(string consumerTag)
77+
/// <param name="cancellationToken">The cancellation token.</param>
78+
public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default)
7579
{
7680
_consumerTags.Add(consumerTag);
7781
IsRunning = true;
@@ -94,7 +98,8 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
9498
string exchange,
9599
string routingKey,
96100
IReadOnlyBasicProperties properties,
97-
ReadOnlyMemory<byte> body)
101+
ReadOnlyMemory<byte> body,
102+
CancellationToken cancellationToken = default)
98103
{
99104
// Nothing to do here.
100105
return Task.CompletedTask;
@@ -108,18 +113,20 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
108113
public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason)
109114
{
110115
ShutdownReason = reason;
111-
return OnCancel(_consumerTags.ToArray());
116+
return OnCancelAsync(ConsumerTags, reason.CancellationToken);
112117
}
113118

114119
/// <summary>
115120
/// Default implementation - overridable in subclasses.</summary>
116-
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
121+
/// <param name="consumerTags">The set of consumer tags that were cancelled</param>
122+
/// <param name="cancellationToken">The cancellation token.</param>
117123
/// <remarks>
118124
/// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
119125
/// </remarks>
120-
public virtual Task OnCancel(params string[] consumerTags)
126+
protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
121127
{
122128
IsRunning = false;
129+
123130
foreach (string consumerTag in consumerTags)
124131
{
125132
_consumerTags.Remove(consumerTag);

projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace RabbitMQ.Client
@@ -20,19 +21,22 @@ public interface IAsyncBasicConsumer
2021
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
2122
/// </summary>
2223
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
23-
Task HandleBasicCancelAsync(string consumerTag);
24+
/// <param name="cancellationToken">The cancellation token.</param>
25+
Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default);
2426

2527
/// <summary>
2628
/// Called upon successful deregistration of the consumer from the broker.
2729
/// </summary>
2830
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
29-
Task HandleBasicCancelOkAsync(string consumerTag);
31+
/// <param name="cancellationToken">The cancellation token.</param>
32+
Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default);
3033

3134
/// <summary>
3235
/// Called upon successful registration of the consumer with the broker.
3336
/// </summary>
3437
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
35-
Task HandleBasicConsumeOkAsync(string consumerTag);
38+
/// <param name="cancellationToken">The cancellation token.</param>
39+
Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default);
3640

3741
/// <summary>
3842
/// Called each time a message arrives for this consumer.
@@ -44,7 +48,7 @@ public interface IAsyncBasicConsumer
4448
/// </para>
4549
/// <para>
4650
/// NOTE: Using the <c>body</c> outside of
47-
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte})"/></c>
51+
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte}, CancellationToken)"/></c>
4852
/// requires that it be copied!
4953
/// </para>
5054
/// </remarks>
@@ -55,7 +59,8 @@ Task HandleBasicDeliverAsync(string consumerTag,
5559
string exchange,
5660
string routingKey,
5761
IReadOnlyBasicProperties properties,
58-
ReadOnlyMemory<byte> body);
62+
ReadOnlyMemory<byte> body,
63+
CancellationToken cancellationToken = default);
5964

6065
/// <summary>
6166
/// Called when the channel shuts down.

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public interface IConnection : INetworkConnection, IDisposable
161161
/// <remarks>
162162
/// This event will never fire for connections that disable automatic recovery.
163163
/// </remarks>
164-
event AsyncEventHandler<EventArgs> RecoverySucceededAsync;
164+
event AsyncEventHandler<AsyncEventArgs> RecoverySucceededAsync;
165165

166166
/// <summary>
167167
/// Raised when the connection recovery fails, e.g. because reconnection or topology
@@ -212,7 +212,7 @@ public interface IConnection : INetworkConnection, IDisposable
212212
/// <summary>
213213
/// Raised when a connection is unblocked by the AMQP broker.
214214
/// </summary>
215-
event AsyncEventHandler<EventArgs> ConnectionUnblockedAsync;
215+
event AsyncEventHandler<AsyncEventArgs> ConnectionUnblockedAsync;
216216

217217
/// <summary>
218218
/// This method updates the secret used to authenticate this connection.

projects/RabbitMQ.Client/client/api/IRecoverable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ namespace RabbitMQ.Client
3939
/// </summary>
4040
public interface IRecoverable
4141
{
42-
event AsyncEventHandler<EventArgs> RecoveryAsync;
42+
event AsyncEventHandler<AsyncEventArgs> RecoveryAsync;
4343
}
4444
}

projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Threading;
34+
using RabbitMQ.Client.Events;
3335

3436
namespace RabbitMQ.Client
3537
{
@@ -39,7 +41,8 @@ namespace RabbitMQ.Client
3941
/// <remarks>
4042
/// The <see cref="ClassId"/> and <see cref="Initiator"/> properties should be used to determine the originator of the shutdown event.
4143
/// </remarks>
42-
public class ShutdownEventArgs : EventArgs
44+
/// TODO: Should this be moved to the events folder and the namespace be adjusted?
45+
public class ShutdownEventArgs : AsyncEventArgs
4346
{
4447
private readonly Exception? _exception;
4548

@@ -48,16 +51,17 @@ public class ShutdownEventArgs : EventArgs
4851
/// 0 for <see cref="ClassId"/> and <see cref="MethodId"/>.
4952
/// </summary>
5053
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
51-
object? cause = null)
52-
: this(initiator, replyCode, replyText, 0, 0, cause)
54+
object? cause = null, CancellationToken cancellationToken = default)
55+
: this(initiator, replyCode, replyText, 0, 0, cause, cancellationToken: cancellationToken)
5356
{
5457
}
5558

5659
/// <summary>
5760
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
5861
/// </summary>
5962
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
60-
ushort classId, ushort methodId, object? cause = null)
63+
ushort classId, ushort methodId, object? cause = null, CancellationToken cancellationToken = default)
64+
: base(cancellationToken)
6165
{
6266
Initiator = initiator;
6367
ReplyCode = replyCode;
@@ -70,8 +74,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
7074
/// <summary>
7175
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
7276
/// </summary>
73-
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception)
74-
: this(initiator, replyCode, replyText, 0, 0)
77+
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
78+
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
7579
{
7680
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
7781
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Threading;
33+
34+
namespace RabbitMQ.Client.Events
35+
{
36+
/// <summary>
37+
/// Provides data for <see cref="AsyncEventHandler{T}"/>
38+
/// events that can be invoked asynchronously.
39+
/// </summary>
40+
public class AsyncEventArgs
41+
{
42+
/// <summary>
43+
/// Initializes a new instance of the <see cref="AsyncEventArgs"/>
44+
/// class.
45+
/// </summary>
46+
/// <param name="cancellationToken">
47+
/// A cancellation token related to the original operation that raised
48+
/// the event. It's important for your handler to pass this token
49+
/// along to any asynchronous or long-running synchronous operations
50+
/// that take a token so cancellation will correctly propagate. The
51+
/// default value is <see cref="CancellationToken.None"/>.
52+
/// </param>
53+
public AsyncEventArgs(CancellationToken cancellationToken = default)
54+
: base()
55+
{
56+
CancellationToken = cancellationToken;
57+
}
58+
59+
/// <summary>
60+
/// Gets a cancellation token related to the original operation that
61+
/// raised the event. It's important for your handler to pass this
62+
/// token along to any asynchronous or long-running synchronous
63+
/// operations that take a token so cancellation (via something like
64+
/// <code>
65+
/// new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
66+
/// </code>
67+
/// for example) will correctly propagate.
68+
/// </summary>
69+
public CancellationToken CancellationToken { get; }
70+
71+
public static AsyncEventArgs CreateOrDefault(CancellationToken cancellationToken)
72+
{
73+
if (cancellationToken.CanBeCanceled)
74+
{
75+
return new AsyncEventArgs(cancellationToken);
76+
}
77+
78+
return Empty;
79+
}
80+
81+
/// <summary>
82+
/// Provides a value to use with events that do not have event data.
83+
/// </summary>
84+
public static readonly AsyncEventArgs Empty = new AsyncEventArgs();
85+
}
86+
}

projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@
3333

3434
namespace RabbitMQ.Client.Events
3535
{
36-
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event);
36+
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : AsyncEventArgs;
3737
}

0 commit comments

Comments
 (0)