MassTransit - Sending a Message to a Single Consumer

Download full source code.

I’ve been playing with MassTransit recently, trying to use it in a similar way to how I used Kafka - as a simple way to send messages, as publisher/subscriber and competing consumer.

I found the docs difficult to follow, and short of examples. So I thought I’d write a few posts on how to use it.

I’m not going into what MassTransit is, or the concepts around producers, consumers, sagas, etc. I’m just going to give you some code that works!

In this first post, I’ll do the simplest thing - send a message to a queue, and consume it. This uses a single producer and consumer.

RabbitMQ

I’m using RabbitMQ as the message broker, so that needs to be set up first. It is very easy as long as you have Docker installed.

docker run -p 15672:15672 -p 5672:5672 masstransit/rabbitmq

This will set up RabbitMQ on your local machine, and you can access the management console at http://localhost:15672. The default username and password are guest and guest. But I’m not going to discuss RabbitMQ in this post.

The Message

To make it easy to have the same message type for both the producer and consumer, I am creating a class library for the message. It is a simple class with a single string.

Create a class library for the message -

dotnet new classlib -n Messages

Add a class named SimpleTextMessage.cs -

1namespace Messages;
2
3public class SimpleTextMessage 
4{
5    public string Text { get; set; }
6}

Note, the namespace is important. Without it, you will get an exception like - System.ArgumentException: Messages types must have a valid namespace: SimpleTextMessage (Parameter 'T').

The Producer

Create a Worker Service for the producer -

dotnet new worker -n Producer

Add the MassTransit.RabbitMQ NuGet package -

dotnet add package MassTransit.RabbitMQ

Add the Messages project to the Producer project -

dotnet add reference ../Messages/Messages.csproj

Open the Program.cs file and replace it with -

 1using MassTransit;
 2using Producer;
 3
 4var builder = Host.CreateApplicationBuilder(args);
 5
 6builder.Services.AddMassTransit(x =>
 7{
 8    x.UsingRabbitMq((context, cfg) =>
 9    {
10        // optional, these are the defaults
11        cfg.Host("rabbitmq://localhost", h =>
12        {
13            h.Username("guest");
14            h.Password("guest");
15        }); 
16    });
17});
18
19builder.Services.AddHostedService<MessageProducer>();
20
21var host = builder.Build();
22host.Run();

As you can see there is very little to it. Add MassTransit to the services, configure RabbitMQ, and add the MessageProducer as a hosted service.

Create a MessageProducer.cs file and add the following -

 1using MassTransit;
 2using Messages;
 3
 4namespace Producer;
 5
 6public class MessageProducer(IBus _bus) : BackgroundService
 7{
 8    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 9    {
10        var endpoint_message_queue1 = await _bus.GetSendEndpoint(new Uri("rabbitmq://localhost/message-queue1"));
11        int counter = 0;   
12
13        while (!stoppingToken.IsCancellationRequested)
14        {
15            Console.WriteLine($"{++counter} Sending message to message-queue1");
16            await endpoint_message_queue1.Send(new SimpleTextMessage { Text = $"{counter} Current time: {DateTimeOffset.Now}" }, stoppingToken);
17            await Task.Delay(1000, stoppingToken);
18        }
19    }
20}
  • Line 10, gets the endpoint for the queue message-queue1
  • Line 16, sends a message to the endpoint, note that Send is asynchronous.

That’s all there is to the producer.

The Consumer

This follows a similar pattern to the producer. Create a console application for the consumer -

dotnet new console -n Consumer

Add the MassTransit.RabbitMQ NuGet package -

   
dotnet add package MassTransit.RabbitMQ

Add the Messages project to the Consumer project -

dotnet add reference ../Messages/Messages.csproj

Open the Program.cs file and replace it with -

 1using Consumer;
 2using MassTransit;
 3using Microsoft.Extensions.Hosting;
 4
 5var builder = Host.CreateApplicationBuilder(args);
 6builder.Services.AddMassTransit(x =>
 7{
 8    x.AddConsumer<MessageConsumer>();
 9    x.UsingRabbitMq((context, cfg) =>
10    {
11        cfg.ReceiveEndpoint("message-queue1", e =>
12        {
13            e.ConfigureConsumer<MessageConsumer>(context);
14        });
15        cfg.ConfigureEndpoints(context);
16    });
17});
18
19var host = builder.Build();
20host.Run();

Line 6, adds MassTransit to the service collection Line 8, adds the MessageConsumer as a consumer Line 9, configures RabbitMQ

Create a MessageConsumer.cs file and add the following -

 1 
 2using Messages;
 3using MassTransit;
 4
 5namespace Consumer;
 6
 7public class MessageConsumer : IConsumer<SimpleTextMessage>
 8{
 9    public Task Consume(ConsumeContext<SimpleTextMessage> context)
10    {
11        Console.WriteLine($"Received Text: {context.Message.Text}");
12
13        return Task.CompletedTask;
14    }
15}

That’s all there is to the consumer.

Running the Producer and Consumer

Start the producer, and the consumer - it doesn’t matter which order you do it in.

From the producer you will see output like -

1 Sending message to message-queue1
2 Sending message to message-queue1
3 Sending message to message-queue1
...

And from the consumer you will see output like -

   
Received Text: 1 Current time: 12/4/2024 9:03:42 PM -05:00
Received Text: 2 Current time: 12/4/2024 9:03:43 PM -05:00
Received Text: 3 Current time: 12/4/2024 9:03:44 PM -05:00
...

That’s it, you are now sending messages via MassTransit and RabbitMQ and consuming them.

This example shows how a single consumer can consume messages from a single queue. In the next post, I will show to have multiple consumers consuming messages from the same queue - the competing consumer approach.

Download full source code.

comments powered by Disqus

Related