|
| 1 | +--- |
| 2 | +title: "☁️ 🤔 - Messaging between servers with RabbitMQ (Queueing)" |
| 3 | +slug: cloud-agnostic-5 |
| 4 | +tags: [software-development, dotnet, web, azure] |
| 5 | +date: 2025-04-24 21:20:00 |
| 6 | +topic: "azure-ahead" |
| 7 | +--- |
| 8 | + |
| 9 | +<TopicToc |
| 10 | + title="Cloud agnostic series" |
| 11 | + topicId="azure-ahead" |
| 12 | + active={frontmatter.title} |
| 13 | + closed |
| 14 | + /> |
| 15 | + |
| 16 | +Sending messages between processes is an ubiquitous pattern that has been in use for many decades - as such, |
| 17 | +the available alternatives are plenty. |
| 18 | + |
| 19 | +In the past I have only heard good things about [RabbitMQ][1]. What I did not know before I looked at it is |
| 20 | +that it is able to cover both our Inter-process communication (**IPC**) requirements by using queues as well as providing |
| 21 | +a publish / subscribe model. |
| 22 | + |
| 23 | +## RabbitMQ as an Aspire resource |
| 24 | + |
| 25 | +Support to use RabbitMQ within an Aspire solution is done via the `Aspire.Hosting.RabbitMQ` nuget package: |
| 26 | + |
| 27 | +<GHEmbed showHint repo="ahead-dockerized" branch="snapshot_1" file="AppHost/Program.cs" start={20} end={22} /> |
| 28 | + |
| 29 | +This references two secrets that are defined as `ParameterResource`s: |
| 30 | + |
| 31 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="AppHost/Program.cs" start={6} end={7} /> |
| 32 | + |
| 33 | +All of this gets referenced by our two projects **Web** and **Backend** (here exemplified with the Backend project). |
| 34 | + |
| 35 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="AppHost/Program.cs" start={33} end={37} /> |
| 36 | + |
| 37 | +I only learned of `WaitFor` a little bit later - which is a very practical way to ensure |
| 38 | +that an infrastructure resource like RabbitMQ is already available once our own code starts running. |
| 39 | + |
| 40 | +For our playground, I chose sending a queue message for triggering the **making of a report** and a **page published** _event_ |
| 41 | +that in our real system is handled by multiple consumers to perform a number of tasks of which two here are exemplified. |
| 42 | + |
| 43 | +<Info> |
| 44 | +### A word about RabbitMQ |
| 45 | +From the time I've had to look into RabbitMQ, it appears to be a Swiss army knife of messaging. |
| 46 | +* Queues can take many forms (temporary or well-known, persistent or transient) |
| 47 | +* Exchanges can be defined and on top of this different strategies can be used to get the message as often and to as many locations as necessary. |
| 48 | + |
| 49 | +If you have IPC demands maybe even between processes written in different programming languages, I suggest that you take a look at RabbitMQ, |
| 50 | +chances are that it will be able to cover your particular scenarios as it provides clients for a multitude of different environments. |
| 51 | + |
| 52 | +The relevant .NET-specific documentation [can be found here][2]. |
| 53 | +</Info> |
| 54 | + |
| 55 | +## Using queues |
| 56 | + |
| 57 | +Queues are a very useful pattern to offload work to a different process as well as helping to ensure that your |
| 58 | +compute resources don't get overwhelmed, as you have control over how many "workers" may dequeue messages. |
| 59 | + |
| 60 | + |
| 61 | +<figcaption>Queues as a way to pass a message to a different process and control the workload at any given time</figcaption> |
| 62 | + |
| 63 | +When using RabbitMQ, we should declare the queue we want to use from "both sides", after which we can perform en- & dequeue operations: |
| 64 | + |
| 65 | +```mermaid |
| 66 | +sequenceDiagram |
| 67 | + participant Web |
| 68 | + participant RabbitMQ |
| 69 | + participant Backend |
| 70 | +
|
| 71 | + activate Backend |
| 72 | + Backend->>RabbitMQ: DeclareQueue(Q) |
| 73 | + activate RabbitMQ |
| 74 | + RabbitMQ-->>Backend: ok |
| 75 | + deactivate RabbitMQ |
| 76 | + deactivate Backend |
| 77 | +
|
| 78 | + activate Web |
| 79 | + Web->>RabbitMQ: DeclareQueue(Q) |
| 80 | + activate RabbitMQ |
| 81 | + RabbitMQ-->>Web: ok |
| 82 | + Web->>RabbitMQ: Publish(M) |
| 83 | + activate RabbitMQ |
| 84 | + deactivate Web |
| 85 | + RabbitMQ->>Backend: Deliver(M) |
| 86 | + activate Backend |
| 87 | + deactivate RabbitMQ |
| 88 | + deactivate Backend |
| 89 | +``` |
| 90 | +<figcaption>Declaring Queues is an idempotent operation and is used by "both sides" to ensure a queue</figcaption> |
| 91 | + |
| 92 | +In order to use RabbitMQ, I used the `Aspire.RabbitMQ.Client.v7` Nuget, and ensure that it gets set up correctly |
| 93 | +by adding it to the Services in `Program.cs`: |
| 94 | + |
| 95 | +```csharp |
| 96 | +builder.AddRabbitMQClient(connectionName: "messaging"); |
| 97 | +``` |
| 98 | + |
| 99 | +The name `messaging` is related to how we defined the resource in the **Aspire** project (see above). By letting our projects |
| 100 | +reference the resource, the correct information to correctly set up a client is injected into our projects. |
| 101 | + |
| 102 | + |
| 103 | +## Enqueueing |
| 104 | + |
| 105 | +Ahead.Web registers a `QueueSender` at startup with the following shape: |
| 106 | + |
| 107 | +```csharp |
| 108 | +public class QueueSender(IConnection connection, ILogger<QueueSender> logger) { |
| 109 | + public async Task Send<T>(string queueName, T message) |
| 110 | +} |
| 111 | +``` |
| 112 | + |
| 113 | +Pretty much everything we want to do with RabbitMQ appears to have to be done with a `Channel`: |
| 114 | + |
| 115 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Web/Infrastructure/QueueSender.cs" start={15} end={21} /> |
| 116 | + |
| 117 | +In this demo solution I did not bother making anything from RabbitMQ persistent, hence we do not have to ask for persistence guarantees. |
| 118 | +Note that… |
| 119 | + |
| 120 | +> An exclusive queue can only be used (consumed from, purged, deleted, etc) by its declaring connection |
| 121 | + |
| 122 | +so that would be the wrong thing to use here. The passed in message gets serialized to a **byte array** |
| 123 | +(by serializing the message as JSON stored in a variable named `body`) and sent to the queue: |
| 124 | + |
| 125 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Web/Infrastructure/QueueSender.cs" start={32} end={35} /> |
| 126 | + |
| 127 | +## Dequeueing |
| 128 | + |
| 129 | +A similar helper, but for dequeueing, is defined in the `Ahead.Backend` project and looks like this: |
| 130 | + |
| 131 | +```csharp |
| 132 | +public class QueueListener<T>(IConnection connection, ILogger<QueueListener<T>> logger) |
| 133 | +{ |
| 134 | + public async IAsyncEnumerable<T> StartListening( |
| 135 | + string queueName, |
| 136 | + [EnumeratorCancellation] CancellationToken cancellationToken = default) |
| 137 | +} |
| 138 | +``` |
| 139 | + |
| 140 | +The `IAsyncEnumerable` needs to be built with the aid of the RabbitMQ client library and the trusty |
| 141 | +`Channel` abstraction that .NET provides: |
| 142 | + |
| 143 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Backend/Infrastructure/QueueListener.cs" start={18} end={22} /> |
| 144 | + |
| 145 | +Once more we declare the queue in much the same way as we did on the sending part. |
| 146 | + |
| 147 | +Then we build a so-called consumer that allows us to provide a callback when something is being received: |
| 148 | + |
| 149 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Backend/Infrastructure/QueueListener.cs" start={28} end={29} /> |
| 150 | +<figcaption>I am trying to remember why I called the main input to the handling callback "ea" – bad name!</figcaption> |
| 151 | + |
| 152 | +At the core of the handler we deserialize the message and pass it to the Writer part of a `Channel` instance. |
| 153 | + |
| 154 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Backend/Infrastructure/QueueListener.cs" start={37} end={43} /> |
| 155 | + |
| 156 | +In order to satisfy the signature of the method, we use the `Reader` part of the `Channel`: |
| 157 | + |
| 158 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Backend/Infrastructure/QueueListener.cs" start={52} end={57} /> |
| 159 | + |
| 160 | +_line 52_ also shows how the consumer is associated with the particular queue name. |
| 161 | + |
| 162 | +Given all this Infrastructure we can enqueue a message like |
| 163 | + |
| 164 | +```csharp |
| 165 | +routeBuilder.MapGet("/report", async (QueueSender sender, TimeProvider timeProvider) => |
| 166 | +{ |
| 167 | + await sender.Send( |
| 168 | + Constants.QueueNames.Basic, |
| 169 | + new ReportRequest |
| 170 | + { |
| 171 | + Type = "onboarding", |
| 172 | + RelevantDate = timeProvider.GetUtcNow().Date |
| 173 | + }); |
| 174 | + |
| 175 | + return Results.Extensions.BackToHomeWithMessage("Report sent!"); |
| 176 | +}); |
| 177 | +``` |
| 178 | + |
| 179 | +and dequeue it as |
| 180 | + |
| 181 | +```csharp |
| 182 | +await foreach (var reportRequest in queue.StartListening(Constants.QueueNames.Basic, cancellationToken)) |
| 183 | + logger.LogInformation( |
| 184 | + "Report request of type {reportType} received for date {reportDate}", |
| 185 | + reportRequest.Type, |
| 186 | + reportRequest.RelevantDate); |
| 187 | +``` |
| 188 | + |
| 189 | +Let's put a ✅ on queueing, next will be publishing messages to which others may subscribe. |
| 190 | + |
| 191 | +[1]: https://www.rabbitmq.com |
| 192 | +[2]: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet |
0 commit comments