-
Notifications
You must be signed in to change notification settings - Fork 2
Support ServiceBusProcessor #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tomas-pajurek
merged 16 commits into
spotflow-io:main
from
altsak:feature/create-processor
Aug 23, 2025
Merged
Changes from 14 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
1650557
in memory processor implementation
altsak eb9e96f
remove base calls and fix tests
altsak c2a50f4
add regions removed catch and obsolete test
altsak 74876aa
Merge branch 'main' into feature/create-processor
altsak 77fa17a
fix wrong constructor call
altsak 4d6d8e7
use azure sdk auto-generated identifier
altsak 8d7f0c5
change receiver construction
altsak 27edde8
remove potentially blocking operations
altsak 9294560
add tests
altsak 0ac09a6
Merge branch 'main' into feature/create-processor
tomas-pajurek 0353d8a
format files
altsak 52760bc
suppress operationcancelled exception during shutdown
altsak 235fe64
move generateidentifier to utils and rename
altsak 27c1890
Merge branch 'main' into feature/create-processor
tomas-pajurek a38fac2
move operationcancelledexception to outer scope
altsak 1d6323a
use TrySetResult in servicebus tests
altsak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
314 changes: 314 additions & 0 deletions
314
src/Spotflow.InMemory.Azure.ServiceBus/InMemoryServiceBusProcessor.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
using Azure.Messaging.ServiceBus; | ||
|
||
using Spotflow.InMemory.Azure.ServiceBus.Internals; | ||
using Spotflow.InMemory.Azure.ServiceBus.Resources; | ||
|
||
namespace Spotflow.InMemory.Azure.ServiceBus; | ||
|
||
public class InMemoryServiceBusProcessor : ServiceBusProcessor | ||
{ | ||
private readonly TimeSpan _defaultMaxWaitTime; | ||
private readonly SemaphoreSlim _concurrencySemaphore; | ||
private readonly SemaphoreSlim _stateSemaphore = new(1, 1); | ||
private readonly InMemoryServiceBusReceiver _receiver; | ||
private readonly bool _autoCompleteMessages; | ||
private readonly string _entityPath; | ||
|
||
private volatile bool _isClosed; | ||
private volatile bool _isProcessing; | ||
private CancellationTokenSource? _processingCts; | ||
private Task? _processingTask; | ||
|
||
private readonly string _identifier; | ||
private readonly string _fullyQualifiedNamespace; | ||
private readonly ServiceBusReceiveMode _receiveMode; | ||
private readonly int _prefetchCount; | ||
private readonly int _maxConcurrentCalls; | ||
private readonly TimeSpan _maxAutoLockRenewalDuration; | ||
|
||
#region Constructors | ||
public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string queueName) | ||
: this(client, queueName, new ServiceBusProcessorOptions()) { } | ||
|
||
public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string queueName, ServiceBusProcessorOptions options) | ||
: this(client, queueName, options, | ||
(receiverOptions, c) | ||
=> new InMemoryServiceBusReceiver(c, queueName, receiverOptions)) | ||
{ } | ||
|
||
public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string topicName, string subscriptionName) | ||
: this(client, topicName, subscriptionName, new ServiceBusProcessorOptions()) { } | ||
|
||
public InMemoryServiceBusProcessor(InMemoryServiceBusClient client, string topicName, string subscriptionName, ServiceBusProcessorOptions options) | ||
: this(client, FormatEntityPath(topicName, subscriptionName), options, | ||
(receiverOptions, c) | ||
=> new InMemoryServiceBusReceiver(c, topicName, subscriptionName, receiverOptions)) | ||
{ } | ||
|
||
private InMemoryServiceBusProcessor( | ||
InMemoryServiceBusClient client, | ||
string entityPath, | ||
ServiceBusProcessorOptions options, | ||
Func<ServiceBusReceiverOptions, InMemoryServiceBusClient, InMemoryServiceBusReceiver> receiverFactory) | ||
{ | ||
_fullyQualifiedNamespace = client.FullyQualifiedNamespace; | ||
_identifier = string.IsNullOrEmpty(options.Identifier) ? ServiceBusClientUtils.GenerateIdentifier(entityPath) : options.Identifier; | ||
_entityPath = entityPath; | ||
_defaultMaxWaitTime = client.DefaultMaxWaitTime; | ||
_autoCompleteMessages = options.AutoCompleteMessages; | ||
_receiveMode = options.ReceiveMode; | ||
_prefetchCount = options.PrefetchCount; | ||
_maxConcurrentCalls = options.MaxConcurrentCalls; | ||
_maxAutoLockRenewalDuration = options.MaxAutoLockRenewalDuration; | ||
var receiverOptions = CreateReceiverOptions(options, _identifier); | ||
_receiver = receiverFactory(receiverOptions, client); | ||
Provider = client.Provider; | ||
_concurrencySemaphore = new SemaphoreSlim(_maxConcurrentCalls, _maxConcurrentCalls); | ||
} | ||
|
||
private static ServiceBusReceiverOptions CreateReceiverOptions(ServiceBusProcessorOptions options, string identifier) | ||
=> new() | ||
{ | ||
ReceiveMode = options.ReceiveMode, | ||
PrefetchCount = options.PrefetchCount, | ||
Identifier = $"{identifier}-receiver" | ||
}; | ||
|
||
|
||
private static string FormatEntityPath(string topicName, string subscriptionName) | ||
=> InMemoryServiceBusSubscription.FormatEntityPath(topicName, subscriptionName); | ||
|
||
public static InMemoryServiceBusProcessor FromQueue(InMemoryServiceBusQueue queue, ServiceBusProcessorOptions? options = null) | ||
{ | ||
var client = InMemoryServiceBusClient.FromNamespace(queue.Namespace); | ||
return new InMemoryServiceBusProcessor(client, queue.QueueName, options ?? new ServiceBusProcessorOptions()); | ||
} | ||
|
||
public static InMemoryServiceBusProcessor FromSubscription(InMemoryServiceBusSubscription subscription, ServiceBusProcessorOptions? options = null) | ||
{ | ||
var client = InMemoryServiceBusClient.FromNamespace(subscription.Topic.Namespace); | ||
return new InMemoryServiceBusProcessor(client, subscription.TopicName, subscription.SubscriptionName, options ?? new ServiceBusProcessorOptions()); | ||
} | ||
#endregion | ||
|
||
#region Properties | ||
public override bool AutoCompleteMessages => _autoCompleteMessages; | ||
public override string FullyQualifiedNamespace => _fullyQualifiedNamespace; | ||
public override string EntityPath => _entityPath; | ||
public override string Identifier => _identifier; | ||
public override ServiceBusReceiveMode ReceiveMode => _receiveMode; | ||
public override int PrefetchCount => _prefetchCount; | ||
public override int MaxConcurrentCalls => _maxConcurrentCalls; | ||
public override TimeSpan MaxAutoLockRenewalDuration => _maxAutoLockRenewalDuration; | ||
public InMemoryServiceBusProvider Provider { get; } | ||
public override bool IsClosed => _isClosed; | ||
public override bool IsProcessing => _isProcessing; | ||
#endregion | ||
|
||
#region Close | ||
public override async Task CloseAsync(CancellationToken cancellationToken = default) | ||
{ | ||
await Task.Yield(); | ||
await _stateSemaphore.WaitAsync(cancellationToken); | ||
try | ||
{ | ||
_isClosed = true; | ||
if (_isProcessing) | ||
{ | ||
await StopProcessingUnsafeAsync(); | ||
} | ||
} | ||
finally | ||
{ | ||
_stateSemaphore.Release(); | ||
} | ||
_concurrencySemaphore.Dispose(); | ||
_stateSemaphore.Dispose(); | ||
_processingCts?.Dispose(); | ||
await _receiver.DisposeAsync(); | ||
} | ||
#endregion | ||
|
||
#region Start/Stop Processing | ||
public override async Task StartProcessingAsync(CancellationToken cancellationToken = default) | ||
{ | ||
await Task.Yield(); | ||
|
||
await _stateSemaphore.WaitAsync(cancellationToken); | ||
try | ||
{ | ||
ObjectDisposedException.ThrowIf(_isClosed, nameof(InMemoryServiceBusProcessor)); | ||
|
||
if (_isProcessing) | ||
{ | ||
throw new InvalidOperationException("The processor is already processing messages."); | ||
} | ||
|
||
_isProcessing = true; | ||
_processingCts = new CancellationTokenSource(); | ||
_processingTask = Task.Run(() => ProcessMessagesInBackground(_processingCts.Token), cancellationToken); | ||
} | ||
finally | ||
{ | ||
_stateSemaphore.Release(); | ||
} | ||
} | ||
|
||
public override async Task StopProcessingAsync(CancellationToken cancellationToken = default) | ||
{ | ||
await Task.Yield(); | ||
|
||
await _stateSemaphore.WaitAsync(cancellationToken); | ||
try | ||
{ | ||
await StopProcessingUnsafeAsync(); | ||
} | ||
finally | ||
{ | ||
_stateSemaphore.Release(); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// StopProcessingUnsafeAsync is used to avoid deadlock between <see cref="CloseAsync"/> and <see cref="StopProcessingAsync"/> | ||
/// </summary> | ||
private async Task StopProcessingUnsafeAsync() | ||
{ | ||
if (!_isProcessing) | ||
{ | ||
return; | ||
} | ||
|
||
_processingCts?.Cancel(); | ||
|
||
if (_processingTask != null) | ||
{ | ||
try | ||
{ | ||
await _processingTask; | ||
} | ||
finally | ||
{ | ||
_processingTask.Dispose(); | ||
_processingTask = null; | ||
} | ||
} | ||
|
||
_isProcessing = false; | ||
} | ||
|
||
private async Task ProcessMessagesInBackground(CancellationToken cancellationToken) | ||
{ | ||
var activeTasks = new List<Task>(); | ||
|
||
try | ||
{ | ||
while (!cancellationToken.IsCancellationRequested && !_isClosed) | ||
{ | ||
try | ||
{ | ||
var messages = await _receiver.ReceiveMessagesAsync(MaxConcurrentCalls, _defaultMaxWaitTime, cancellationToken); | ||
|
||
if (messages.Count == 0) | ||
{ | ||
continue; | ||
} | ||
|
||
foreach (var message in messages) | ||
{ | ||
await _concurrencySemaphore.WaitAsync(cancellationToken); | ||
Task messageTask; | ||
try | ||
{ | ||
messageTask = Task.Run(() => ProcessSingleMessageAsync(message, cancellationToken), cancellationToken); | ||
} | ||
catch | ||
{ | ||
_concurrencySemaphore.Release(); | ||
throw; | ||
} | ||
|
||
activeTasks.Add(messageTask); | ||
|
||
if (activeTasks.Count > MaxConcurrentCalls) | ||
{ | ||
activeTasks.RemoveAll(t => t.IsCompleted); | ||
} | ||
} | ||
} | ||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
{ | ||
break; | ||
} | ||
catch (Exception ex) | ||
{ | ||
await HandleErrorAsync(ex, cancellationToken); | ||
} | ||
} | ||
} | ||
finally | ||
{ | ||
if (activeTasks.Count > 0) | ||
{ | ||
await Task.WhenAll(activeTasks); | ||
} | ||
} | ||
} | ||
|
||
private async Task ProcessSingleMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken) | ||
{ | ||
try | ||
{ | ||
var processMessageEventArgs = new ProcessMessageEventArgs( | ||
message, | ||
_receiver, | ||
Identifier, | ||
cancellationToken); | ||
|
||
await OnProcessMessageAsync(processMessageEventArgs); | ||
if (AutoCompleteMessages) | ||
{ | ||
await _receiver.CompleteMessageAsync(message, cancellationToken); | ||
} | ||
|
||
} | ||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
{ | ||
// Suppress OperationCanceledException to prevent it from interrupting processor shutdown | ||
return; | ||
} | ||
catch (Exception ex) | ||
{ | ||
await _receiver.AbandonMessageAsync(message, cancellationToken: cancellationToken); | ||
tomas-pajurek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
await HandleErrorAsync(ex, cancellationToken); | ||
} | ||
finally | ||
{ | ||
_concurrencySemaphore.Release(); | ||
} | ||
} | ||
|
||
private async Task HandleErrorAsync(Exception exception, CancellationToken cancellationToken) | ||
{ | ||
try | ||
{ | ||
var errorArgs = new ProcessErrorEventArgs( | ||
exception, | ||
ServiceBusErrorSource.Receive, | ||
FullyQualifiedNamespace, | ||
EntityPath, | ||
Identifier, | ||
cancellationToken); | ||
|
||
await OnProcessErrorAsync(errorArgs); | ||
} | ||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
{ | ||
// Suppress OperationCanceledException to prevent it from interrupting processor shutdown | ||
return; | ||
} | ||
} | ||
#endregion | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.