diff --git a/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusClient.cs b/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusClient.cs index f6dfd10..bc95a57 100644 --- a/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusClient.cs +++ b/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusClient.cs @@ -190,18 +190,25 @@ public override async Task AcceptSessionAsync(string } - #endregion - #region Unsupported + #region CreateProcessors + + public override InMemoryServiceBusProcessor CreateProcessor(string queueName) + => new(this, queueName); - public override ServiceBusProcessor CreateProcessor(string queueName) => throw ServiceBusExceptionFactory.MethodNotSupported(); + public override InMemoryServiceBusProcessor CreateProcessor(string queueName, ServiceBusProcessorOptions options) + => new(this, queueName, options); - public override ServiceBusProcessor CreateProcessor(string queueName, ServiceBusProcessorOptions options) => throw ServiceBusExceptionFactory.MethodNotSupported(); + public override InMemoryServiceBusProcessor CreateProcessor(string topicName, string subscriptionName) + => new(this, topicName, subscriptionName); - public override ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName) => throw ServiceBusExceptionFactory.MethodNotSupported(); + public override InMemoryServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options) + => new(this, topicName, subscriptionName, options); - public override ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options) => throw ServiceBusExceptionFactory.MethodNotSupported(); + #endregion + + #region Unsupported public override ServiceBusSessionProcessor CreateSessionProcessor(string queueName, ServiceBusSessionProcessorOptions? options = null) => throw ServiceBusExceptionFactory.MethodNotSupported(); diff --git a/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusProcessor.cs b/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusProcessor.cs new file mode 100644 index 0000000..5a4a77b --- /dev/null +++ b/src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusProcessor.cs @@ -0,0 +1,316 @@ +using Azure.Messaging.ServiceBus; + +using Spotflow.InMemory.Azure.ServiceBus.Internals; +using Spotflow.InMemory.Azure.ServiceBus.Resources; + +namespace Spotflow.InMemory.Azure.ServiceBus; + +public class InMemoryServiceBusProcessor : ServiceBusProcessor +{ + private readonly TimeSpan _defaultMaxWaitTime; + private readonly SemaphoreSlim _concurrencySemaphore; + private readonly SemaphoreSlim _stateSemaphore = new(1, 1); + private readonly InMemoryServiceBusReceiver _receiver; + private readonly bool _autoCompleteMessages; + private readonly string _entityPath; + + private volatile bool _isClosed; + private volatile bool _isProcessing; + private CancellationTokenSource? _processingCts; + private Task? _processingTask; + + private readonly string _identifier; + private readonly string _fullyQualifiedNamespace; + private readonly ServiceBusReceiveMode _receiveMode; + private readonly int _prefetchCount; + private readonly int _maxConcurrentCalls; + private readonly TimeSpan _maxAutoLockRenewalDuration; + + #region Constructors + public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string queueName) + : this(client, queueName, new ServiceBusProcessorOptions()) { } + + public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string queueName, ServiceBusProcessorOptions options) + : this(client, queueName, options, + (receiverOptions, c) + => new InMemoryServiceBusReceiver(c, queueName, receiverOptions)) + { } + + public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string topicName, string subscriptionName) + : this(client, topicName, subscriptionName, new ServiceBusProcessorOptions()) { } + + public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string topicName, string subscriptionName, ServiceBusProcessorOptions options) + : this(client, FormatEntityPath(topicName, subscriptionName), options, + (receiverOptions, c) + => new InMemoryServiceBusReceiver(c, topicName, subscriptionName, receiverOptions)) + { } + + private InMemoryServiceBusProcessor( + InMemoryServiceBusClient client, + string entityPath, + ServiceBusProcessorOptions options, + Func receiverFactory) + { + _fullyQualifiedNamespace = client.FullyQualifiedNamespace; + _identifier = string.IsNullOrEmpty(options.Identifier) ? ServiceBusClientUtils.GenerateIdentifier(entityPath) : options.Identifier; + _entityPath = entityPath; + _defaultMaxWaitTime = client.DefaultMaxWaitTime; + _autoCompleteMessages = options.AutoCompleteMessages; + _receiveMode = options.ReceiveMode; + _prefetchCount = options.PrefetchCount; + _maxConcurrentCalls = options.MaxConcurrentCalls; + _maxAutoLockRenewalDuration = options.MaxAutoLockRenewalDuration; + var receiverOptions = CreateReceiverOptions(options, _identifier); + _receiver = receiverFactory(receiverOptions, client); + Provider = client.Provider; + _concurrencySemaphore = new SemaphoreSlim(_maxConcurrentCalls, _maxConcurrentCalls); + } + + private static ServiceBusReceiverOptions CreateReceiverOptions(ServiceBusProcessorOptions options, string identifier) + => new() + { + ReceiveMode = options.ReceiveMode, + PrefetchCount = options.PrefetchCount, + Identifier = $"{identifier}-receiver" + }; + + + private static string FormatEntityPath(string topicName, string subscriptionName) + => InMemoryServiceBusSubscription.FormatEntityPath(topicName, subscriptionName); + + public static InMemoryServiceBusProcessor FromQueue(InMemoryServiceBusQueue queue, ServiceBusProcessorOptions? options = null) + { + var client = InMemoryServiceBusClient.FromNamespace(queue.Namespace); + return new InMemoryServiceBusProcessor(client, queue.QueueName, options ?? new ServiceBusProcessorOptions()); + } + + public static InMemoryServiceBusProcessor FromSubscription(InMemoryServiceBusSubscription subscription, ServiceBusProcessorOptions? options = null) + { + var client = InMemoryServiceBusClient.FromNamespace(subscription.Topic.Namespace); + return new InMemoryServiceBusProcessor(client, subscription.TopicName, subscription.SubscriptionName, options ?? new ServiceBusProcessorOptions()); + } + #endregion + + #region Properties + public override bool AutoCompleteMessages => _autoCompleteMessages; + public override string FullyQualifiedNamespace => _fullyQualifiedNamespace; + public override string EntityPath => _entityPath; + public override string Identifier => _identifier; + public override ServiceBusReceiveMode ReceiveMode => _receiveMode; + public override int PrefetchCount => _prefetchCount; + public override int MaxConcurrentCalls => _maxConcurrentCalls; + public override TimeSpan MaxAutoLockRenewalDuration => _maxAutoLockRenewalDuration; + public InMemoryServiceBusProvider Provider { get; } + public override bool IsClosed => _isClosed; + public override bool IsProcessing => _isProcessing; + #endregion + + #region Close + public override async Task CloseAsync(CancellationToken cancellationToken = default) + { + await Task.Yield(); + await _stateSemaphore.WaitAsync(cancellationToken); + try + { + _isClosed = true; + if (_isProcessing) + { + await StopProcessingUnsafeAsync(); + } + } + finally + { + _stateSemaphore.Release(); + } + _concurrencySemaphore.Dispose(); + _stateSemaphore.Dispose(); + _processingCts?.Dispose(); + await _receiver.DisposeAsync(); + } + #endregion + + #region Start/Stop Processing + public override async Task StartProcessingAsync(CancellationToken cancellationToken = default) + { + await Task.Yield(); + + await _stateSemaphore.WaitAsync(cancellationToken); + try + { + ObjectDisposedException.ThrowIf(_isClosed, nameof(InMemoryServiceBusProcessor)); + + if (_isProcessing) + { + throw new InvalidOperationException("The processor is already processing messages."); + } + + _isProcessing = true; + _processingCts = new CancellationTokenSource(); + _processingTask = Task.Run(() => ProcessMessagesInBackground(_processingCts.Token), cancellationToken); + } + finally + { + _stateSemaphore.Release(); + } + } + + public override async Task StopProcessingAsync(CancellationToken cancellationToken = default) + { + await Task.Yield(); + + await _stateSemaphore.WaitAsync(cancellationToken); + try + { + await StopProcessingUnsafeAsync(); + } + finally + { + _stateSemaphore.Release(); + } + } + + /// + /// StopProcessingUnsafeAsync is used to avoid deadlock between and + /// + private async Task StopProcessingUnsafeAsync() + { + if (!_isProcessing) + { + return; + } + + _processingCts?.Cancel(); + + if (_processingTask != null) + { + try + { + await _processingTask; + } + finally + { + _processingTask.Dispose(); + _processingTask = null; + } + } + + _isProcessing = false; + } + + private async Task ProcessMessagesInBackground(CancellationToken cancellationToken) + { + var activeTasks = new List(); + + try + { + while (!cancellationToken.IsCancellationRequested && !_isClosed) + { + try + { + var messages = await _receiver.ReceiveMessagesAsync(MaxConcurrentCalls, _defaultMaxWaitTime, cancellationToken); + + if (messages.Count == 0) + { + continue; + } + + foreach (var message in messages) + { + await _concurrencySemaphore.WaitAsync(cancellationToken); + Task messageTask; + try + { + messageTask = Task.Run(() => ProcessSingleMessageAsync(message, cancellationToken), cancellationToken); + } + catch + { + _concurrencySemaphore.Release(); + throw; + } + + activeTasks.Add(messageTask); + + if (activeTasks.Count > MaxConcurrentCalls) + { + activeTasks.RemoveAll(t => t.IsCompleted); + } + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + await HandleErrorAsync(ex, cancellationToken); + } + } + } + finally + { + if (activeTasks.Count > 0) + { + await Task.WhenAll(activeTasks); + } + } + } + + private async Task ProcessSingleMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken) + { + try + { + try + { + var processMessageEventArgs = new ProcessMessageEventArgs( + message, + _receiver, + Identifier, + cancellationToken); + + await OnProcessMessageAsync(processMessageEventArgs); + if (AutoCompleteMessages) + { + await _receiver.CompleteMessageAsync(message, cancellationToken); + } + + } + catch (Exception ex) + { + await _receiver.AbandonMessageAsync(message, cancellationToken: cancellationToken); + await HandleErrorAsync(ex, cancellationToken); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Suppress OperationCanceledException to prevent it from interrupting processor shutdown + return; + } + finally + { + _concurrencySemaphore.Release(); + } + } + + private async Task HandleErrorAsync(Exception exception, CancellationToken cancellationToken) + { + try + { + var errorArgs = new ProcessErrorEventArgs( + exception, + ServiceBusErrorSource.Receive, + FullyQualifiedNamespace, + EntityPath, + Identifier, + cancellationToken); + + await OnProcessErrorAsync(errorArgs); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Suppress OperationCanceledException to prevent it from interrupting processor shutdown + return; + } + } + #endregion + +} diff --git a/src/Spotflow.InMemory.Azure.ServiceBus/Internals/ServiceBusClientUtils.cs b/src/Spotflow.InMemory.Azure.ServiceBus/Internals/ServiceBusClientUtils.cs index 5b84fd7..31adf34 100644 --- a/src/Spotflow.InMemory.Azure.ServiceBus/Internals/ServiceBusClientUtils.cs +++ b/src/Spotflow.InMemory.Azure.ServiceBus/Internals/ServiceBusClientUtils.cs @@ -134,4 +134,5 @@ public static string GetFullyQualifiedNamespace(string connectionString) return ServiceBusConnectionStringUtils.GetFullyQualifiedNamespace(connectionString); } + public static string GenerateIdentifier(string entityPath) => $"{entityPath}-{Guid.NewGuid()}"; } diff --git a/tests/Tests/ServiceBus/ServiceBusProcessorTests.cs b/tests/Tests/ServiceBus/ServiceBusProcessorTests.cs new file mode 100644 index 0000000..d8c10b9 --- /dev/null +++ b/tests/Tests/ServiceBus/ServiceBusProcessorTests.cs @@ -0,0 +1,571 @@ +using System.Collections.Concurrent; + +using Azure.Messaging.ServiceBus; + +using Spotflow.InMemory.Azure.ServiceBus; + +namespace Tests.ServiceBus; + +[TestClass] +public class ServiceBusProcessorTests +{ + #region Initialization Tests + + [TestMethod] + public async Task Constructor_WithQueue_SetsPropertiesCorrectly() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + + const string queueName = "test-queue"; + var options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 5 }; + + await using var processor = new InMemoryServiceBusProcessor(client, queueName, options); + + processor.EntityPath.Should().Be(queueName); + processor.MaxConcurrentCalls.Should().Be(5); + processor.IsProcessing.Should().BeFalse(); + processor.IsClosed.Should().BeFalse(); + } + + [TestMethod] + public async Task Constructor_WithSubscription_SetsPropertiesCorrectly() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + + const string topicName = "test-topic"; + const string subscriptionName = "test-subscription"; + var options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 3 }; + + await using var processor = new InMemoryServiceBusProcessor(client, topicName, subscriptionName, options); + + processor.EntityPath.Should().Be($"{topicName}/subscriptions/{subscriptionName}"); + processor.MaxConcurrentCalls.Should().Be(3); + processor.IsProcessing.Should().BeFalse(); + processor.IsClosed.Should().BeFalse(); + } + + #endregion + + #region Property Tests + [TestMethod] + public async Task ProcessorOptions_AreSetCorrectly() + { + + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + + var options = new ServiceBusProcessorOptions + { + MaxConcurrentCalls = 5, + AutoCompleteMessages = false, + PrefetchCount = 10, + ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete + }; + + await using var processor = client.CreateProcessor("test-queue", options); + + processor.MaxConcurrentCalls.Should().Be(5); + processor.AutoCompleteMessages.Should().BeFalse(); + processor.PrefetchCount.Should().Be(10); + processor.ReceiveMode.Should().Be(ServiceBusReceiveMode.ReceiveAndDelete); + processor.EntityPath.Should().Be("test-queue"); + processor.FullyQualifiedNamespace.Should().Be(client.FullyQualifiedNamespace); + } + + #endregion + + #region Lifecycle Tests + + [TestMethod] + public async Task StartProcessingAsync_SetsIsProcessingToTrue() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + await processor.StartProcessingAsync(); + + processor.IsProcessing.Should().BeTrue(); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task StartProcessingAsync_WhenAlreadyProcessing_ThrowsInvalidOperationException() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + await processor.StartProcessingAsync(); + + await Assert.ThrowsExceptionAsync( + () => processor.StartProcessingAsync()); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task StopProcessingAsync_SetsIsProcessingToFalse() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + await processor.StartProcessingAsync(); + + await processor.StopProcessingAsync(); + + processor.IsProcessing.Should().BeFalse(); + } + + [TestMethod] + public async Task StopProcessingAsync_WhenNotProcessing_DoesNotThrow() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task CloseAsync_SetsIsClosedToTrue() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + await processor.CloseAsync(); + processor.IsClosed.Should().BeTrue(); + + } + + [TestMethod] + public async Task DisposeAsync_SetsIsClosedToTrue() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + await processor.DisposeAsync(); + processor.IsClosed.Should().BeTrue(); + + } + + [TestMethod] + public async Task CloseAsync_WhenProcessing_StopProcessingFirst() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + await processor.StartProcessingAsync(); + + await processor.CloseAsync(); + + processor.IsProcessing.Should().BeFalse(); + processor.IsClosed.Should().BeTrue(); + + } + + [TestMethod] + public async Task StartProcessingAsync_AfterClose_ThrowsObjectDisposedException() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await processor.CloseAsync(); + await Assert.ThrowsExceptionAsync( + () => processor.StartProcessingAsync()); + } + #endregion + + #region MessageProcessing Tests + + [TestMethod] + public async Task ProcessMessage_ReceivesAndProcessMessage() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + + var messageReceived = new TaskCompletionSource(); + processor.ProcessMessageAsync += args => + { + messageReceived.TrySetResult(args.Message); + return Task.CompletedTask; + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await using var sender = client.CreateSender("test-queue"); + await sender.SendMessageAsync(new ServiceBusMessage("Test Message")); + + await processor.StartProcessingAsync(); + var receivedMessage = await messageReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + receivedMessage.Body.ToString().Should().Be("Test Message"); + + await processor.StopProcessingAsync(); + + } + + [TestMethod] + public async Task ProcessMessage_WithAutoComplete_CompletesMessageAutomatically() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue", new ServiceBusProcessorOptions() + { + AutoCompleteMessages = true + }); + var messageProcessed = new TaskCompletionSource(); + processor.ProcessMessageAsync += _ => + { + messageProcessed.TrySetResult(true); + return Task.CompletedTask; + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + var sender = client.CreateSender("test-queue"); + await sender.SendMessageAsync(new ServiceBusMessage("Test Message")); + + await processor.StartProcessingAsync(); + await messageProcessed.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var isCompleted = await WaitForMessageCompletion(client, "test-queue", TimeSpan.FromSeconds(2)); + isCompleted.Should().BeTrue("message should be auto-completed and queue should be empty"); + + await processor.StopProcessingAsync(); + } + + + [TestMethod] + public async Task ProcessMessage_WithManualComplete_DoesNotAutoComplete() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue", new ServiceBusProcessorOptions + { + AutoCompleteMessages = false + }); + + var messageProcessed = new TaskCompletionSource(); + processor.ProcessMessageAsync += async args => + { + await args.CompleteMessageAsync(args.Message); + messageProcessed.TrySetResult(true); + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await using var sender = client.CreateSender("test-queue"); + await sender.SendMessageAsync(new ServiceBusMessage("Test Message")); + + await processor.StartProcessingAsync(); + await messageProcessed.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + var isCompleted = await WaitForMessageCompletion(client, "test-queue", TimeSpan.FromSeconds(2)); + isCompleted.Should().BeTrue("message should be manually completed and queue should be empty"); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task ProcessMessage_WithException_CallsErrorHandler() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + + var errorHandled = new TaskCompletionSource(); + processor.ProcessMessageAsync += _ => throw new InvalidOperationException("Test exception"); + processor.ProcessErrorAsync += args => + { + errorHandled.TrySetResult(args); + return Task.CompletedTask; + }; + + await using var sender = client.CreateSender("test-queue"); + await sender.SendMessageAsync(new ServiceBusMessage("Test Message")); + + await processor.StartProcessingAsync(); + var errorArgs = await errorHandled.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + errorArgs.Exception.Should().BeOfType(); + errorArgs.Exception.Message.Should().Be("Test exception"); + errorArgs.ErrorSource.Should().Be(ServiceBusErrorSource.Receive); + errorArgs.EntityPath.Should().Be("test-queue"); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task ProcessMessage_WithConcurrency_ProcessesMultipleMessagesSimultaneously() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue", new ServiceBusProcessorOptions + { + MaxConcurrentCalls = 3 + }); + + var processedMessages = new ConcurrentBag(); + var processingStarted = new TaskCompletionSource(); + var continueProcessing = new TaskCompletionSource(); + var messageCount = 0; + + processor.ProcessMessageAsync += async args => + { + processedMessages.Add(args.Message.Body.ToString()); + + if (Interlocked.Increment(ref messageCount) == 1) + { + processingStarted.TrySetResult(true); + } + await continueProcessing.Task; + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await using var sender = client.CreateSender("test-queue"); + for (var i = 0; i < 3; i++) + { + await sender.SendMessageAsync(new ServiceBusMessage($"Message {i}")); + } + + await processor.StartProcessingAsync(); + await processingStarted.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + continueProcessing.TrySetResult(true); + + await Task.Delay(1000); + + processedMessages.Should().HaveCount(3); + processedMessages.Should().Contain("Message 0"); + processedMessages.Should().Contain("Message 1"); + processedMessages.Should().Contain("Message 2"); + + await processor.StopProcessingAsync(); + } + + [TestMethod] + public async Task ProcessMessage_FromSubscription_ReceivesTopicMessage() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + var subscription = ns.AddTopic("test-topic").AddSubscription("test-subscription"); + + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor(subscription.TopicName, subscription.SubscriptionName); + + var messageReceived = new TaskCompletionSource(); + processor.ProcessMessageAsync += args => + { + messageReceived.TrySetResult(args.Message); + return Task.CompletedTask; + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await using var sender = client.CreateSender("test-topic"); + await sender.SendMessageAsync(new ServiceBusMessage("Topic message")); + + await processor.StartProcessingAsync(); + var receivedMessage = await messageReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + receivedMessage.Body.ToString().Should().Be("Topic message"); + + await processor.StopProcessingAsync(); + } + + #endregion + + #region Handlers + [TestMethod] + public async Task ProcessMessageAsync_AddSecondHandler_ThrowsNotSupportedException() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + processor.ProcessMessageAsync += _ => Task.CompletedTask; + + Assert.ThrowsException(() => + { + processor.ProcessMessageAsync += _ => Task.CompletedTask; + }); + await processor.CloseAsync(); + } + + [TestMethod] + public async Task RemoveHandler_ThenAddNew_ShouldWork() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + + Func handler1 = _ => Task.CompletedTask; + Func handler2 = _ => Task.CompletedTask; + + processor.ProcessMessageAsync += handler1; + processor.ProcessMessageAsync -= handler1; + processor.ProcessMessageAsync += handler2; + } + #endregion + + #region HelperMethods + private static async Task WaitForMessageCompletion(InMemoryServiceBusClient client, string queueName, TimeSpan timeout) + { + var deadline = DateTime.UtcNow.Add(timeout); + + while (DateTime.UtcNow < deadline) + { + await using var receiver = client.CreateReceiver(queueName); + var messages = await receiver.ReceiveMessagesAsync(1, TimeSpan.FromMilliseconds(100)); + + if (messages.Count == 0) + { + return true; + } + await Task.Delay(50); + } + return false; + } + #endregion + + [TestMethod] + public async Task ConcurrentStartStop_DoesNotCauseDeadlock() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + try + { + var random = new Random(); + + for (var i = 0; i < 5; i++) + { + await processor.StartProcessingAsync(); + await Task.Delay(random.Next(10, 50)); + await processor.StopProcessingAsync(); + await Task.Delay(random.Next(10, 50)); + } + + await processor.StartProcessingAsync(); + + var concurrentStartTasks = new List(); + for (var i = 0; i < 3; i++) + { + concurrentStartTasks.Add(Task.Run(async () => + { + try + { + await processor.StartProcessingAsync(); + } + catch (InvalidOperationException) + { + // Expected - already processing + } + })); + } + + await Task.WhenAll(concurrentStartTasks); + await processor.StopProcessingAsync(); + + processor.IsProcessing.Should().BeFalse(); + } + finally + { + await processor.CloseAsync(); + await client.DisposeAsync(); + } + } + + [TestMethod] + public async Task StopProcessing_WaitsForInFlightMessages() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + await using var processor = client.CreateProcessor("test-queue"); + + var messageStarted = new TaskCompletionSource(); + var messageCanComplete = new TaskCompletionSource(); + var messageCompleted = new TaskCompletionSource(); + + processor.ProcessMessageAsync += async _ => + { + messageStarted.TrySetResult(true); + await messageCanComplete.Task; // block here until it is ok to continue + messageCompleted.TrySetResult(true); + }; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + await using var sender = client.CreateSender("test-queue"); + await sender.SendMessageAsync(new ServiceBusMessage("Test")); + + await processor.StartProcessingAsync(); + await messageStarted.Task; + + // Start stopping while message is in flight + var stopTask = processor.StopProcessingAsync(); + + // Verify stop is waiting for message completion + await Task.Delay(100); + stopTask.IsCompleted.Should().BeFalse(); + + messageCanComplete.TrySetResult(true); + await messageCompleted.Task; + await stopTask; + processor.IsProcessing.Should().BeFalse(); + } + + [TestMethod] + public async Task ConcurrentCloseAndStart_HandlesGracefully() + { + var ns = new InMemoryServiceBusProvider().AddNamespace(); + ns.AddQueue("test-queue"); + await using var client = InMemoryServiceBusClient.FromNamespace(ns); + var processor = client.CreateProcessor("test-queue"); + + processor.ProcessMessageAsync += _ => Task.CompletedTask; + processor.ProcessErrorAsync += _ => Task.CompletedTask; + + var closeTask = Task.Run(() => processor.CloseAsync()); + var startTask = Task.Run(async () => + { + try + { + await processor.StartProcessingAsync(); + } + catch (ObjectDisposedException) + { + // Expected if close operation is successful + } + }); + + await Task.WhenAll(closeTask, startTask); + processor.IsClosed.Should().BeTrue(); + } +} +