|
| 1 | +--- |
| 2 | +title: "☁️ 🤔 - Messaging between servers with RabbitMQ (Publish/Subscribe)" |
| 3 | +slug: cloud-agnostic-6 |
| 4 | +tags: [software-development, dotnet, web, azure] |
| 5 | +date: 2025-04-25 21:20:00 |
| 6 | +topic: "azure-ahead" |
| 7 | +--- |
| 8 | + |
| 9 | +<TopicToc |
| 10 | + title="Cloud agnostic series" |
| 11 | + topicId="azure-ahead" |
| 12 | + active={frontmatter.title} |
| 13 | + closed |
| 14 | + /> |
| 15 | + |
| 16 | +Time to look at the second messaging pattern. A message can have the notion of an _event_, |
| 17 | +where you want several systems to be _notified_. |
| 18 | + |
| 19 | +RabbitMQ's approach here is to declare an **Exchange**. Every participant interested in messages posted to said exchange |
| 20 | +create a **non-persistent, temporary queue** tied to said exchange and dequeue incoming messages to handle them. |
| 21 | + |
| 22 | +The following sequence diagram shows what happens, based on the example outlined in the [Ahead.Dockerized solution][3]. |
| 23 | + |
| 24 | +```mermaid |
| 25 | +sequenceDiagram |
| 26 | + participant Web |
| 27 | + participant RabbitMQ |
| 28 | + participant Backend |
| 29 | +
|
| 30 | + activate Backend |
| 31 | + activate Web |
| 32 | + Web->>RabbitMQ: DeclareExchange(E) |
| 33 | + deactivate Web |
| 34 | + create participant NotificationGenerator |
| 35 | + Backend ->> NotificationGenerator: IHostedService |
| 36 | + NotificationGenerator->>RabbitMQ: DeclareExchange(E) |
| 37 | + NotificationGenerator->>RabbitMQ: DeclareTemporaryQueue(E) |
| 38 | + create participant SearchIndexUpdater |
| 39 | + Backend ->> SearchIndexUpdater: IHostedService |
| 40 | + deactivate Backend |
| 41 | +
|
| 42 | + SearchIndexUpdater->>RabbitMQ: DeclareExchange(E) |
| 43 | + SearchIndexUpdater->>RabbitMQ: DeclareTemporaryQueue(E) |
| 44 | +
|
| 45 | + activate Web |
| 46 | + Web->>RabbitMQ: Publish(E, M); |
| 47 | + deactivate Web |
| 48 | + |
| 49 | + RabbitMQ->>NotificationGenerator: Enqueue(M, Temporary Queue) |
| 50 | + activate NotificationGenerator |
| 51 | + NotificationGenerator->>NotificationGenerator: Handle message |
| 52 | + deactivate NotificationGenerator |
| 53 | + RabbitMQ->>SearchIndexUpdater: Enqueue(M, Temporary Queue) |
| 54 | + activate SearchIndexUpdater |
| 55 | + SearchIndexUpdater->>SearchIndexUpdater: Do work |
| 56 | + deactivate SearchIndexUpdater |
| 57 | +``` |
| 58 | +<figcaption> |
| 59 | +In real life, our _"Backend"_ is an azure functions host project. As such, it comes with many affordances |
| 60 | +like easy binding to queues on a function/endpoint level. |
| 61 | +This is represented here by `IHostedService` instances that run throughout the lifetime of the process. |
| 62 | +</figcaption> |
| 63 | + |
| 64 | +The so-called `BroadcastSender` is fairly similar to the queue sender: |
| 65 | + |
| 66 | +```csharp |
| 67 | +public class BroadcastSender(IConnection connection, ILogger<BroadcastSender> logger) |
| 68 | +{ |
| 69 | + public async Task Send<T>(string broadcastExchange, T message) |
| 70 | +} |
| 71 | +``` |
| 72 | + |
| 73 | +Then we declare the exchange to be used: |
| 74 | + |
| 75 | +<GHEmbed showHint repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Web/Infrastructure/BroadcastSender.cs" start={15} end={15} /> |
| 76 | + |
| 77 | +And use the channel once more to publish, this time with a specific `PublicationAddress`. |
| 78 | + |
| 79 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Web/Infrastructure/BroadcastSender.cs" start={26} end={29} /> |
| 80 | + |
| 81 | +This class is then used in the following way: |
| 82 | + |
| 83 | +```csharp title="Publish event" |
| 84 | +routeBuilder.MapGet("/publish", async (BroadcastSender sender, Random random) => |
| 85 | +{ |
| 86 | + await sender.Send( |
| 87 | + Constants.BroadcastExchanges.UserEvents, |
| 88 | + new PagePublishedEvent(random.Next(1, 1000).ToString())); |
| 89 | + return Results.Extensions.BackToHomeWithMessage("Page has been published!"); |
| 90 | +}); |
| 91 | +``` |
| 92 | + |
| 93 | +In the `Backend` the `BroadcastListener` also exposes a method to obtain an `AsyncEnumerable` (done again with a `Channel`). |
| 94 | + |
| 95 | +The one major difference is the way we set up the queue that is being used. |
| 96 | + |
| 97 | +<GHEmbed repo="ahead-dockerized" branch="snapshot_1" file="Ahead.Backend/Infrastructure/BroadcastListener.cs" start={22} end={26} /> |
| 98 | + |
| 99 | +declaring a queue without any parameters basically makes this a transient queue, with a randomly assigned name which |
| 100 | +we can pick up via `tmpQueue.QueueName` |
| 101 | + |
| 102 | +Via injection we can then consume the same message in multiple places, for example like so: |
| 103 | + |
| 104 | +```csharp |
| 105 | +public class SearchIndexUpdater( |
| 106 | + BroadcastListener<PagePublishedEvent> eventBroadcast, |
| 107 | + ILogger<SearchIndexUpdater> logger) : IHostedService |
| 108 | +{ |
| 109 | + public Task StartAsync(CancellationToken token) |
| 110 | + { |
| 111 | + logger.LogInformation("Starting Search index updater"); |
| 112 | + _ = Task.Run(async () => |
| 113 | + { |
| 114 | + await foreach (var publishedEvent in eventBroadcast.StartListening( |
| 115 | + Constants.BroadcastExchanges.UserEvents, |
| 116 | + nameof(SearchIndexUpdater), token)) |
| 117 | + logger.LogInformation( |
| 118 | + "Received page published event for page id {pageId}, will update the search index", |
| 119 | + publishedEvent.PageId); |
| 120 | + }, token); |
| 121 | + return Task.CompletedTask; |
| 122 | + } |
| 123 | + |
| 124 | + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; |
| 125 | +} |
| 126 | +``` |
| 127 | + |
| 128 | +With those infrastructure pieces in place, we leave the lovely world of message-passing with RabbitMQ. |
| 129 | + |
| 130 | +[3]: https://github.com/flq/ahead-dockerized |
0 commit comments