Using the Confluent Kafka Consumer in a Non-Blocking Way in .NET and C#

Download full source code.

I’ve been using Kafka with the Confluent libraries for a while, and one major complaint you see is that the Consume method is synchronous. This issue has been raised on the project’s GitHub page, but there has been no movement on it. The C# library for Kafka is a wrapper around a C library, and the discussion says that updating that underlying library is the obstacle.

Given that the issue was raised in 2018, and there hasn’t been a fix yet, it makes sense to devise your own way of dealing with it.

Fortunately, there are a few ways to handle it and not block on a call to Consume.

The Happy Path

Some applications that use Kafka consume a message, process it, and move to the next message when it arrives. They are fine with a blocking call to the Consume method. While the consume is blocking, the application cannot do anything else - can’t respond to other types of requests, can’t interact with the user, can’t do anything. If this is ok for your application then a blocking call is ok.

The Unhappy Path

Your application does multiple things at the same time. It might be a Web API application that also consumes messages from Kafka (maybe Web API is not the best choice, but set that aside). Once the Consume call blocks, the API can’t receive requests.

Another scenario is a background worker application with multiple workers that do a few things at the same time. It might be monitoring a queue and consuming messages from Kafka. Once the Consume call blocks, the queue won’t be monitored.

The Solution

I’m showing only snippets here; the full source code and the producer are in the attached zip file.

There are a few ways to make the Consume call non-blocking. I covered three ways to make a synchronous call non-blocking in a previous post.

But in this post, I’ll show just one.

The Background Service

I have two identical background services, both using a synchronous call to the Confluent Kafka Consume method. The two consumers use different group ids, so they will both consume the same messages. I’m going to start the two of them. If they were running in a blocking fashion, the first one to start would block the second one from running.
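
The consumer setup is not part of the snippets in this post, so here is a minimal sketch of how two consumers with different group ids might be created. It assumes the Confluent.Kafka NuGet package; the broker address, topic name, group ids, and the CreateConsumer helper are placeholders of mine, not taken from the attached source.

```csharp
using Confluent.Kafka;

// Different group ids mean each consumer gets its own copy of every message.
// The group ids below are hypothetical.
using var consumerA = CreateConsumer("consumer-a-group");
using var consumerB = CreateConsumer("consumer-b-group");

// Hypothetical helper: "localhost:9092" and "demo-topic" are assumed values.
static IConsumer<Ignore, string> CreateConsumer(string groupId)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = groupId,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe("demo-topic");
    return consumer;
}
```

If both consumers shared one group id, Kafka would split the topic's partitions between them instead, and each message would reach only one of the two.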

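The ExecuteAsync method uses Task.Factory.StartNew with TaskCreationOptions.LongRunning to call the ConsumerLoop method. The synchronous loop then runs on its own thread, and ExecuteAsync hands an incomplete Task back to the host right away, so starting one service does not hold up the other.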

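protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // LongRunning asks for a dedicated thread rather than a thread-pool thread.
    await Task.Factory.StartNew(() => ConsumerLoop(stoppingToken), TaskCreationOptions.LongRunning);
}

private void ConsumerLoop(CancellationToken stoppingToken)
{
    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Blocks this thread until a message arrives or the token is cancelled.
            var consumeResult = consumer.Consume(stoppingToken);

            Console.WriteLine($"ConsumerA Received: '{consumeResult.Message.Value}'.");
        }
    }
    catch (OperationCanceledException)
    {
        // Consume throws when stoppingToken is cancelled; treat it as a normal shutdown.
    }
}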
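
To see the same pattern without a Kafka broker, here is a small sketch that uses only the base class library. BlockingCollection.Take is my stand-in for the blocking Consume call, and the queue, the messages, and the 500 ms delay are assumptions made for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Take blocks the calling thread until an item arrives, much like Consume.
var queue = new BlockingCollection<string>();
using var cts = new CancellationTokenSource();

Task loop = Task.Factory.StartNew(() =>
{
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            // Only the dedicated loop thread is blocked here.
            string message = queue.Take(cts.Token);
            Console.WriteLine($"Received: '{message}'.");
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the token is cancelled while Take is waiting.
    }
}, TaskCreationOptions.LongRunning);

// The calling thread carries on while the loop waits for items.
Console.WriteLine("Main thread is not blocked.");
queue.Add("A");
queue.Add("B");

await Task.Delay(500); // give the loop time to drain the queue
cts.Cancel();
await loop;
```

The loop thread spends most of its life blocked, which is why LongRunning is a reasonable fit here: it keeps that wait off the shared thread pool.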

In Program.cs I add the two background services:

using ConsumerWorker;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<ConsumerA>();
builder.Services.AddHostedService<ConsumerB>();

var host = builder.Build();
host.Run();

I’m not showing the code here, but I have a producer that also starts and sends messages to the topic. You can find the code in the attached zip.

When this application starts I see the two background services consuming messages in parallel - neither is blocking.

ConsumerA Received: 'A'.
        ConsumerB Received: 'A'.
        ConsumerB Received: 'B'.
ConsumerA Received: 'B'.
        ConsumerB Received: 'C'.
ConsumerA Received: 'C'.
ConsumerA Received: 'D'.
        ConsumerB Received: 'D'.

Conclusion

It’s unfortunate that the Confluent library doesn’t offer a natively asynchronous way to consume a message in C#, but this should help some people.

