CockroachDB CDC to Kafka

CockroachDB’s Change Data Capture (CDC) feature lets you stream changes from the database to multiple external locations, known as sinks. In this post, I’m going to show the most basic setup to get it going. In subsequent posts, I’ll show slightly more complex scenarios.

I have written plenty of blog posts on using CockroachDB and Kafka separately, so I won’t go into the details of setting those up here. Instead, I’ll focus on the CDC to Kafka part.

Kafka

Follow my instructions on running Kafka. It doesn’t matter which flavour of Kafka you use; the changefeed only needs a reachable broker address.

You don’t even need to create a Kafka topic; CockroachDB will create it for you if it doesn’t exist.
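If you want to double-check later that the topic was created, the stock Kafka CLI can list topics (the path to the script depends on your installation) -

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list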

But you do need a consumer to read the messages from the topic. Here is a simple CLI consumer in C#.
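If you’d rather not fetch that project, a minimal version using the Confluent.Kafka NuGet package looks something like this; the group id and the top-level-statement layout are my own choices, not necessarily what the linked consumer does -

// Minimal CLI consumer sketch using the Confluent.Kafka NuGet package.
// Usage: Consumer <broker> <topic>
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = args[0],                  // e.g. localhost:9092
    GroupId = "cdc-demo",                        // any group id works for a demo
    AutoOffsetReset = AutoOffsetReset.Earliest   // read the topic from the beginning
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe(args[1]);                     // e.g. source1_cdc_sink

while (true)
{
    // Block until the next message arrives, then print its value (the CDC JSON).
    var result = consumer.Consume();
    Console.WriteLine(result.Message.Value);
}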

CockroachDB

In my earlier blog posts on CockroachDB, you will find examples of setting up CockroachDB, creating a salesdb database, and seeding a products table with data.
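If you want to follow along without reading those posts, something along these lines will do; the column types here are my guess, reconstructed from the changefeed messages shown later -

CREATE DATABASE salesdb;
USE salesdb;

CREATE TABLE products (
    product_id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    name STRING,
    price DECIMAL,
    product_category INT,
    code STRING
);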

I want to send notifications of any change to the products table to a Kafka topic. To do this, I need to create a changefeed. It’s a simple SQL command.
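One caveat: changefeeds depend on rangefeeds, so on a self-hosted cluster you may need to enable them first -

SET CLUSTER SETTING kv.rangefeed.enabled = true;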

CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092/?topic_prefix=source1_&topic_name=cdc_sink' WITH diff;

Let’s break down the command -

  • CREATE CHANGEFEED FOR TABLE products - this tells CockroachDB to monitor the products table for changes.
  • INTO 'kafka://localhost:9092/?topic_prefix=source1_ - this specifies the Kafka broker address and a topic prefix that will be prepended to the topic name.
  • &topic_name=cdc_sink - this specifies the topic name. The full topic name will be source1_cdc_sink.
  • WITH diff - this option tells CockroachDB to include both the before and after values of each changed row in the changefeed messages.

Why both topic_prefix and topic_name? You can have multiple changefeeds writing to the same Kafka cluster, and the prefix gives you a way to keep their topics apart.
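For example, a second changefeed on the same table could write to the same broker under its own prefix (the source2_ name here is just for illustration), producing a source2_cdc_sink topic -

CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092/?topic_prefix=source2_&topic_name=cdc_sink' WITH diff;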

Trying it out

Start the Kafka consumer, setting it to consume from the source1_cdc_sink topic, using the following command -

./Consumer.cs localhost:9092 source1_cdc_sink

Perform an insert, update, or delete on the products table. You should see the change in the Kafka consumer.

Let’s start with the insert. Run the following SQL command in CockroachDB -

INSERT INTO products (name, price, product_category, code) VALUES ('Black shoes', '100', 2, 12345678);

You should see a message in the Kafka consumer that looks something like this -

{"after": {"code": "12345678", "name": "Black shoes", "price": 100, "product_category": 2, "product_id": 24}, "before": null}

Let’s try an update now. Run the following -

UPDATE products SET price = 120 WHERE product_id = 24;

You will see a message like this in the Kafka consumer -

{"after": {"code": "12345678", "name": "Black shoes", "price": 120, "product_category": 2, "product_id": 24}, "before": {"code": "12345678", "name": "Black shoes", "price": 100, "product_category": 2, "product_id": 24}}

Finally, let’s try a delete. Run the following command -

DELETE FROM products WHERE product_id = 24;

You will see a message like this in the Kafka consumer -

{"after": null, "before": {"code": "12345678", "name": "Black shoes", "price": 120, "product_category": 2, "product_id": 24}}

That’s it: you can now stream changes from CockroachDB to Kafka using CDC with the simplest possible setup. In future posts, I’ll show how to do a few more complex things with CDC.
