Batch Consume with Kafka and .NET
Download full source code.
As I have discussed before, there are some deficiencies with the Confluent .NET Kafka client. The first was its lack of support for async message consumption; I showed how to overcome that in a previous post. The second is its lack of batch message consumption, that is, consuming up to a preset number of messages, or whatever arrives within a given time window.
This post will show how to implement batch consumption with the .NET Kafka client.
Getting Kafka running
I have written several posts about this; see my earlier posts on running Kafka in Docker.
Consuming in batch
I want a couple of things from my batch consumer:
- it should consume a preset number of messages.
- it should return whatever it consumes within a given timeframe.
- consumption should not block anything else I am doing - it should run asynchronously (I solved this in a previous post).
For the first two, I create an extension method on the IConsumer<TKey, TValue> interface.
Its code is not very complicated, but I will go through the interesting lines.
1  using System;
2  using System.Collections.Generic;
3  using System.Diagnostics;
4  using System.Threading;
5  using Confluent.Kafka;
6
7  internal static class ConsumerExtensions
8  {
9      internal static IEnumerable<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(this IConsumer<TKey, TValue> consumer, int maxBatchSize, TimeSpan consumeTimeout, TimeoutBehavior timeoutBehavior, CancellationToken cancellationToken)
10     {
11         var consumeResults = new List<ConsumeResult<TKey, TValue>>(maxBatchSize);
12
13         if (timeoutBehavior == TimeoutBehavior.StartAfterFirstMessageReceived)
14         {
15             var result = consumer.Consume(cancellationToken); // waits for the first message (throws if cancelled)
16             consumeResults.Add(result);
17         }
18
19         var stopwatch = Stopwatch.StartNew(); // local, so concurrent calls don't share one timer
20
21         while (consumeResults.Count < maxBatchSize && !cancellationToken.IsCancellationRequested)
22         {
23             var remaining = consumeTimeout - stopwatch.Elapsed;
24             if (remaining <= TimeSpan.Zero)
25             {
26                 break; // the timeout has elapsed
27             }
28
29             var result = consumer.Consume(remaining); // returns null if the time runs out
30             if (result != null)
31             {
32                 consumeResults.Add(result);
33             }
34         }
35         return consumeResults;
36     }
37 }
- Lines 1-5, the namespaces the method needs. TimeoutBehavior is an enum defined in the downloadable source.
- Line 9, the method signature takes the batch size, the timeout, the timeout behavior, and a cancellation token.
- Line 13, check whether the timeout should start before or after the first message is consumed.
- Lines 15-16, wait for the first message before starting the clock.
- Line 19, start a Stopwatch to measure elapsed time against the timeout. It is local to the method rather than a shared static field, so consumers calling ConsumeBatch on different threads cannot start or reset each other's timer.
- Line 21, keep going until the batch is full or cancellation is requested.
- Lines 23-27, work out how much of the timeout is left and stop when none remains. Reading the stopwatch once per iteration means Consume is never handed a negative timeout.
- Lines 29-33, consume a message and add it to the list; Consume returns null if the remaining time runs out before a message arrives.
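To see the count-or-timeout logic on its own, here is a minimal sketch of the same loop that runs without a broker. It assumes a hypothetical IMessageSource<T> interface whose Consume(TimeSpan) returns null on timeout (mirroring IConsumer.Consume(TimeSpan)), a hypothetical in-memory QueueSource<T> built on BlockingCollection<T>, and a made-up WindowStart enum standing in for TimeoutBehavior. None of these are part of Confluent.Kafka.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical stand-in for IConsumer<TKey, TValue>: Consume returns null
// if nothing arrives before the timeout.
public interface IMessageSource<T> where T : class
{
    T? Consume(TimeSpan timeout);
}

// Hypothetical in-memory source, so the batching logic can run without Kafka.
public sealed class QueueSource<T> : IMessageSource<T> where T : class
{
    private readonly BlockingCollection<T> _items = new();

    public void Add(T item) => _items.Add(item);

    // TryTake waits up to 'timeout' for an item; Timeout.InfiniteTimeSpan waits forever.
    public T? Consume(TimeSpan timeout) => _items.TryTake(out var item, timeout) ? item : null;
}

// Hypothetical enum; the post's own TimeoutBehavior lives in its downloadable source.
public enum WindowStart { Immediately, AfterFirstMessage }

public static class MessageSourceExtensions
{
    // Collects up to maxBatchSize items, or whatever arrives before the window closes.
    public static List<T> ConsumeBatch<T>(this IMessageSource<T> source, int maxBatchSize,
        TimeSpan window, WindowStart windowStart) where T : class
    {
        var batch = new List<T>(maxBatchSize);

        if (windowStart == WindowStart.AfterFirstMessage)
        {
            // Block until the first item arrives; the window has not started yet.
            T? first;
            while ((first = source.Consume(Timeout.InfiniteTimeSpan)) == null) { }
            batch.Add(first);
        }

        var stopwatch = Stopwatch.StartNew();
        while (batch.Count < maxBatchSize)
        {
            var remaining = window - stopwatch.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                break; // the window has closed
            }

            var item = source.Consume(remaining); // null means nothing arrived in time
            if (item != null)
            {
                batch.Add(item);
            }
        }
        return batch;
    }
}
```

Filling the queue with the 26 letters and asking for batches of 10 gives 10, 10 and then 6 items, the last batch being closed by the window rather than by the count. Against a real broker, IConsumer<TKey, TValue> takes the place of the hypothetical interface.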
Now when you use a Consumer, there will be a ConsumeBatch method.
Calling ConsumeBatch
Because ConsumeBatch is an extension method, it has to be called on an instance of a Kafka consumer.
To this end, I have an AlphabetConsumer, with a ReceiveMessagesBatch method that uses a Kafka Consumer to read from a Kafka topic. Because of my extension method, the ConsumeBatch method is available on that consumer.
Calling ConsumeBatch is very similar to calling Consume. Build the Consumer, subscribe to the topic, and call ConsumeBatch with the relevant parameters.
// Needs: using Confluent.Kafka; and using System.Linq; (for Any())
internal void ReceiveMessagesBatch(string topic, int batchSize, TimeSpan timeout, TimeoutBehavior timeoutBehavior, CancellationToken cancellationToken)
{
    using IConsumer<string, string> consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
    consumer.Subscribe(topic);

    // snip...
    while (!cancellationToken.IsCancellationRequested)
    {
        var consumeResults = consumer.ConsumeBatch(batchSize, timeout, timeoutBehavior, cancellationToken);

        if (!consumeResults.Any())
        {
            Console.WriteLine("No messages received.");
            continue;
        }
        // snip...
    }
}
The only difference in usage from the usual Consume method is that ConsumeBatch takes more parameters and returns a collection of ConsumeResult objects instead of a single one.
Calling the AlphabetConsumer
As mentioned above, I want my consumption of messages to be non-blocking, but I still don't have an async method to call, because my extension method relies on the synchronous Consume.
There is still a way to run it without blocking the caller.
// snip...
AlphabetConsumer alphabetConsumer = new AlphabetConsumer(kafkaConnection, groupId);

var batchTask = Task.Factory.StartNew(
    () => alphabetConsumer.ReceiveMessagesBatch(topic, 10, TimeSpan.FromSeconds(5), TimeoutBehavior.StartAfterFirstMessageReceived, cancellationToken),
    TaskCreationOptions.LongRunning);
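This starts the batch consumer on a separate task, so the calling code is not blocked. TaskCreationOptions.LongRunning tells the scheduler the work is long-running, and the default scheduler responds by giving it a dedicated thread instead of tying up a thread-pool thread. The consumer looks for up to 10 messages in a window of up to 5 seconds, and the timeout starts after the first message is received.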
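Here is a minimal, broker-free sketch of the same pattern, with a hypothetical BlockingReceiveLoop standing in for ReceiveMessagesBatch. It uses the StartNew overload that also takes a CancellationToken and a TaskScheduler; passing TaskScheduler.Default explicitly is a common precaution, because the shorter overloads use TaskScheduler.Current.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundConsumer
{
    // Hypothetical stand-in for ReceiveMessagesBatch: a blocking loop that runs until cancelled.
    public static int BlockingReceiveLoop(CancellationToken cancellationToken)
    {
        var iterations = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            Thread.Sleep(10); // stands in for a blocking ConsumeBatch call
            iterations++;
        }
        return iterations;
    }

    // Runs the blocking loop off the caller's thread and hands back a Task to await.
    public static Task<int> Start(CancellationToken cancellationToken) =>
        Task.Factory.StartNew(
            () => BlockingReceiveLoop(cancellationToken),
            CancellationToken.None,          // the loop observes the token itself and returns normally
            TaskCreationOptions.LongRunning, // hint that this work deserves a dedicated thread
            TaskScheduler.Default);          // don't inherit TaskScheduler.Current from the caller
}
```

With the real consumer, code that awaits the task should be prepared for an OperationCanceledException, since Consume(CancellationToken) throws one when the token is cancelled while it is waiting for the first message.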
All this is very good, but you also need something to put messages on the topic.
The producer
In the attached source code, you will see a Producer project that sends letters of the alphabet to a topic.
Putting it all together
- Run Kafka.
- Start the Producer; it will create the alphabet topic and send messages to it.
- Start the AlphabetBatchConsumer; it will consume the messages from the topic in batches of up to 10 messages.
Here is the output:
GroupId: default
Looking for messages on topic: alphabet
Received: 'A'. Topic 'alphabet'. Offset: 572.
Received: 'B'. Topic 'alphabet'. Offset: 573.
Received: 'C'. Topic 'alphabet'. Offset: 574.
Received: 'D'. Topic 'alphabet'. Offset: 575.
Received: 'E'. Topic 'alphabet'. Offset: 576.
Received: 'F'. Topic 'alphabet'. Offset: 577.
Received: 'G'. Topic 'alphabet'. Offset: 578.
Received: 'H'. Topic 'alphabet'. Offset: 579.
Received: 'I'. Topic 'alphabet'. Offset: 580.
Received: 'J'. Topic 'alphabet'. Offset: 581.
Looking for messages on topic: alphabet
Received: 'K'. Topic 'alphabet'. Offset: 582.
//snip...
Received: 'T'. Topic 'alphabet'. Offset: 591.
Looking for messages on topic: alphabet
Received: 'U'. Topic 'alphabet'. Offset: 592.
//snip...
Received: 'Z'. Topic 'alphabet'. Offset: 597.
Looking for messages on topic: alphabet
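You can see from the output that the consumer is consuming messages in batches of 10. The last batch, U to Z, holds only six messages because the 5-second window closed before a full batch arrived.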
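The batch sizes follow from simple arithmetic. A minimal sketch using LINQ's Chunk (available in .NET 6 and later), assuming all 26 messages are already on the topic and nothing else arrives; BatchArithmetic and BatchSizes are hypothetical names for illustration:

```csharp
using System.Linq;

public static class BatchArithmetic
{
    // Sizes of the batches you get when 'messageCount' messages are read
    // at most 'batchSize' at a time and nothing else arrives.
    public static int[] BatchSizes(int messageCount, int batchSize) =>
        Enumerable.Range(0, messageCount)
            .Chunk(batchSize)
            .Select(batch => batch.Length)
            .ToArray();
}
```

BatchArithmetic.BatchSizes(26, 10) returns { 10, 10, 6 }, matching the A to J, K to T, and U to Z batches above.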