Kafka CLI in C#

I wrote a couple of blog posts this month about running Kafka commands in C# because the CLI from Confluent seems to have changed its authentication requirements. These changes make it difficult for me to use a Docker based Kafka with no authentication. Why no authentication? Because it’s on my local machine and I don’t want to have to set up authentication just to test things out.

The earlier posts were about running the producer and consumer commands from C#. This post combines those two commands and adds listing, creating, and deleting topics.

The code uses System.CommandLine to give the CLI an easy to use structure and help text.

  1#!/usr/bin/dotnet run
  2#:package Confluent.Kafka@2.5.0
  3#:package System.CommandLine@2.0.2
  4
  5using Confluent.Kafka;
  6using Confluent.Kafka.Admin;
  7using System.CommandLine;
  8
  9Option<string> urlOption = new("--url")
 10{
 11    Description = "The url of the Kafka cluster to connect to. Default: localhost:9092",
 12    Required = true,
 13    DefaultValueFactory = (_) =>  "localhost:9092"
 14};
 15
 16Option<string> topicOption = new("--topic")
 17{
 18    Description = "The topic to produce messages to.",
 19    Required = true
 20};
 21
 22Option<string> groupIdOption = new("--group-id")
 23{
 24    Description = "The group-id to use for the consumer. If not provided, a random groupId will be generated.",
 25    Required = false,
 26    DefaultValueFactory = (_) => Guid.NewGuid().ToString()
 27};
 28
 29Option<short> partitionsOption = new("--partitions")
 30{
 31    Description = "The number of partitions to create for the topic. Default is 1, but should be set to at least 3 for production use.",
 32    Required = true,
 33    DefaultValueFactory = (_) => 1
 34};
 35
 36Option<short> replicationFactorOption = new("--replication-factor")
 37{
 38    Description = "The replication factor to use for the topic. Default is 1, but should be set to at least 2 for production use.",
 39    Required = true,
 40    DefaultValueFactory = (_) => 1
 41};
 42
 43Command createTopicCommand = new ("create-topic", "Create a topic")
 44{
 45    urlOption, topicOption, partitionsOption, replicationFactorOption
 46};
 47
 48Command consumeCommand = new ("consume", "Consume from a topic")
 49{
 50    urlOption, topicOption, groupIdOption
 51};
 52
 53Command produceCommand = new ("produce", "Produce to a topic")
 54{
 55    urlOption, topicOption
 56};
 57
 58Command deleteTopicCommand = new ("delete-topic", "Delete a topic")
 59{
 60    urlOption, topicOption
 61};
 62
 63Command listTopicsCommand = new ("list-topics", "List topics in the cluster")
 64{
 65    urlOption
 66};
 67
 68RootCommand rootCommand = new ("A simple command line tool for interacting with Kafka")
 69{
 70    consumeCommand, createTopicCommand, produceCommand, deleteTopicCommand, listTopicsCommand
 71};
 72
 73ParseResult parseResult = rootCommand.Parse(args);
 74parseResult.Invoke();
 75
 76if (parseResult.Errors.Count == 0 )
 77{
 78    string? urlValue = parseResult.GetValue(urlOption);
 79    Console.WriteLine($"Url: {parseResult.GetValue(urlOption)}");
 80    string commandName = parseResult.CommandResult.Command.Name;
 81    switch (commandName)
 82    {
 83        case "create-topic":
 84        await CreateTopic(urlValue, 
 85                    parseResult.GetValue(topicOption), 
 86                    parseResult.GetValue(partitionsOption), 
 87                    parseResult.GetValue(replicationFactorOption));
 88            break;
 89        case "consume":
 90            Consume(urlValue, 
 91                    parseResult.GetValue(topicOption), 
 92                    parseResult.GetValue(groupIdOption));
 93            break;
 94        case "produce":
 95            Produce(urlValue, parseResult.GetValue(topicOption));
 96            break;
 97        case "delete-topic":
 98            await DeleteTopic(urlValue, parseResult.GetValue(topicOption));
 99            break;
100        case "list-topics":
101            ListTopics(urlValue);
102            break;
103    }
104}
105
106async Task CreateTopic(string? url, string? topic, int partitions, short replicationFactor)
107{
108    Console.WriteLine("CreateTopic command invoked");
109    var adminConfig = new AdminClientConfig { BootstrapServers = url };    
110    var admin = new AdminClientBuilder(adminConfig).Build();
111    
112    var topicSpecification = new TopicSpecification
113    {
114        Name = topic,
115        NumPartitions = partitions,
116        ReplicationFactor = replicationFactor
117    };
118
119    await admin.CreateTopicsAsync([topicSpecification]).ContinueWith(task =>
120    {
121        if (task.IsCompletedSuccessfully)
122        {
123            Console.WriteLine($"Topic {args[1]} created successfully.");
124        }
125        else
126        {
127            Console.WriteLine($"Error creating topic: {task.Exception?.Message}");
128        }
129    });
130}
131
132void Consume(string? url, string? topic, string? groupId)
133{
134    Console.WriteLine("Consume command invoked");
135    ConsumerConfig _consumerConfig = new ConsumerConfig
136    {
137        BootstrapServers = url,
138        GroupId = groupId,
139        AutoOffsetReset = AutoOffsetReset.Earliest,
140        EnableAutoCommit = true, // this is the default but good to know about
141        EnableAutoOffsetStore = true // this is the default but good to know about
142    };
143
144    using var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
145    consumer.Subscribe(topic);
146
147    Console.WriteLine($"Looking for messages on topic: {topic}");
148    while (true)
149    {
150        var consumeResult = consumer.Consume();
151        Console.WriteLine($"{consumeResult.Message.Value}");
152    }
153}
154
155async Task DeleteTopic(string? url, string? topic)
156{
157    Console.WriteLine("DeleteTopic command invoked");
158    var adminConfig = new AdminClientConfig { BootstrapServers = url };
159
160    using var admin = new AdminClientBuilder(adminConfig).Build();
161    await admin.DeleteTopicsAsync([topic]).ContinueWith(task =>
162    {
163        if (task.IsCompletedSuccessfully)
164        {
165            Console.WriteLine($"Topic {topic} deleted successfully.");
166        }
167        else
168        {
169            Console.WriteLine($"Error deleting topic: {task.Exception?.Message}");
170        }
171    });
172
173}
174
175void ListTopics(string? url)
176{
177    Console.WriteLine("ListTopics command invoked");
178    var adminConfig = new AdminClientConfig { BootstrapServers = url };
179    using var admin = new AdminClientBuilder(adminConfig).Build();
180
181    var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
182
183    metadata.Topics.ForEach(topic =>
184    {
185        Console.WriteLine($"{topic.Topic}");
186    });
187
188}
189
190void Produce(string? url, string? topic)
191{
192    Console.WriteLine("Produce command invoked");
193    ProducerConfig _producerConfig = new ProducerConfig
194    {
195        BootstrapServers = url
196    };
197    var producer = new ProducerBuilder<string, string>(_producerConfig).Build();
198    Console.WriteLine($"Enter text to send to topic: {topic}. Ctrl+C to exit.");
199    while(true)
200    {
201        var text = Console.ReadLine();
202        if (string.IsNullOrWhiteSpace(text))
203        {
204            continue;
205        }
206
207        var message = new Message<string, string>
208        {
209            Key = Guid.NewGuid().ToString(),
210            Value = $"{text}"
211        };
212        producer.Produce(topic, message);
213    }
214}

Here is the output -

Required command was not provided.

Description:
  A simple command line tool for interacting with Kafka.

Usage:
  Kafka [command] [options]

Options:
  -?, -h, --help  Show help and usage information
  --version       Show version information

Commands:
  consume       Consume from a topic.
  create-topic  Create a topic
  produce       Produce to a topic
  delete-topic  Delete a topic
  list-topics   List topics in the cluster

This is the kind of thing you can easily extend to add more functionality.

comments powered by Disqus

Related