diff --git a/.ci/ubuntu/gha-log-check.sh b/.ci/ubuntu/gha-log-check.sh index 8301df1f3c..9755e17199 100755 --- a/.ci/ubuntu/gha-log-check.sh +++ b/.ci/ubuntu/gha-log-check.sh @@ -11,6 +11,6 @@ declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" if docker logs "$rabbitmq_docker_name" | grep -iF inet_error then - echo '[ERROR] found inet_error in RabbitMQ logs' 1>&2 - exit 1 + echo '[WARNING] found inet_error in RabbitMQ logs' 1>&2 + exit 0 fi diff --git a/.ci/windows/gha-log-check.ps1 b/.ci/windows/gha-log-check.ps1 index eb9a8ce529..d618f43b11 100644 --- a/.ci/windows/gha-log-check.ps1 +++ b/.ci/windows/gha-log-check.ps1 @@ -8,6 +8,6 @@ Write-Host "[INFO] looking for errors in '$rabbitmq_log_dir'" If (Get-ChildItem $rabbitmq_log_dir\*.log | Select-String -Quiet -SimpleMatch -Pattern inet_error) { - Write-Error "[ERROR] found inet_error in '$rabbitmq_log_dir'" - exit 1 + Write-Error "[WARNING] found inet_error in '$rabbitmq_log_dir'" + exit 0 } diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs new file mode 100644 index 0000000000..a0a084d02c --- /dev/null +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -0,0 +1,224 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Util; + +namespace RabbitMQ.Client.Impl +{ + internal partial class Channel : IChannel, IRecoverable + { + public async ValueTask BasicPublishAsync(string exchange, string routingKey, + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + PublisherConfirmationInfo? publisherConfirmationInfo = null; + try + { + publisherConfirmationInfo = + await MaybeStartPublisherConfirmationTracking(cancellationToken) + .ConfigureAwait(false); + + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + + var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) + : default; + + ulong publishSequenceNumber = 0; + if (publisherConfirmationInfo is not null) + { + publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber; + } + + BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); + if (props is null) + { + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + .ConfigureAwait(false); + } + else + { + await ModelSendAsync(in cmd, in props, body, cancellationToken) + .ConfigureAwait(false); + } + } + catch (Exception ex) + { + bool exceptionWasHandled = + MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex); + if (!exceptionWasHandled) + { + throw; + } + } + finally + { + await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken) + .ConfigureAwait(false); + } + } + + public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + PublisherConfirmationInfo? publisherConfirmationInfo = null; + try + { + publisherConfirmationInfo = + await MaybeStartPublisherConfirmationTracking(cancellationToken) + .ConfigureAwait(false); + + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + + var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); + + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) + : default; + + ulong publishSequenceNumber = 0; + if (publisherConfirmationInfo is not null) + { + publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber; + } + + BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); + if (props is null) + { + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + .ConfigureAwait(false); + } + else + { + await ModelSendAsync(in cmd, in props, body, cancellationToken) + .ConfigureAwait(false); + } + } + catch (Exception ex) + { + bool exceptionWasHandled = + MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex); + if (!exceptionWasHandled) + { + throw; + } + } + finally + { + await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken) + .ConfigureAwait(false); + } + } + + private BasicProperties? PopulateBasicPropertiesHeaders(TProperties basicProperties, + Activity? sendActivity, ulong publishSequenceNumber) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + /* + * Note: there is nothing to do in this method if *both* of these + * conditions are true: + * + * sendActivity is null - there is no activity to add as a header + * publisher confirmations are NOT enabled + */ + if (sendActivity is null && !_publisherConfirmationsEnabled) + { + return null; + } + + bool newHeaders = false; + IDictionary? headers = basicProperties.Headers; + if (headers is null) + { + headers = new Dictionary(); + newHeaders = true; + } + MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity); + MaybeAddPublishSequenceNumberToHeaders(headers); + + switch (basicProperties) + { + case BasicProperties writableProperties: + if (newHeaders) + { + writableProperties.Headers = headers; + } + return null; + case EmptyBasicProperty: + return new BasicProperties { Headers = headers }; + default: + return new BasicProperties(basicProperties) { Headers = headers }; + } + + void MaybeAddActivityToHeaders(IDictionary headers, + string? correlationId, Activity? sendActivity) + { + if (sendActivity is not null) + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(correlationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId); + } + } + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + RabbitMQActivitySource.ContextInjector(sendActivity, headers); + } + } + + void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers) + { + if (_publisherConfirmationsEnabled) + { + byte[] publishSequenceNumberBytes = new byte[8]; + NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); + headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; + } + } + } + } +} diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs new file mode 100644 index 0000000000..f5617611ee --- /dev/null +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -0,0 +1,333 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Util; + +namespace RabbitMQ.Client.Impl +{ + internal partial class Channel : IChannel, IRecoverable + { + private bool _publisherConfirmationsEnabled = false; + private bool _publisherConfirmationTrackingEnabled = false; + private ulong _nextPublishSeqNo = 0; + private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); + private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); + + private class PublisherConfirmationInfo + { + private ulong _publishSequenceNumber; + private TaskCompletionSource? _publisherConfirmationTcs; + + internal PublisherConfirmationInfo() + { + _publishSequenceNumber = 0; + _publisherConfirmationTcs = null; + } + + internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource? publisherConfirmationTcs) + { + _publishSequenceNumber = publishSequenceNumber; + _publisherConfirmationTcs = publisherConfirmationTcs; + } + + internal ulong PublishSequenceNumber => _publishSequenceNumber; + + internal TaskCompletionSource? PublisherConfirmationTcs => _publisherConfirmationTcs; + + internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken) + { + if (_publisherConfirmationTcs is not null) + { + await _publisherConfirmationTcs.Task.WaitAsync(cancellationToken) + .ConfigureAwait(false); + } + } + + internal bool MaybeHandleException(Exception ex) + { + bool exceptionWasHandled = false; + + if (_publisherConfirmationTcs is not null) + { + _publisherConfirmationTcs.SetException(ex); + exceptionWasHandled = true; + } + + return exceptionWasHandled; + } + } + + public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) + { + if (_publisherConfirmationsEnabled) + { + await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return _nextPublishSeqNo; + } + finally + { + _confirmSemaphore.Release(); + } + } + else + { + return _nextPublishSeqNo; + } + } + + private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) + { + _publisherConfirmationsEnabled = publisherConfirmationsEnabled; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + } + + private async Task MaybeConfirmSelect(CancellationToken cancellationToken) + { + if (_publisherConfirmationsEnabled) + { + // NOTE: _rpcSemaphore is held + bool enqueued = false; + var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + try + { + if (_nextPublishSeqNo == 0UL) + { + if (_publisherConfirmationTrackingEnabled) + { + _confirmsTaskCompletionSources.Clear(); + } + _nextPublishSeqNo = 1; + } + + enqueued = Enqueue(k); + + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + + return; + } + finally + { + if (false == enqueued) + { + k.Dispose(); + } + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool ShouldHandleAckOrNack(ulong deliveryTag) + { + return _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && + deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void HandleAck(ulong deliveryTag, bool multiple) + { + if (ShouldHandleAckOrNack(deliveryTag)) + { + if (multiple) + { + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + { + if (pair.Key <= deliveryTag) + { + pair.Value.SetResult(true); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); + } + } + } + else + { + if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetResult(true); + } + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) + { + if (ShouldHandleAckOrNack(deliveryTag)) + { + if (multiple) + { + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + { + if (pair.Key <= deliveryTag) + { + pair.Value.SetException(new PublishException(pair.Key, isReturn)); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); + } + } + } + else + { + if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetException(new PublishException(deliveryTag, isReturn)); + } + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void HandleReturn(BasicReturnEventArgs basicReturnEvent) + { + if (_publisherConfirmationsEnabled) + { + ulong publishSequenceNumber = 0; + IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties; + object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader]; + if (maybeSeqNum != null && + maybeSeqNum is byte[] seqNumBytes) + { + publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes); + } + + HandleNack(publishSequenceNumber, multiple: false, isReturn: true); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason) + { + if (_publisherConfirmationsEnabled) + { + await _confirmSemaphore.WaitAsync(reason.CancellationToken) + .ConfigureAwait(false); + try + { + if (!_confirmsTaskCompletionSources.IsEmpty) + { + var exception = new AlreadyClosedException(reason); + foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) + { + confirmsTaskCompletionSource.TrySetException(exception); + } + + _confirmsTaskCompletionSources.Clear(); + } + } + finally + { + _confirmSemaphore.Release(); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async Task MaybeStartPublisherConfirmationTracking(CancellationToken cancellationToken) + { + if (_publisherConfirmationsEnabled) + { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + + ulong publishSequenceNumber = _nextPublishSeqNo; + + TaskCompletionSource? publisherConfirmationTcs = null; + if (_publisherConfirmationTrackingEnabled) + { + publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; + } + + _nextPublishSeqNo++; + + return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs); + } + else + { + return null; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConfirmationInfo? publisherConfirmationInfo, + Exception ex) + { + bool exceptionWasHandled = false; + + if (_publisherConfirmationsEnabled && + publisherConfirmationInfo is not null) + { + _nextPublishSeqNo--; + + if (_publisherConfirmationTrackingEnabled) + { + _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); + } + + exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex); + } + + return exceptionWasHandled; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo? publisherConfirmationInfo, + CancellationToken cancellationToken) + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + + if (publisherConfirmationInfo is not null) + { + await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) + .ConfigureAwait(false); + } + } + } + } +} diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index cce4129845..6ca0c9a93a 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -30,8 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Buffers.Binary; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -44,11 +42,10 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; -using RabbitMQ.Client.Util; namespace RabbitMQ.Client.Impl { - internal class Channel : IChannel, IRecoverable + internal partial class Channel : IChannel, IRecoverable { ///Only used to kick-start a connection open ///sequence. See @@ -60,12 +57,6 @@ internal class Channel : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); - private bool _publisherConfirmationsEnabled = false; - private bool _publisherConfirmationTrackingEnabled = false; - private ulong _nextPublishSeqNo = 0; - private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); - private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); - private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); @@ -374,8 +365,7 @@ internal async Task OpenAsync(bool publisherConfirmationsEnabled = fal bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default) { - _publisherConfirmationsEnabled = publisherConfirmationsEnabled; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -393,11 +383,8 @@ await ModelSendAsync(in method, k.CancellationToken) bool result = await k; Debug.Assert(result); - if (_publisherConfirmationsEnabled) - { - await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken) - .ConfigureAwait(false); - } + await MaybeConfirmSelect(cancellationToken) + .ConfigureAwait(false); } finally { @@ -494,31 +481,12 @@ internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) private async Task OnChannelShutdownAsync(ShutdownEventArgs reason) { _continuationQueue.HandleChannelShutdown(reason); + await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); - if (_publisherConfirmationsEnabled) - { - await _confirmSemaphore.WaitAsync(reason.CancellationToken) - .ConfigureAwait(false); - try - { - if (!_confirmsTaskCompletionSources.IsEmpty) - { - var exception = new AlreadyClosedException(reason); - foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) - { - confirmsTaskCompletionSource.TrySetException(exception); - } - - _confirmsTaskCompletionSources.Clear(); - } - } - finally - { - _confirmSemaphore.Release(); - } - } + await MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(reason) + .ConfigureAwait(false); _flowControlBlock.Set(); } @@ -613,8 +581,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken) - .ConfigureAwait(false); + HandleAck(ack._deliveryTag, ack._multiple); return true; } @@ -631,8 +598,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleNack(nack._deliveryTag, nack._multiple, false, cancellationToken) - .ConfigureAwait(false); + HandleNack(nack._deliveryTag, nack._multiple, false); return true; } @@ -651,19 +617,7 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e) .ConfigureAwait(false); } - if (_publisherConfirmationsEnabled) - { - ulong publishSequenceNumber = 0; - IReadOnlyBasicProperties props = e.BasicProperties; - object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader]; - if (maybeSeqNum != null) - { - publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); - } - - await HandleNack(publishSequenceNumber, multiple: false, isReturn: true, cancellationToken) - .ConfigureAwait(false); - } + HandleReturn(e); return true; } @@ -852,26 +806,6 @@ await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) return true; } - public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) - { - if (_publisherConfirmationsEnabled) - { - await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - return _nextPublishSeqNo; - } - finally - { - _confirmSemaphore.Release(); - } - } - else - { - return _nextPublishSeqNo; - } - } - public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken) { @@ -1009,168 +943,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async ValueTask BasicPublishAsync(string exchange, string routingKey, - bool mandatory, TProperties basicProperties, ReadOnlyMemory body, - CancellationToken cancellationToken = default) - where TProperties : IReadOnlyBasicProperties, IAmqpHeader - { - TaskCompletionSource? publisherConfirmationTcs = null; - ulong publishSequenceNumber = 0; - try - { - if (_publisherConfirmationsEnabled) - { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - - publishSequenceNumber = _nextPublishSeqNo; - - if (_publisherConfirmationTrackingEnabled) - { - publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; - } - - _nextPublishSeqNo++; - } - - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - - var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) - : default; - - BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); - if (props is null) - { - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) - .ConfigureAwait(false); - } - else - { - await ModelSendAsync(in cmd, in props, body, cancellationToken) - .ConfigureAwait(false); - } - } - catch (Exception ex) - { - if (_publisherConfirmationsEnabled) - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); - } - } - - if (publisherConfirmationTcs is not null) - { - publisherConfirmationTcs.SetException(ex); - } - else - { - throw; - } - } - finally - { - if (_publisherConfirmationsEnabled) - { - _confirmSemaphore.Release(); - } - } - - if (publisherConfirmationTcs is not null) - { - await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) - .ConfigureAwait(false); - } - } - - public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, - bool mandatory, TProperties basicProperties, ReadOnlyMemory body, - CancellationToken cancellationToken = default) - where TProperties : IReadOnlyBasicProperties, IAmqpHeader - { - TaskCompletionSource? publisherConfirmationTcs = null; - ulong publishSequenceNumber = 0; - try - { - if (_publisherConfirmationsEnabled) - { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - - publishSequenceNumber = _nextPublishSeqNo; - - if (_publisherConfirmationTrackingEnabled) - { - publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; - } - - _nextPublishSeqNo++; - } - - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - - var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) - : default; - - BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); - if (props is null) - { - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) - .ConfigureAwait(false); - } - else - { - await ModelSendAsync(in cmd, in props, body, cancellationToken) - .ConfigureAwait(false); - } - } - catch (Exception ex) - { - if (_publisherConfirmationsEnabled) - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); - } - } - - if (publisherConfirmationTcs is not null) - { - publisherConfirmationTcs.SetException(ex); - } - else - { - throw; - } - } - finally - { - if (_publisherConfirmationsEnabled) - { - _confirmSemaphore.Release(); - } - } - - if (publisherConfirmationTcs is not null) - { - await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) - .ConfigureAwait(false); - } - } - public async Task UpdateSecretAsync(string newSecret, string reason, CancellationToken cancellationToken) { @@ -1727,172 +1499,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - // NOTE: _rpcSemaphore is held - private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false, - CancellationToken cancellationToken = default) - { - _publisherConfirmationsEnabled = true; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd; - - bool enqueued = false; - var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - try - { - if (_nextPublishSeqNo == 0UL) - { - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.Clear(); - } - _nextPublishSeqNo = 1; - } - - enqueued = Enqueue(k); - - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); - - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - } - } - - private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) - { - if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) - { - if (multiple) - { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) - { - if (pair.Key <= deliveryTag) - { - pair.Value.SetResult(true); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); - } - } - } - else - { - if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) - { - tcs.SetResult(true); - } - } - } - - return Task.CompletedTask; - } - - private Task HandleNack(ulong deliveryTag, bool multiple, bool isReturn, - CancellationToken cancellationToken = default) - { - if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) - { - if (multiple) - { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) - { - if (pair.Key <= deliveryTag) - { - pair.Value.SetException(new PublishException(pair.Key, isReturn)); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); - } - } - } - else - { - if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) - { - tcs.SetException(new PublishException(deliveryTag, isReturn)); - } - } - } - - return Task.CompletedTask; - } - - private BasicProperties? PopulateBasicPropertiesHeaders(TProperties basicProperties, - Activity? sendActivity, ulong publishSequenceNumber) - where TProperties : IReadOnlyBasicProperties, IAmqpHeader - { - /* - * Note: there is nothing to do in this method if *both* of these - * conditions are true: - * - * sendActivity is null - there is no activity to add as a header - * publisher confirmations are NOT enabled - */ - if (sendActivity is null && !_publisherConfirmationsEnabled) - { - return null; - } - - bool newHeaders = false; - IDictionary? headers = basicProperties.Headers; - if (headers is null) - { - headers = new Dictionary(); - newHeaders = true; - } - MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity); - MaybeAddPublishSequenceNumberToHeaders(headers); - - switch (basicProperties) - { - case BasicProperties writableProperties: - if (newHeaders) - { - writableProperties.Headers = headers; - } - return null; - case EmptyBasicProperty: - return new BasicProperties { Headers = headers }; - default: - return new BasicProperties(basicProperties) { Headers = headers }; - } - - void MaybeAddActivityToHeaders(IDictionary headers, - string? correlationId, Activity? sendActivity) - { - if (sendActivity is not null) - { - // This activity is marked as recorded, so let's propagate the trace and span ids. - if (sendActivity.IsAllDataRequested) - { - if (!string.IsNullOrEmpty(correlationId)) - { - sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId); - } - } - - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - RabbitMQActivitySource.ContextInjector(sendActivity, headers); - } - } - - void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers) - { - if (_publisherConfirmationsEnabled) - { - byte[] publishSequenceNumberBytes = new byte[8]; - NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); - headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; - } - } - } - /// /// Returning true from this method means that the command was server-originated, /// and handled already.