Message Storage

Robin Rodricks edited this page Jan 18, 2023 · 8 revisions

Getting Started

Messaging is intended for passing messages between one or more systems in a disconnected fashion. You send a message somewhere, and the current or a remote system picks it up for processing later, when required. This paradigm fits CQRS and message-passing architectures.

To name a few examples, Apache Kafka, RabbitMQ and Azure Service Bus all fall into this category: essentially, they are designed to pass messages. Some systems are more advanced than others, of course, but most often it doesn't really matter.

FluentStorage supports many messaging providers out of the box, including Azure Service Bus Topics and Queues, Azure Event Hub and others.

There are two abstractions available: message publisher and message receiver. As the names suggest, one publishes messages and the other receives them on the other end.

Publishing Messages

To publish messages you will usually construct an instance of IMessagePublisher with an appropriate implementation. All the available implementations can be created using factory methods in the FluentStorage.StorageFactory.Messages class. More methods appear in that class as you reference an assembly containing specific implementations.

Receiving Messages

Similarly, to receive messages you can use factory methods to create receivers, all of which implement the IMessageReceiver interface.

The primary method of this interface

Task StartMessagePumpAsync(
	Func<IEnumerable<QueueMessage>, Task> onMessageAsync,
	int maxBatchSize = 1,
	CancellationToken cancellationToken = default);

starts a message pump that listens for incoming queue messages and invokes the onMessageAsync callback (a Func<IEnumerable<QueueMessage>, Task>) to pass those messages to your code.

maxBatchSize specifies how many messages you are ready to handle at once in your callback. Choose this number carefully: a value that is too low slows down message processing, whereas a value that is too high increases the RAM requirements of your software.

cancellationToken is used to signal the message pump to stop. If you don't pass one, the message pump never stops. See the example in Use Cases for a pattern showing how to use this parameter.
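A minimal sketch of one way to stop the pump with a cancellation token. To keep it compilable without the FluentStorage package, it declares a hypothetical StartPump<TMessage> delegate with the same shape as the StartMessagePumpAsync signature quoted above; PumpRunner and RunForAsync are illustrative names, not part of the library. Whether the returned task completes as soon as the pump has started or only once it stops is not stated here, so the sketch does not await it until after cancelling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical delegate with the same shape as StartMessagePumpAsync as quoted above.
public delegate Task StartPump<TMessage>(
    Func<IEnumerable<TMessage>, Task> onMessageAsync,
    int maxBatchSize,
    CancellationToken cancellationToken);

public static class PumpRunner
{
    // Starts the pump with a cancellable token, lets it run for 'runFor',
    // then signals it to stop.
    public static async Task RunForAsync<TMessage>(
        StartPump<TMessage> startPump,
        Func<IEnumerable<TMessage>, Task> onMessageAsync,
        TimeSpan runFor,
        int maxBatchSize = 10)
    {
        using var cts = new CancellationTokenSource();

        // Pass the token, otherwise there is no way to stop the pump later.
        Task pump = startPump(onMessageAsync, maxBatchSize, cts.Token);

        await Task.Delay(runFor);

        // Signal the pump to stop.
        cts.Cancel();

        try
        {
            // Works whether the task finished at startup or ends on cancellation.
            await pump;
        }
        catch (OperationCanceledException)
        {
            // Expected if the pump reports cancellation by throwing.
        }
    }
}
```

With a real receiver, the call might look like PumpRunner.RunForAsync<QueueMessage>(receiver.StartMessagePumpAsync, handler, TimeSpan.FromMinutes(5)), assuming the method has exactly the signature shown above.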

You can find the list of supported messaging implementations here.

Handling Large Messages

FluentStorage provides built-in capability to handle large message content by allowing you to offload message content over a certain threshold to an external blob storage. It works in the following way:

  1. Check whether the message content is larger than the threshold value.
  2. If not, process the message as usual.
  3. If it is, upload the message content as a blob to external storage, clear the message content, and add a custom header x-sn-large that points to the blob containing the content.

When receiving messages, it checks whether the x-sn-large header is present and, if so, downloads the blob, sets its content as the message content, and returns the message to the receiver.

The blob is deleted from blob storage only when the message is confirmed by the receiver.
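The flow above can be sketched in a few lines. This is an illustration only, not the library's implementation: SimpleMessage and LargeContentSketch are hypothetical names, a dictionary stands in for the external blob storage, and naming blobs with a GUID is an assumption. Only the x-sn-large header name comes from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal message type; FluentStorage's QueueMessage is richer.
public sealed class SimpleMessage
{
    public byte[] Content { get; set; } = Array.Empty<byte>();
    public Dictionary<string, string> Properties { get; } = new Dictionary<string, string>();
}

public static class LargeContentSketch
{
    // Header name taken from the description above.
    public const string LargeHeader = "x-sn-large";

    // Publisher side: offload content that is larger than the threshold.
    public static void BeforePublish(
        SimpleMessage message, IDictionary<string, byte[]> blobs, int thresholdBytes)
    {
        // Not larger than the threshold: leave the message untouched.
        if (message.Content.Length <= thresholdBytes) return;

        string blobId = Guid.NewGuid().ToString(); // blob naming is an assumption
        blobs[blobId] = message.Content;           // "upload" the content
        message.Content = Array.Empty<byte>();     // clear the message content
        message.Properties[LargeHeader] = blobId;  // point the header at the blob
    }

    // Receiver side: restore the content if the header is present.
    public static void AfterReceive(SimpleMessage message, IDictionary<string, byte[]> blobs)
    {
        if (message.Properties.TryGetValue(LargeHeader, out var blobId))
        {
            message.Content = blobs[blobId];
        }
    }

    // Called once the receiver confirms the message: only now delete the blob.
    public static void OnConfirmed(SimpleMessage message, IDictionary<string, byte[]> blobs)
    {
        if (message.Properties.TryGetValue(LargeHeader, out var blobId))
        {
            blobs.Remove(blobId);
        }
    }
}
```

Deleting the blob only on confirmation means that a message which is received but never confirmed can still be re-delivered with its content intact.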

Large message handling works on any supported queue implementation because it's implemented in the core library itself, outside of any specific queue implementation. To enable it, call .HandleLargeContent on both the publisher and the receiver:

IBlobStorage offloadStorage = ...; // your blob storage for offloading content

IMessagePublisher publisher = StorageFactory.Messages
  .XXXPublisher(...)
  .HandleLargeContent(offloadStorage, thresholdValue);

IMessageReceiver receiver = StorageFactory.Messages
  .XXXReceiver(...)
  .HandleLargeContent(offloadStorage);

Serialising/deserialising QueueMessage

The QueueMessage class itself is not serialisable with JSON or built-in .NET binary serialisation, because it is a functionally rich structure. However, you might sometimes want to transfer a whole QueueMessage across the wire. For this purpose you can use the built-in binary methods:

var qm = new QueueMessage("id", "content");
qm.DequeueCount = 4;
qm.Properties.Add("key", "value");

byte[] wireData = qm.ToByteArray();

//transfer the bytes

QueueMessage receivedMessage = QueueMessage.FromByteArray(wireData);

These methods make sure that all of the message data is preserved, and they remain backward compatible across changes to this class.

Providers

In-Memory

The in-memory provider creates a messaging queue directly in memory and is useful for mocking a message publisher and receiver. It's not intended to be used in production.

The provider is built into the FluentStorage main package.

To construct, use:

IMessagePublisher publisher = StorageFactory.Messages.InMemoryPublisher(name);

IMessageReceiver receiver = StorageFactory.Messages.InMemoryReceiver(name);

name in this case is a string that identifies an instance of the queue. The same name points to the same queue, so to receive messages from a named queue you need to send messages to a queue with the same name.

To construct from a connection string, use:

IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("inmemory://name=the_name");

IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("inmemory://name=the_name");

Local Disk

Local disk messaging is backed by a local folder on disk. Every message publish call creates a new file in that folder with the .snm extension (Storage Net Message), which is a binary representation of the message.

The message receiver polls this folder every second to check for new files, takes the oldest ones, and transforms them into QueueMessage instances.
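As a rough illustration of the "oldest files first" selection, a polling step could look like the sketch below. This is not the provider's actual code: DiskPollSketch and OldestMessageFiles are hypothetical names, and ordering by last write time is an assumption, since the text does not say how age is determined.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class DiskPollSketch
{
    // Returns the full paths of up to maxBatchSize *.snm files, oldest first.
    // Ordering by last write time is an assumption made for this sketch.
    public static IReadOnlyList<string> OldestMessageFiles(string directory, int maxBatchSize)
    {
        // Nothing to read if the folder has not been created yet.
        if (!Directory.Exists(directory)) return new List<string>();

        return new DirectoryInfo(directory)
            .EnumerateFiles("*.snm")
            .OrderBy(f => f.LastWriteTimeUtc)
            .Take(maxBatchSize)
            .Select(f => f.FullName)
            .ToList();
    }
}
```

A receiver loop would call such a method about once per second and turn each returned file into a message.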

The provider is built into the FluentStorage main package.

To construct, use:

IMessagePublisher publisher = StorageFactory.Messages.DirectoryFilesPublisher(path);

IMessageReceiver receiver = StorageFactory.Messages.DirectoryFilesReceiver(path);

path is the path to the storage directory. It doesn't have to exist at the moment of construction, and will be created automagically.

To construct from a connection string, use:

IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("disk://path=the_path");

IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("disk://path=the_path");

Azure Storage Queue

To construct, use:

IMessagePublisher publisher = StorageFactory.Messages.AzureStorageQueuePublisher();

IMessageReceiver receiver = StorageFactory.Messages.AzureStorageQueueReceiver();

To construct from a connection string, use:

IMessagePublisher publisher = StorageFactory.Messages.PublisherFromConnectionString("azure.queue://account=...;key=...;queue=...");

IMessageReceiver receiver = StorageFactory.Messages.ReceiverFromConnectionString("azure.queue://account=...;key=...;queue=...");

Azure Service Bus

In order to use Microsoft Azure Service Bus you need to reference the NuGet package first. The provider wraps around the standard Microsoft Service Bus SDK.

This provider supports both topics and queues, for publishing and receiving. To construct a publisher use the following:

IMessagePublisher queuePublisher = StorageFactory.Messages.AzureServiceBusQueuePublisher(
                       connectionString,
                       queueName);

IMessagePublisher topicPublisher = StorageFactory.Messages.AzureServiceBusTopicPublisher(
                       connectionString,
                       topicName);

To construct a receiver, use the following:

IMessageReceiver queueReceiver = StorageFactory.Messages.AzureServiceBusQueueReceiver(
                       connectionString,
                       queueName,
                       peekLock);

IMessageReceiver topicReceiver = StorageFactory.Messages.AzureServiceBusTopicReceiver(
                       connectionString,
                       topicName,
                       subscriptionName,
                       peekLock);

peekLock is an optional flag indicating how to receive the message; it is true by default.

You also have the option to pass your own MessageHandlerOptions to customise Service Bus behavior. For instance, if you need to define an exception handler that, unlike the default one, doesn't ignore errors, and to set concurrency to more than 1 message (the default), you could write the following:

var options = new MessageHandlerOptions(ExceptionReceiverHandler)
{
  AutoComplete = false,
  MaxAutoRenewDuration = TimeSpan.FromMinutes(1),
  MaxConcurrentCalls = 5 //instead of 1
};

private async Task ExceptionReceiverHandler(ExceptionReceivedEventArgs args)
{
    // your exception handling code
}

IMessageReceiver topicReceiver = StorageFactory.Messages.AzureServiceBusTopicReceiver(
                       connectionString,
                       topicName,
                       subscriptionName,
                       peekLock,
                       options);

Note that the exception handler in Service Bus is for informational purposes only: it doesn't actually handle exceptions, and in case of errors the SDK retries automatically.

Amazon Simple Queue

In order to use Amazon Simple Queue you need to reference the NuGet package first. The provider wraps around the standard AWS SDK.

To construct a publisher use the following:

IMessagePublisher queuePublisher = StorageFactory.Messages.AmazonSQSMessagePublisher(
  accessKeyId,
  secretAccessKey,
  serviceUrl,
  queueName,
  regionEndpoint);

To construct a receiver, use the following:

IMessageReceiver queueReceiver = StorageFactory.Messages.AmazonSQSMessageReceiver(
  accessKeyId,
  secretAccessKey,
  serviceUrl,
  queueName,
  regionEndpoint);

  • accessKeyId and secretAccessKey are the credentials used to access the queue.
  • serviceUrl is the service URL, for instance https://sqs.us-east-1.amazonaws.com
  • queueName is the name of the queue.
  • regionEndpoint is optional and defaults to USEast1.