☁️ Agnostic - Messaging between servers with RabbitMQ (Queueing)

Cloud agnostic series
  1. Depending on a colour
  2. Depending on a colour Pt. 2 - Needs, Possibilities and Limitations
  3. Cloud Agnostic - introducing the AheadDockerized .NET Solution
  4. ☁️ Agnostic - Storing & serving files
  5. ☁️ Agnostic - Messaging between servers with RabbitMQ (Queueing)
  6. ☁️ Agnostic – Messaging between servers with RabbitMQ (Publish/Subscribe)
  7. ☁️ Agnostic – Intermezzo: Insightful spans with OpenTelemetry
  8. ☁️ Agnostic – On gremlins and graphs

Sending messages between processes is a ubiquitous pattern that has been in use for many decades - as such, there are plenty of alternatives available.

In the past I had only heard good things about RabbitMQ. What I did not know before looking at it is that it covers both of our inter-process communication (IPC) requirements: queues as well as a publish/subscribe model.

RabbitMQ as an Aspire resource

Support for RabbitMQ within an Aspire solution comes via the Aspire.Hosting.RabbitMQ NuGet package:


This references two secrets that are defined as ParameterResources:
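A minimal sketch of what the AppHost registration might look like - the parameter names here are my assumption, not necessarily the original ones; only the resource name "messaging" is confirmed by the text below:

```csharp
// AppHost/Program.cs - sketch, assuming the Aspire.Hosting.RabbitMQ package.
var builder = DistributedApplication.CreateBuilder(args);

// Secrets surfaced as ParameterResources; values come from configuration / user secrets.
// The names "rabbit-username" and "rabbit-password" are illustrative.
var username = builder.AddParameter("rabbit-username", secret: true);
var password = builder.AddParameter("rabbit-password", secret: true);

// "messaging" is the resource name our projects connect to later.
var rabbitmq = builder.AddRabbitMQ("messaging", username, password);

builder.Build().Run();
```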

All of this gets referenced by our two projects Web and Backend (here exemplified with the Backend project).

I only learned of WaitFor a little later - it is a very practical way to ensure that an infrastructure resource like RabbitMQ is already available before our own code starts running.
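Wiring a project to the resource might look like the following sketch (the generated `Projects.Ahead_Backend` name is my assumption based on the project names mentioned in this post):

```csharp
// AppHost/Program.cs - sketch: the Backend project references the
// RabbitMQ resource and waits for it to be ready before starting.
builder.AddProject<Projects.Ahead_Backend>("backend")
    .WithReference(rabbitmq)
    .WaitFor(rabbitmq);
```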

For our playground, I chose two scenarios: a queue message that triggers generating a report, and a page-published event that in our real system is handled by multiple consumers performing a number of tasks, two of which are exemplified here.

Using queues

Queues are a very useful pattern for offloading work to a different process. They also help ensure that your compute resources don’t get overwhelmed, since you control how many “workers” may dequeue messages.

Aspire Dashboard showing the known resources

Queues as a way to pass a message to a different process and control the workload at any given time

When using RabbitMQ, we should declare the queue we want to use from “both sides”, after which we can perform en- & dequeue operations:

Declaring Queues is an idempotent operation and is used by “both sides” to ensure a queue
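A sketch of such a declaration, assuming RabbitMQ.Client v7 and an injected `IConnection` (the flags mirror the non-persistent demo setup described below):

```csharp
// Sketch: idempotent queue declaration with RabbitMQ.Client v7.
// Both sender and listener can call this safely - redeclaring a queue
// with the same arguments is a no-op.
var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(
    queue: queueName,
    durable: false,     // demo setup: nothing persistent
    exclusive: false,   // exclusive would tie the queue to one connection
    autoDelete: false);
```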

In order to use RabbitMQ, I used the Aspire.RabbitMQ.Client.v7 NuGet package and ensured it gets set up correctly by adding it to the services in Program.cs:

builder.AddRabbitMQClient(connectionName: "messaging");

The name messaging relates to how we defined the resource in the Aspire project (see above). By letting our projects reference the resource, the connection information needed to set up a client is injected into them.

Enqueueing

Ahead.Web registers a QueueSender at startup with the following shape:

public class QueueSender(IConnection connection, ILogger<QueueSender> logger)
{
    public async Task Send<T>(string queueName, T message)
}

Pretty much everything we want to do with RabbitMQ appears to have to be done with a Channel:

In this demo solution I did not bother making anything from RabbitMQ persistent, hence we do not have to ask for persistence guarantees. Note that…

An exclusive queue can only be used (consumed from, purged, deleted, etc) by its declaring connection

so that would be the wrong thing to use here. The passed-in message gets serialized to a byte array (by serializing the message as JSON into a variable named body) and sent to the queue:
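Put together, Send might look like this sketch - assuming RabbitMQ.Client v7 and System.Text.Json; the log message is illustrative:

```csharp
using System.Text.Json;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

public class QueueSender(IConnection connection, ILogger<QueueSender> logger)
{
    public async Task Send<T>(string queueName, T message)
    {
        await using var channel = await connection.CreateChannelAsync();
        await channel.QueueDeclareAsync(
            queueName, durable: false, exclusive: false, autoDelete: false);

        // Serialize the message as JSON into a byte array named "body".
        var body = JsonSerializer.SerializeToUtf8Bytes(message);

        // Publish via the default exchange; the routing key is the queue name.
        await channel.BasicPublishAsync(
            exchange: string.Empty, routingKey: queueName, body: body);
        logger.LogInformation("Sent a {messageType} to {queueName}", typeof(T).Name, queueName);
    }
}
```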

Dequeueing

A similar helper, but for dequeueing, is defined in the Ahead.Backend project and looks like this:

public class QueueListener<T>(IConnection connection, ILogger<QueueListener<T>> logger)
{
    public async IAsyncEnumerable<T> StartListening(
        string queueName,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
}

The IAsyncEnumerable needs to be built with the aid of the RabbitMQ client library and the trusty Channel abstraction that .NET provides:

Once more we declare the queue in much the same way as we did on the sending part.

Then we build a so-called consumer that allows us to provide a callback when something is being received:

I am trying to remember why I called the main input to the handling callback “ea” – bad name!

At the core of the handler we deserialize the message and pass it to the Writer part of a Channel instance.

In order to satisfy the signature of the method, we use the Reader part of the Channel:

The call that starts consuming also shows how the consumer is associated with the particular queue name.
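The whole listener might look like this sketch, assuming RabbitMQ.Client v7 and System.Threading.Channels (details like the warning log are my own additions):

```csharp
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class QueueListener<T>(IConnection connection, ILogger<QueueListener<T>> logger)
{
    public async IAsyncEnumerable<T> StartListening(
        string queueName,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var rabbitChannel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

        // Declare the queue just like on the sending side - idempotent.
        await rabbitChannel.QueueDeclareAsync(
            queueName, durable: false, exclusive: false, autoDelete: false,
            cancellationToken: cancellationToken);

        // Bridge RabbitMQ's callback model into an IAsyncEnumerable
        // via a System.Threading.Channels channel.
        var channel = Channel.CreateUnbounded<T>();

        var consumer = new AsyncEventingBasicConsumer(rabbitChannel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            // Deserialize the message and hand it to the Writer part.
            var message = JsonSerializer.Deserialize<T>(ea.Body.Span);
            if (message is not null)
                await channel.Writer.WriteAsync(message, cancellationToken);
            else
                logger.LogWarning("Could not deserialize a message from {queueName}", queueName);
        };

        // Associates the consumer with the particular queue name.
        await rabbitChannel.BasicConsumeAsync(queueName, autoAck: true, consumer, cancellationToken);

        // The Reader part satisfies the IAsyncEnumerable signature.
        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            yield return item;
    }
}
```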

Given all this infrastructure, we can enqueue a message like

routeBuilder.MapGet("/report", async (QueueSender sender, TimeProvider timeProvider) =>
{
    await sender.Send(
        Constants.QueueNames.Basic,
        new ReportRequest
        {
            Type = "onboarding",
            RelevantDate = timeProvider.GetUtcNow().Date
        });
    return Results.Extensions.BackToHomeWithMessage("Report sent!");
});

and dequeue it as

await foreach (var reportRequest in queue.StartListening(Constants.QueueNames.Basic, cancellationToken))
    logger.LogInformation(
        "Report request of type {reportType} received for date {reportDate}",
        reportRequest.Type,
        reportRequest.RelevantDate);

Let’s put a ✅ on queueing, next will be publishing messages to which others may subscribe.