Skip to content

Improve AsyncManualResetEvent implementation to address races #1843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 15 additions & 98 deletions projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,127 +29,44 @@
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace RabbitMQ.Client.Impl
{
sealed class AsyncManualResetEvent : IValueTaskSource
sealed class AsyncManualResetEvent
{
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
private bool _isSet;
volatile TaskCompletionSource<bool> _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

public AsyncManualResetEvent(bool initialState = false)
{
_isSet = initialState;
_valueTaskSource.Reset();
if (initialState)
{
_valueTaskSource.SetResult(true);
_taskCompletionSource.SetResult(true);
}
}

public bool IsSet => Volatile.Read(ref _isSet);
public bool IsSet => _taskCompletionSource.Task.IsCompleted;

public async ValueTask WaitAsync(CancellationToken cancellationToken)
public Task WaitAsync(CancellationToken cancellationToken = default)
{
if (IsSet)
{
return;
}

cancellationToken.ThrowIfCancellationRequested();

CancellationTokenRegistration tokenRegistration =
#if NET
cancellationToken.UnsafeRegister(
static state =>
{
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
source.SetException(new OperationCanceledException(token));
}, (_valueTaskSource, cancellationToken));
#else
cancellationToken.Register(
static state =>
{
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
source.SetException(new OperationCanceledException(token));
},
state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false);
#endif
try
{
await new ValueTask(this, _valueTaskSource.Version)
.ConfigureAwait(false);
}
finally
{
#if NET
await tokenRegistration.DisposeAsync()
.ConfigureAwait(false);
#else
tokenRegistration.Dispose();
#endif
}
Task<bool> task = _taskCompletionSource.Task;
return task.IsCompleted ? task : task.WaitAsync(cancellationToken);
}

public void Set()
{
if (IsSet)
{
return;
}

Volatile.Write(ref _isSet, true);
_valueTaskSource.SetResult(true);
}
public void Set() => _taskCompletionSource.TrySetResult(true);

public void Reset()
{
if (!IsSet)
while (true)
{
return;
TaskCompletionSource<bool> currentTcs = _taskCompletionSource;
if (!currentTcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _taskCompletionSource, new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), currentTcs) == currentTcs)
{
return;
}
}

Volatile.Write(ref _isSet, false);
_valueTaskSource.Reset();
}

void IValueTaskSource.GetResult(short token)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

_valueTaskSource.GetResult(token);
}

ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

return _valueTaskSource.GetStatus(token);
}

void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

_valueTaskSource.OnCompleted(continuation, state, token, flags);
}

[DoesNotReturn]
static void ThrowIncorrectTokenException() =>
throw new InvalidOperationException("ValueTask cannot be awaited multiple times.");
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
private Task MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
{
if (_flowControlBlock.IsSet)
{
return default;
return Task.CompletedTask;
}

return _flowControlBlock.WaitAsync(cancellationToken);
Expand Down