-
-
Notifications
You must be signed in to change notification settings - Fork 51
Message Storage
Messaging is intended for message passing between one or more systems in disconnected fashion. You can send a message somewhere and current or remote system picks it up for processing later when required. This paradigm somehow fits into CQRS and Message Passing architectural ideas.
To name a few examples, Apache Kafka, RabbitMQ, Azure Service Bus are all falling into this category - essentially they are designed to pass messages. Some systems are more advanced to others of course, but most often it doesn't really matter.
FluentStorage supports many messaging providers out of the box, including Azure Service Bus Topics and Queues, Azure Event Hub and others.
There are two abstractions available - message publisher and message receiver. As the name stands, one is publishing messages, and another is receiving them on another end.
To publish messages you will usually construct an instance of IMessagePublisher
with an appropriate implementation. All the available implementations can be created using factory methods in the FluentStorage.StorageFactory.Messages
class. More methods appear in that class as you reference an assembly containing specific implementations.
Similarly, to receive messages you can use factory methods to create receivers which all implement IMessageReceiver
interface.
The primary method of this interface
Task StartMessagePumpAsync(
Func<IEnumerable<QueueMessage>, Task> onMessageAsync,
int maxBatchSize = 1,
CancellationToken cancellationToken = default);
starts a message pump that listens for incoming queue messages and calls Func<IEnumerable<QueueMessage>, Task>
as a call back to pass those messages to your code.
maxBatchSize
is a number specifying how many messages you are ready to handle at once in your callback. Choose this number carefully as specifying number too low will result in slower message processing, whereas number too large will increase RAM requirements for your software.
cancellationToken
is used to signal the message pump to stop. Not passing any parameter there will result in never stopping message pump. See example below in Use Cases for a pattern on how to use this parameter.
You can find the list of supported messaging implementations here.
FluentStorage provides built-in capability to handle large message content by allowing you to offload message content over a certain threshold to an external blob storage. It works in the following way:
- Check that message content is larger than
threshold value
. - If not, do the usual processing.
- If it is, upload message content as a blob to external storage, clear message content and add a custom header
x-sn-large
that points to the blob containing message content.
When receiving messages, it will check that x-sn-large
header is present, and if so, will download blob, set it's content as message content, and return the message to the receiver.
Blob is deleted from the blob storage only when message is confirmed by the receiver.
Large message handling works on any supported queue implementation because it's implemented in the core library itself, outside of specific queue implementation. To enable it, call .HandleLargeContent
on both publisher and receiver:
IBlobStorage offloadStorage = ...; // your blob storage for offloading content
IMessagePublisher publisher = StorageFactory.Messages
.XXXPublisher(...)
.HandleLargeContent(offloadStorage, thresholdValue);
IMessageReceiver receiver = StorageFactory.Messages
.XXXReceiver(...)
.HandleLargeContent(offloadStorage);
QueueMessage
class itself is not a serialisable entity when we talk about JSON or built-in .NET binary serialisation due to the fact it is a functionally rich structure. However, you might want to transfer the whole QueueMessage
across the wire sometimes. For these purposes you can use built-in binary methods:
var qm = new QueueMessage("id", "content");
qm.DequeueCount = 4;
qm.Properties.Add("key", "value");
byte[] wireData = qm.ToByteArray();
//transfer the bytes
QueueMessage receivedMessage = QueueMessage.FromByteArray(wireData);
These methods make sure that all of the message data is preserved, and also are backward compatible between any changes to this class.
In-memory provider creates messaging queue directly in memory, and is useful for mocking message publisher and receiver. It's not intended to be used in production.
The provider is built into FluentStorage main package.
To construct, use:
IMessagePublisher publisher = StorageFactory.Messages.InMemoryPublisher(name);
IMessageReceiver receiver = StorageFactory.Messages.InMemoryReceiver(name);
name
in this case is a string that indicates an instance of the queue. Same name points to the same queue, therefore in order to receive messages from a queue with a name, you need to send messages to the queue with the same name.
To construct from a connection string, use:
IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("inmemory://name=the_name");
IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("inmemory://name=the_name");
Local disk messaging is backed by a local folder on disk. Every message publish call creates a new file in that folder with .snm
extension (Storage Net Message) which is a binary representation of the message.
Message receiver polls this folder every second to check for new files, get the oldest ones and transforms into QueueMessage
.
The provider is built into FluentStorage main package.
To construct, use:
IMessagePublisher publisher = StorageFactory.Messages.DirectoryFilesPublisher(path);
IMessageReceiver receiver = StorageFactory.Messages.DirectoryFilesReceiver(path);
path
is the path to the storage directory. It doesn't have to exist at the moment of construction, and will be created automagically.
To construct from a connection string, use:
IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("disk://path=the_path");
IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("disk://path=the_path");
IMessagePublisher publisher = StorageFactory.Messages.AzureStorageQueuePublisher();
IMessageReceiver receiver = StorageFactory.Messages.AzureStorageQueueReceiver();
IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("azure.queue://account=...;key=...;queue=...");
IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("azure.queue://account=..;key=...;queue=...");
In order to use Microsoft Azure Service Bus you need to reference
first. The provider wraps around the standard Microsoft Service Bus SDK.
This provider supports both topics and queues, for publishing and receiving. To construct a publisher use the following:
IMessagePublisher queuePublisher = StorageFactory.Messages.AzureServiceBusQueuePublisher(
connectionString,
queueName);
IMessagePublisher topicPublisher = StorageFactory.Messages.AzureServiceBusTopicPublisher(
connectionString,
topicName);
To construct a receiver, use the following:
IMessageReceiver queueReceiver = StorageFactory.Messages.AzureServiceBusQueueReceiver(
connectionString,
queueName,
peekLock);
IMessageReceiver topicReceiver = StorageFactory.Messages.AzureServiceBusTopicReceiver(
connectionString,
topicName,
subscriptionName,
peekLock);
peekLock is a flag indicating how to receive the message, is optional, and is true
by default.
You also have an options to pass your own MessageHandlerOptions to customise service bus behavior. For instance, if you need to define an exception handler that doesn't ignore errors (default behavior) and set for instance concurrency to more than 1 message (default) you could write the following:
var options = new MessageHandlerOptions(tExceptionReceiverHandler)
{
AutoComplete = false,
MaxAutoRenewDuration = TimeSpan.FromMinutes(1),
MaxConcurrentCalls = 5 //instead of 1
};
private async Task ExceptionReceiverHandler(ExceptionReceivedEventArgs args)
{
// your exception handling code
}
IMessageReceiver topicReceiver = StorageFactory.Messages.AzureServiceBusTopicReceiver(
connectionString,
topicName,
subscriptionName,
peekLock,
options);
Note that exception handler in Service Bus is for informational purposes only, it doesn't actually handle exceptions, and in case of errors the SDK retries them automatically.
In order to use AWS Simple Queue Service you need to reference
first. The provider wraps around the standard AWS SDK.
To construct a publisher use the following:
IMessagePublisher queuePublisher = StorageFactory.Messages.AmazonSQSMessagePublisher(
accessKeyId,
secretAccessKey,
serviceUrl,
queueName,
regionEndpoint);
IMessagePublisher topicPublisher = StorageFactory.Messages.AmazonSQSMessageReceiver(
accessKeyId,
secretAccessKey,
serviceUrl,
queueName,
regionEndpoint);
- accessKeyId and *secretAccessKey) are credentials to access the queue.
-
serviceUrl indicates the service URL, for instance
https://sqs.us-east-1.amazonaws.com
- queueName is the name of the queue
-
retionEndpoint is optional and defaults to
USEast1