☁️ Agnostic – Messaging between servers with RabbitMQ (Publish/Subscribe)

Cloud agnostic series
  1. Depending on a colour
  2. Depending on a colour Pt. 2 - Needs, Possibilities and Limitations
  3. Cloud Agnostic - introducing the AheadDockerized .NET Solution
  4. ☁️ Agnostic - Storing & serving files
  5. ☁️ Agnostic - Messaging between servers with RabbitMQ (Queueing)
  6. ☁️ Agnostic – Messaging between servers with RabbitMQ (Publish/Subscribe)
  7. ☁️ Agnostic – Intermezzo: Insightful spans with OpenTelemetry
  8. ☁️ Agnostic – On gremlins and graphs

Time to look at the second messaging pattern. A message can have the notion of an event, where you want several systems to be notified.

RabbitMQ’s approach here is to declare an Exchange. Every participant interested in messages posted to said exchange create a non-persistent, temporary queue tied to said exchange and dequeue incoming messages to handle them.

The following sequence diagram shows what happens, based on the example outlined in the Ahead.Dockerized solution.

In real life, our “Backend” is an azure functions host project. As such, it comes with many affordances like easy binding to queues on a function/endpoint level. This is represented here by IHostedService instances that run throughout the lifetime of the process.

The so-called BroadcastSender is fairly similar to the queue sender:

public class BroadcastSender(IConnection connection, ILogger<BroadcastSender> logger)
{
public async Task Send<T>(string broadcastExchange, T message)
}

Then we declare the exchange to be used:

Gist not showing up? Most likely you're using Chrome. Reload the page to see the gist.

And use the channel once more to publish, this time with a specific PublicationAddress.

This class is then used in the following way:

Publish event
routeBuilder.MapGet("/publish", async (BroadcastSender sender, Random random) =>
{
await sender.Send(
Constants.BroadcastExchanges.UserEvents,
new PagePublishedEvent(random.Next(1, 1000).ToString()));
return Results.Extensions.BackToHomeWithMessage("Page has been published!");
});

In the Backend the BroadcastListener also exposes a method to obtain an AsyncEnumerable (done again with a Channel).

The one major difference is the way we set up the queue that is being used.

declaring a queue without any parameters basically makes this a transient queue, with a randomly assigned name which we can pick up via tmpQueue.QueueName

Via injection we can then consume the same message in multiple places, for example like so:

public class SearchIndexUpdater(
BroadcastListener<PagePublishedEvent> eventBroadcast,
ILogger<SearchIndexUpdater> logger) : IHostedService
{
public Task StartAsync(CancellationToken token)
{
logger.LogInformation("Starting Search index updater");
_ = Task.Run(async () =>
{
await foreach (var publishedEvent in eventBroadcast.StartListening(
Constants.BroadcastExchanges.UserEvents,
nameof(SearchIndexUpdater), token))
logger.LogInformation(
"Received page published event for page id {pageId}, will update the search index",
publishedEvent.PageId);
}, token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

With those infrastructure pieces in place, we leave the lovely world of message-passing with RabbitMQ.