diff --git a/projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs index b85fe6797..5c64dea3d 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs @@ -29,127 +29,44 @@ // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- -using System; -using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Sources; namespace RabbitMQ.Client.Impl { - sealed class AsyncManualResetEvent : IValueTaskSource + sealed class AsyncManualResetEvent { - private ManualResetValueTaskSourceCore _valueTaskSource; - private bool _isSet; + volatile TaskCompletionSource _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously); public AsyncManualResetEvent(bool initialState = false) { - _isSet = initialState; - _valueTaskSource.Reset(); if (initialState) { - _valueTaskSource.SetResult(true); + _taskCompletionSource.SetResult(true); } } - public bool IsSet => Volatile.Read(ref _isSet); + public bool IsSet => _taskCompletionSource.Task.IsCompleted; - public async ValueTask WaitAsync(CancellationToken cancellationToken) + public Task WaitAsync(CancellationToken cancellationToken = default) { - if (IsSet) - { - return; - } - - cancellationToken.ThrowIfCancellationRequested(); - - CancellationTokenRegistration tokenRegistration = -#if NET - cancellationToken.UnsafeRegister( - static state => - { - var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; - source.SetException(new OperationCanceledException(token)); - }, (_valueTaskSource, cancellationToken)); -#else - cancellationToken.Register( - static state => - { - var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; - source.SetException(new OperationCanceledException(token)); - }, - state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false); -#endif - try - { - await new ValueTask(this, _valueTaskSource.Version) - .ConfigureAwait(false); - } - finally - { -#if NET - await tokenRegistration.DisposeAsync() - .ConfigureAwait(false); -#else - tokenRegistration.Dispose(); -#endif - } + Task task = _taskCompletionSource.Task; + return task.IsCompleted ? task : task.WaitAsync(cancellationToken); } - public void Set() - { - if (IsSet) - { - return; - } - - Volatile.Write(ref _isSet, true); - _valueTaskSource.SetResult(true); - } + public void Set() => _taskCompletionSource.TrySetResult(true); public void Reset() { - if (!IsSet) + while (true) { - return; + TaskCompletionSource currentTcs = _taskCompletionSource; + if (!currentTcs.Task.IsCompleted || + Interlocked.CompareExchange(ref _taskCompletionSource, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), currentTcs) == currentTcs) + { + return; + } } - - Volatile.Write(ref _isSet, false); - _valueTaskSource.Reset(); } - - void IValueTaskSource.GetResult(short token) - { - if (token != _valueTaskSource.Version) - { - ThrowIncorrectTokenException(); - } - - _valueTaskSource.GetResult(token); - } - - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) - { - if (token != _valueTaskSource.Version) - { - ThrowIncorrectTokenException(); - } - - return _valueTaskSource.GetStatus(token); - } - - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) - { - if (token != _valueTaskSource.Version) - { - ThrowIncorrectTokenException(); - } - - _valueTaskSource.OnCompleted(continuation, state, token, flags); - } - - [DoesNotReturn] - static void ThrowIncorrectTokenException() => - throw new InvalidOperationException("ValueTask cannot be awaited multiple times."); } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 9ed4e0de1..c0d68f690 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -229,11 +229,11 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken) + private Task MaybeEnforceFlowControlAsync(CancellationToken cancellationToken) { if (_flowControlBlock.IsSet) { - return default; + return Task.CompletedTask; } return _flowControlBlock.WaitAsync(cancellationToken);