Using Kafka with .NET

Download full source code.

As with many things in software, getting started on a new topic can be confusing and difficult, either because there is too little introductory material or because what exists is poorly written.

I had this experience again when trying to get started with Apache Kafka. The general principles were not the problem, as they are documented well enough, but I couldn’t figure out how to get it running easily and then use it easily with .NET. I say “easily” because I did find plenty of docs on how to run it, but some contradicted each other, and others were very complicated, talking about installing Java, and Zookeeper, and Docker Compose, and, and…!

Then the .NET examples looked like they were written in a different language and converted to C#, and also seemed overly long with boilerplate code that probably wasn’t necessary.

So that brings me to where I am with this post. I will explain how to get Kafka running easily, and then how to write a .NET application that uses it.

Running Kafka

It wasn’t easy to figure out how to run Kafka locally, because there seem to be many ways to do it, and a search leads to many pages without clear instructions.

Here is what I did -

  1. Install and run Docker Desktop.
  2. Download the Confluent executable from here and put it in your path.

There is some more info about the Confluent executable here.

The actual Docker commands that get run are long and complex, but the Confluent application abstracts all that away.

Once you have the Confluent executable, all you need to do is run -

confluent local kafka start

This will do everything that is needed to start Kafka, and the output will show two ports -

+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 64886 |
+-----------------+-------+

The Plaintext Ports value is the one to use when connecting to Kafka from the applications.

The .NET Applications

I’m not going to explain how Kafka works, or why you should or shouldn’t use it in this post. All I will say is that you need a producer and a consumer.

The producer sends messages to a topic, and the consumer reads them.

The Producer

As the name suggests, this application produces messages and puts them in a Kafka topic. Its code is very simple. I start with a console application and add the Confluent.Kafka package to the project.

The Program.cs file looks like this -

using Confluent.Kafka;

string kafkaConnection = "127.0.0.1:64886"; // you will need to change this port to what you saw earlier
string topic = "alphabet";

Producer producer = new Producer(kafkaConnection);
await producer.SendMessages(topic);

Then the Producer.cs looks like -

using Confluent.Kafka;

public class Producer
{
    private readonly ProducerConfig _producerConfig;
    private readonly Random _random = new Random(123); // fixed seed, so the delays repeat between runs

    public Producer(string connection)
    {
        _producerConfig = new ProducerConfig
        {
            BootstrapServers = connection
        };
    }

    public async Task SendMessages(string topic)
    {
        using var producer = new ProducerBuilder<string, string>(_producerConfig).Build();
        for (var letter = 'A'; letter <= 'Z'; letter++)
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"{letter}"
            };

            // the await completes once the delivery result for this message is available
            var deliveryResult = await producer.ProduceAsync(topic, message);
            Console.WriteLine($"Sent Message:'{deliveryResult.Value}'. Topic {deliveryResult.TopicPartitionOffset}. Status:{deliveryResult.Status}");

            // pause for roughly 0.5 to 1.5 seconds without blocking the thread
            await Task.Delay(_random.Next(500, 1500));
        }
    }
}

This sends a letter of the alphabet, then sleeps for a period before sending the next letter.
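Since the port has to be edited by hand in Program.cs, one possible variation is to read the connection string from a command line argument or an environment variable and fall back to the hardcoded value. This is only a sketch: the variable name KAFKA_BOOTSTRAP_SERVERS is made up for this example, and it assumes the Producer class shown above is in the same project.

```csharp
// Program.cs (producer), variation that avoids editing the port in code.
// KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name chosen for this sketch.
string kafkaConnection =
    args.Length > 0
        ? args[0]
        : (Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "127.0.0.1:64886");
string topic = "alphabet";

Console.WriteLine($"Connecting to Kafka at {kafkaConnection}");

// Producer is the class from Producer.cs above
Producer producer = new Producer(kafkaConnection);
await producer.SendMessages(topic);
```

With this in place, the producer could be started with something like `dotnet run -- 127.0.0.1:64886`, using whatever Plaintext port the Confluent executable printed.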

The Consumer

This application consumes messages from the topic and prints them to the screen. If no message is available, the Kafka client will wait until one is.

Again, it has two files. Program.cs looks like this -

using Confluent.Kafka;

string kafkaConnection = "127.0.0.1:64886";
string topic = "alphabet";

Consumer consumer = new Consumer(kafkaConnection);
consumer.ReceiveMessages(topic);

And here is the Consumer.cs file -

using Confluent.Kafka;

public class Consumer
{
    private readonly ConsumerConfig _consumerConfig;

    public Consumer(string connection)
    {
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = connection,
            GroupId = "default",
            AutoOffsetReset = AutoOffsetReset.Earliest, // start at the beginning if the group has no committed offset
            EnableAutoCommit = true, // this is the default but good to know about
            EnableAutoOffsetStore = true // this is the default but good to know about
        };
    }

    public void ReceiveMessages(string topic)
    {
        using var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
        consumer.Subscribe(topic);

        while (true)
        {
            Console.WriteLine($"Looking for a message on topic: {topic}");
            // blocks until a message is available
            var consumeResult = consumer.Consume();

            Console.WriteLine($"Received: '{consumeResult.Message.Value}'. Topic '{consumeResult.Topic}'.");
        }
    }
}
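The consumer above loops forever and only stops when the process is killed. Here is a sketch of one way to stop it cleanly on Ctrl+C, using the Consume overload that takes a CancellationToken and calling Close() so the consumer leaves its group before exiting. The class name CancellableConsumer is made up for this example, and the port is the same placeholder as before.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the consumer can shut down cleanly
    cts.Cancel();
};

// change the port to the Plaintext port you saw earlier
var consumer = new CancellableConsumer("127.0.0.1:64886");
consumer.ReceiveMessages("alphabet", cts.Token);

// Hypothetical name, chosen to avoid clashing with the Consumer class above
public class CancellableConsumer
{
    private readonly ConsumerConfig _consumerConfig;

    public CancellableConsumer(string connection)
    {
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = connection,
            GroupId = "default",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    public void ReceiveMessages(string topic, CancellationToken cancellationToken)
    {
        using var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
        consumer.Subscribe(topic);

        try
        {
            while (true)
            {
                // throws OperationCanceledException once the token is cancelled
                var consumeResult = consumer.Consume(cancellationToken);
                Console.WriteLine($"Received: '{consumeResult.Message.Value}'. Topic '{consumeResult.Topic}'.");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stopping consumer.");
        }
        finally
        {
            // leave the consumer group cleanly before the consumer is disposed
            consumer.Close();
        }
    }
}
```

Whether the extra code is worth it for a quick local experiment is a judgment call, but in a longer running application a clean Close() lets the group rebalance promptly instead of waiting for a session timeout.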

Running it

Make sure Kafka is running on the port you put in the applications.

Start the Producer first. This is important because producing the first message creates the topic on Kafka.

Then start the Consumer, which will read from the topic.
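If relying on the start order feels fragile, the topic could instead be created explicitly with the admin client that ships in the same Confluent.Kafka package. The following is a sketch under a few assumptions: the helper name TopicSetup is made up for this example, and one partition with a replication factor of 1 is assumed to suit a single local broker.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

string kafkaConnection = "127.0.0.1:64886"; // change to the Plaintext port you saw earlier
string topic = "alphabet";

await TopicSetup.EnsureTopicExists(kafkaConnection, topic);

// Hypothetical helper, not part of the original applications
public static class TopicSetup
{
    public static async Task EnsureTopicExists(string connection, string topic)
    {
        var config = new AdminClientConfig { BootstrapServers = connection };
        using var adminClient = new AdminClientBuilder(config).Build();

        try
        {
            // one partition and replication factor 1: assumed values for a single local broker
            await adminClient.CreateTopicsAsync(new[]
            {
                new TopicSpecification { Name = topic, NumPartitions = 1, ReplicationFactor = 1 }
            });
            Console.WriteLine($"Created topic '{topic}'.");
        }
        catch (CreateTopicsException e) when (e.Results[0].Error.Code == ErrorCode.TopicAlreadyExists)
        {
            // the topic is already there, which is fine for this purpose
            Console.WriteLine($"Topic '{topic}' already exists.");
        }
    }
}
```

Calling something like this at the top of both applications would mean either one can be started first.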

The output will look like this for the producer -

Sent Message:'A'. Topic alphabet [[0]] @1. Status:Persisted
Sent Message:'B'. Topic alphabet [[0]] @2. Status:Persisted
Sent Message:'C'. Topic alphabet [[0]] @3. Status:Persisted
Sent Message:'D'. Topic alphabet [[0]] @4. Status:Persisted

And for the consumer -

Looking for a message on topic: alphabet
Received: 'A'. Topic 'alphabet'.
Looking for a message on topic: alphabet
Received: 'B'. Topic 'alphabet'.
Looking for a message on topic: alphabet
Received: 'C'. Topic 'alphabet'.
Looking for a message on topic: alphabet
Received: 'D'. Topic 'alphabet'.

