19th of March, 2022:
Interested in YugabyteDB CDC? YugabyteDB 2.13 comes with an all-new beta of a CDC SDK: YugabyteDB CDC SDK beta, a high level overview.
Today I’m going to look at change data capture (CDC) in YugabyteDB. Change data capture is a data integration method based on the identification, capture, and delivery of data changes. Enterprises can choose this method to identify what data changes in the database and to act on those changes in real-time. What are the use cases for CDC? For example:
- A fulfillment enterprise can track new stock booking-ins and automatically fulfill backorders.
- An enterprise can track account creation and modification to trigger automatic CRM updates.
- An online shop can react to updates in exchange rates to automatically recalculate prices for different currencies.
- A compliance department can automatically detect when certain user accounts or any other item in the database does not comply with a legal framework.
- In the simplest form, organizations can synchronize different databases in real-time. However, for that use case YugabyteDB already offers a feature called xCluster, an asynchronous cluster-to-cluster replication.
Possibilities are endless and can be applied to pretty much every possible business problem. The gist here is: reacting to changes in real-time.
YugabyteDB is a horizontally scalable, 100% open-source distributed SQL database providing Cassandra-compatible (YCQL) and PostgreSQL-compatible (YSQL) APIs. For the PostgreSQL API, YugabyteDB uses the actual PostgreSQL 11.2 engine. With just PostgreSQL, a company could use the LISTEN/NOTIFY functionality for reactivity within the database, and a CDC solution like Debezium for outside-of-the-database integration.
YugabyteDB currently does not support LISTEN/NOTIFY but it comes with built-in support for CDC. CDC operates on a lower level than said PostgreSQL functionality, but it’s a very good foundation on which to build a general-purpose LISTEN/NOTIFY alternative. The downside is that the data will always leave the cluster in order to be processed.
If your company is looking for someone who can help you migrate your database to the cloud, or your company needs a solid partner who knows a thing or two about YugabyteDB, reach out to Radek at email@example.com.
At Klarrio, we do all sorts of amazing stuff with all sorts of technologies. Bleeding edge, cloud, hybrid, but also boring and old school. We’re not chasing the new shiny. The best tool for the job.
Get in touch to find out more.
§analysis of a YugabyteDB CDC client
I started my investigation by looking at the YugabyteDB CDC Java client - yb-cdc. The client is part of the YugabyteDB source code repository and it can be found here. The code footprint is pretty small so this makes it easier to identify required steps.
The Java client comes with six classes. We can safely ignore the ones that are just boilerplate needed to have a functional Java program. OutputClient is an interface a real Java program needs to implement to receive changes. LogClient is a default implementation of OutputClient. That leaves us with two classes having some significance. The LogConnector class contains the YugabyteDB client and performs the setup of a CDC session. The Poller performs the data capture.
Let’s look at the LogConnector. There’s only a constructor and a run() method. The constructor does the following:
- Creates YugabyteDB async and sync clients (this is an implementation detail of how the Java client works).
- Lists all tables:
In the YugabyteDB RPC API world, this call returns all tables for all databases in a YugabyteDB cluster. To find out more about the YugabyteDB RPC API, read this article.
The response is a protobuf message in which id is a table ID, name is a table name, and the NamespaceIdentifierPB namespace contains the information about the database the table belongs to. In the NamespaceIdentifierPB message, id is the namespace ID, name is the database name, and database_type will hold the value of YQL_DATABASE_PGSQL for YSQL tables.
- Once all tables are listed, the LogConnector filters the retrieved table infos for the required table indicated with the table_name command-line argument. This argument has the form <keyspace>.<table-name>; the client splits the argument on the . and uses the pair to find the internal table ID.
- Lists cluster tablet servers.
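The table lookup described above can be sketched in a few lines of Go. This is a minimal illustration, not the Java client's code: the Namespace and TableInfo types are hypothetical stand-ins that only mirror the fields mentioned above (they are not the generated protobuf types), and the table IDs and the exampledb database name are made up.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Namespace and TableInfo are hypothetical types mirroring the fields
// described in the text; they are not the generated protobuf types.
type Namespace struct {
	ID           string
	Name         string
	DatabaseType string
}

type TableInfo struct {
	ID        string
	Name      string
	Namespace Namespace
}

// findTableID splits an argument of the form <keyspace>.<table-name> on the
// first dot and returns the ID of the table matching both parts.
func findTableID(tables []TableInfo, arg string) (string, error) {
	keyspace, table, ok := strings.Cut(arg, ".")
	if !ok || keyspace == "" || table == "" {
		return "", fmt.Errorf("expected <keyspace>.<table-name>, got %q", arg)
	}
	for _, t := range tables {
		if t.Namespace.Name == keyspace && t.Name == table {
			return t.ID, nil
		}
	}
	return "", errors.New("table not found: " + arg)
}

func main() {
	// Made-up sample data standing in for a list-tables response.
	ns := Namespace{ID: "ns-1", Name: "exampledb", DatabaseType: "YQL_DATABASE_PGSQL"}
	tables := []TableInfo{
		{ID: "table-1", Name: "region", Namespace: ns},
		{ID: "table-2", Name: "orders", Namespace: ns},
	}
	id, err := findTableID(tables, "exampledb.region")
	fmt.Println(id, err)
}
```

Matching on both the namespace name and the table name matters because the list call returns tables for all databases in the cluster, so a table name alone may not be unique.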
At this point, the LogConnector is connected and has verified that the table for which the CDC consumer is to be created exists. Once run() is executed, the connector will:
- Check if the CDC stream ID is given; if not, it will create a CDC stream using the CreateCDCStreamRequestPB message.
- Find all tablets for the table.
- For each tablet, create a Poller.
The Poller class is a little bit more complex than the connector, but what it does is rather simple to understand:
- Using a randomly selected TServer address, the poller sends the GetChangesRequestPB message to the master replication service. The request requires a tablet_id, and the checkpoint metadata.
- Once the response arrives, the response data propagates to the Void doHandlePoll(GetChangesResponse getChangesResponse) method.
- doHandlePoll processes individual records from the response and updates the index.
The Poller continuously polls for new data, and each tablet has its own Poller.
§building a CDC solution from scratch
To verify this knowledge, I built a very simple CDC client. Instead of Java, I used Go with the Go RPC client.
You can find the CDC client here.
§the YugabyteDB cluster
To be able to test the solution, I need a YugabyteDB cluster with some data in it. I’m going to use the YugabyteDB multi-tenant PaaS demo, which I created a while ago.
My environment looks like this:
On Linux, Docker Compose needs to be installed separately from Docker. Instructions here.
This will take a few seconds to settle. There will be:
- three master servers,
- three TServers,
- an Envoy proxy in front of TServers.
Let’s verify that everything is working. In another terminal:
should produce something similar to:
Master UUID                       RPC Host/Port      State  Role
44dec826f77148929c12e4afd2ead389  yb-master-n1:7100  ALIVE  FOLLOWER
b8db32a7f9f549c2bdbd824a07d173fb  yb-master-n2:7100  ALIVE  LEADER
1f549268a41f4759ab821042f3f2490c  yb-master-n3:7100  ALIVE  FOLLOWER
Your UUIDs will be different and your cluster may have a different leader. Let’s look at our TServers, again, TServer UUIDs will be different:
Tablet Server UUID                RPC Host/Port             Heartbeat delay  Status  Reads/s  Writes/s  Uptime  SST total size  SST uncomp size  SST #files  Memory
ce5dbc4bdc574b33b52313f0ea8bf89d  yb-tserver-shared-2:9100  0.43s            ALIVE   0.00     0.00      157     0 B             0 B              0           45.09 MB
49e47380d5cc4e53b8991d5f4034b596  yb-tserver-shared-1:9100  0.43s            ALIVE   0.00     0.00      157     0 B             0 B              0           45.09 MB
e313e4dd6cd1410189fa73562a47fba4  yb-tserver-shared-3:9100  0.43s            ALIVE   0.00     0.00      158     0 B             0 B              0           44.04 MB
§load the data into the cluster
To connect via YSQL, you will have to go through Envoy. Docker Compose exposes Envoy to your host on port 35432. We can try this with:
The password is
The output is:
   table_name
----------------
 pg_default_acl
(1 row)
Let’s create a database and load the data:
Don’t worry about the tenant1 suffix. It doesn’t matter. Let’s check what tables we have in our database:
       table_name
------------------------
 categories
 customer_demographics
 customers
 customer_customer_demo
 employees
 suppliers
 products
 region
 shippers
 orders
 territories
 employee_territories
 order_details
 us_states
(14 rows)
 count
-------
    91
(1 row)
The database is initialized and ready to work with.
§run the minimal client
Time to build and run the minimal CDC client:
and run it against the cluster:
After a short moment, the CDC data should start streaming to the terminal. This initial output is the data inserted into the database during the initial import.
Note: the network used by the cluster is called yb-dbnet and that’s the network we run the CDC container in.
Let’s ignore the initial data streamed to us and focus on subsequent changes. While the cluster and the CDC client are running, execute the following query:
The CDC program is going to output a JSON object similar to:
There’s going to be one JSON object for each individual tablet. What can we infer from this data?
- records.*.operation is the operation type; in this case, the operation was write followed by an apply,
- records.*.key contains the row key for which the write happened; cmVnaW9uX2lk is a base64 encoded region_id,
- key is an array of objects, which suggests that there may be multiple entries when the table has a composite key,
- records.*.changes is an array of objects,
- key of each change is the column name; if we base64 decode the cmVnaW9uX2Rlc2NyaXB0aW9u value, we get region_description, which is the column name where the textual value goes,
- value of each individual change contains a typed value for the column; this is probably where the largest chunk of processing would go in a real application (to discover the value type, it could probably be correlated by loading the table schema beforehand with the GetTableSchemaRequestPB message defined in master/master_ddl.proto); in our case, the base64 decoded value is test, exactly what we have inserted,
- the complete changeset contains the partition information; combined with the tablet ID of each individual change, it would be possible to react only to the changes happening within a specific geographical location; find out more.
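Decoding the base64 encoded names from the output is easy to try out. The snippet below is a small sketch using only the Go standard library; decodeField is a helper name I made up for this example, and treating the decoded bytes as a string is an assumption that only holds for textual fields such as column names.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeField decodes a base64 encoded key as it appears in the JSON output
// and returns the decoded bytes as a string.
func decodeField(s string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return "", fmt.Errorf("decode %q: %w", s, err)
	}
	return string(raw), nil
}

func main() {
	// The two encoded column names discussed in the text.
	for _, s := range []string{"cmVnaW9uX2lk", "cmVnaW9uX2Rlc2NyaXB0aW9u"} {
		decoded, err := decodeField(s)
		fmt.Println(decoded, err)
	}
}
```

Typed values would need more care than this: the decoded bytes have to be interpreted according to the column type, which is where the table schema would come in.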
Okay, let’s see what happens when we issue an update:
Results in JSON similar to:
Again, one object per tablet. This time we have four records, and we see apply again. We can see that our new value is returned here: bmV3IHZhbHVl is the base64 encoded string new value.
Also, it seems that for each column, we get a full log of subsequent changes! That’s pretty cool.
Let’s try a delete:
Which will output the following, for each tablet:
The delete resulted in six rows. So clearly the history of changes for each key is persisted. There are also two new operation types. Looking at the protobuf object reveals more interesting detail:
It seems like we may be getting more insight into what we had before the change and what we have after the change -
changes fields. However, as the comments say, it’s not supported yet.
YugabyteDB CDC is very straightforward to approach and a great foundation for integrating distributed SQL with real-time pipelines. CDC data can be easily ingested and transformed with Kafka or another data streaming solution to power all sorts of business applications inside of an enterprise.
Having an out-of-the-box possibility to fuse distributed SQL and real-time streaming data is a superpower with unlimited applications in today’s data-driven world.
That’s it for today, dipping the toes in YugabyteDB CDC.