Kafka CLI in C#
I wrote a couple of blog posts this month about running Kafka commands in C# because the CLI from Confluent seems to have changed its authentication requirements. These changes make it difficult for me to use a Docker-based Kafka with no authentication. Why no authentication? Because it’s on my local machine and I don’t want to have to set up authentication just to test things out.
The earlier posts were about running the producer and consumer commands from C#. This post combines those two commands and adds listing, creating, and deleting topics.
The code uses System.CommandLine to give the CLI an easy-to-use structure and help text.
1#!/usr/bin/dotnet run
2#:package Confluent.Kafka@2.5.0
3#:package System.CommandLine@2.0.2
4
5using Confluent.Kafka;
6using Confluent.Kafka.Admin;
7using System.CommandLine;
8
// Broker address. Every sub-command accepts it; it is optional because a
// default value is supplied.
Option<string> urlOption = new("--url")
{
    Description = "The url of the Kafka cluster to connect to. Default: localhost:9092",
    Required = false,
    DefaultValueFactory = _ => "localhost:9092"
};

// Topic name. Used by the create, consume, produce and delete commands, so the
// description is kept command-neutral. No default exists, hence Required.
Option<string> topicOption = new("--topic")
{
    Description = "The topic to operate on.",
    Required = true
};

// Consumer group id. A fresh GUID is generated when the caller does not pass one.
Option<string> groupIdOption = new("--group-id")
{
    Description = "The group-id to use for the consumer. If not provided, a random groupId will be generated.",
    Required = false,
    DefaultValueFactory = _ => Guid.NewGuid().ToString()
};

// Partition count for a new topic; optional because it defaults to 1.
Option<short> partitionsOption = new("--partitions")
{
    Description = "The number of partitions to create for the topic. Default is 1, but should be set to at least 3 for production use.",
    Required = false,
    DefaultValueFactory = _ => 1
};

// Replication factor for a new topic; optional because it defaults to 1.
Option<short> replicationFactorOption = new("--replication-factor")
{
    Description = "The replication factor to use for the topic. Default is 1, but should be set to at least 2 for production use.",
    Required = false,
    DefaultValueFactory = _ => 1
};
42
// create-topic: broker, topic name and the two sizing options.
Command createTopicCommand = new("create-topic", "Create a topic");
createTopicCommand.Add(urlOption);
createTopicCommand.Add(topicOption);
createTopicCommand.Add(partitionsOption);
createTopicCommand.Add(replicationFactorOption);

// consume: broker, topic name and consumer group id.
Command consumeCommand = new("consume", "Consume from a topic");
consumeCommand.Add(urlOption);
consumeCommand.Add(topicOption);
consumeCommand.Add(groupIdOption);

// produce: broker and topic name.
Command produceCommand = new("produce", "Produce to a topic");
produceCommand.Add(urlOption);
produceCommand.Add(topicOption);

// delete-topic: broker and topic name.
Command deleteTopicCommand = new("delete-topic", "Delete a topic");
deleteTopicCommand.Add(urlOption);
deleteTopicCommand.Add(topicOption);

// list-topics: only the broker is needed.
Command listTopicsCommand = new("list-topics", "List topics in the cluster");
listTopicsCommand.Add(urlOption);

// Root of the CLI; sub-commands are registered in the same order as before.
RootCommand rootCommand = new("A simple command line tool for interacting with Kafka");
rootCommand.Add(consumeCommand);
rootCommand.Add(createTopicCommand);
rootCommand.Add(produceCommand);
rootCommand.Add(deleteTopicCommand);
rootCommand.Add(listTopicsCommand);
72
// Reads --url from the parse result and echoes it, as the tool has always
// done before running a command.
string? ReadUrl(ParseResult result)
{
    string? url = result.GetValue(urlOption);
    Console.WriteLine($"Url: {url}");
    return url;
}

// Each sub-command gets its own action. The parser then runs a handler only
// when that command was actually selected for execution, so a help request or
// a parse error never falls through into a Kafka call with missing values.
createTopicCommand.SetAction((ParseResult result, CancellationToken _) =>
    CreateTopic(
        ReadUrl(result),
        result.GetValue(topicOption),
        result.GetValue(partitionsOption),
        result.GetValue(replicationFactorOption)));

consumeCommand.SetAction((ParseResult result) =>
    Consume(
        ReadUrl(result),
        result.GetValue(topicOption),
        result.GetValue(groupIdOption)));

produceCommand.SetAction((ParseResult result) =>
    Produce(ReadUrl(result), result.GetValue(topicOption)));

deleteTopicCommand.SetAction((ParseResult result, CancellationToken _) =>
    DeleteTopic(ReadUrl(result), result.GetValue(topicOption)));

listTopicsCommand.SetAction((ParseResult result) =>
    ListTopics(ReadUrl(result)));

// Parse once, let System.CommandLine pick the action (command handler, help,
// or error report), and surface its result as the process exit code.
ParseResult parseResult = rootCommand.Parse(args);
Environment.ExitCode = await parseResult.InvokeAsync();
105
// Creates a topic on the cluster at `url`.
//   url               - bootstrap server address of the Kafka cluster.
//   topic             - name of the topic to create.
//   partitions        - number of partitions for the new topic.
//   replicationFactor - replication factor for the new topic.
// Kafka failures are reported on the console rather than thrown.
async Task CreateTopic(string? url, string? topic, int partitions, short replicationFactor)
{
    Console.WriteLine("CreateTopic command invoked");
    var adminConfig = new AdminClientConfig { BootstrapServers = url };

    // Dispose the admin client when done, as DeleteTopic and ListTopics do.
    using var admin = new AdminClientBuilder(adminConfig).Build();

    var topicSpecification = new TopicSpecification
    {
        Name = topic,
        NumPartitions = partitions,
        ReplicationFactor = replicationFactor
    };

    try
    {
        await admin.CreateTopicsAsync([topicSpecification]);

        // Report the topic that was requested; the raw args array holds
        // whatever token happened to be second on the command line.
        Console.WriteLine($"Topic {topic} created successfully.");
    }
    catch (KafkaException ex)
    {
        Console.WriteLine($"Error creating topic: {ex.Message}");
    }
}
131
// Subscribes to `topic` and prints every message value until Ctrl+C is pressed.
//   url     - bootstrap server address of the Kafka cluster.
//   topic   - topic to subscribe to.
//   groupId - consumer group id to join.
void Consume(string? url, string? topic, string? groupId)
{
    Console.WriteLine("Consume command invoked");
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = url,
        GroupId = groupId,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true, // this is the default but good to know about
        EnableAutoOffsetStore = true // this is the default but good to know about
    };

    using var cancellation = new CancellationTokenSource();
    using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
    consumer.Subscribe(topic);

    Console.WriteLine($"Looking for messages on topic: {topic}");

    // Turn Ctrl+C into a cooperative cancellation so the loop can exit and the
    // consumer can be closed instead of the process being killed mid-poll.
    ConsoleCancelEventHandler onCancelKey = (_, e) =>
    {
        e.Cancel = true;
        cancellation.Cancel();
    };
    Console.CancelKeyPress += onCancelKey;

    try
    {
        while (true)
        {
            var consumeResult = consumer.Consume(cancellation.Token);
            Console.WriteLine($"{consumeResult.Message.Value}");
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed; fall through to the orderly shutdown below.
    }
    finally
    {
        Console.CancelKeyPress -= onCancelKey;

        // Close() leaves the consumer group cleanly before disposal.
        consumer.Close();
    }
}
154
// Deletes `topic` from the cluster at `url`.
//   url   - bootstrap server address of the Kafka cluster.
//   topic - name of the topic to delete.
// Kafka failures are reported on the console rather than thrown.
async Task DeleteTopic(string? url, string? topic)
{
    Console.WriteLine("DeleteTopic command invoked");
    var adminConfig = new AdminClientConfig { BootstrapServers = url };

    using var admin = new AdminClientBuilder(adminConfig).Build();

    try
    {
        // Awaiting directly surfaces the Kafka exception itself, not an
        // AggregateException wrapper from a continuation.
        await admin.DeleteTopicsAsync([topic]);
        Console.WriteLine($"Topic {topic} deleted successfully.");
    }
    catch (KafkaException ex)
    {
        Console.WriteLine($"Error deleting topic: {ex.Message}");
    }
}
174
// Prints the name of every topic reported by the cluster at `url`,
// allowing up to 10 seconds for the metadata request.
void ListTopics(string? url)
{
    Console.WriteLine("ListTopics command invoked");

    using var admin = new AdminClientBuilder(
        new AdminClientConfig { BootstrapServers = url }).Build();

    foreach (var topicMetadata in admin.GetMetadata(TimeSpan.FromSeconds(10)).Topics)
    {
        Console.WriteLine($"{topicMetadata.Topic}");
    }
}
189
// Reads lines from standard input and sends each non-blank line to `topic`
// with a random GUID key. Stops at end of input (or Ctrl+C).
//   url   - bootstrap server address of the Kafka cluster.
//   topic - topic to produce to.
void Produce(string? url, string? topic)
{
    Console.WriteLine("Produce command invoked");
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = url
    };

    using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    Console.WriteLine($"Enter text to send to topic: {topic}. Ctrl+C to exit.");

    // ReadLine returns null once input is exhausted (piped input, closed
    // stdin). Ending the loop there avoids spinning forever on null.
    while (Console.ReadLine() is { } text)
    {
        if (string.IsNullOrWhiteSpace(text))
        {
            continue;
        }

        var message = new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = text
        };
        producer.Produce(topic, message);
    }

    // Produce() only queues the message; wait for delivery before disposing.
    producer.Flush(TimeSpan.FromSeconds(10));
}
Here is the output -
Required command was not provided.
Description:
A simple command line tool for interacting with Kafka.
Usage:
Kafka [command] [options]
Options:
-?, -h, --help Show help and usage information
--version Show version information
Commands:
consume Consume from a topic.
create-topic Create a topic
produce Produce to a topic
delete-topic Delete a topic
list-topics List topics in the cluster
This is the kind of thing you can easily extend to add more functionality.