19th of March, 2022:
Interested in YugabyteDB CDC? YugabyteDB 2.13 comes with an all-new beta of a CDC SDK; see YugabyteDB CDC SDK beta, a high-level overview.
Today I’m going to look at change data capture (CDC) in YugabyteDB. Change data capture is a data integration method based on the identification, capture, and delivery of data changes. Enterprises can choose this method to identify what data changes in the database and to act on those changes in real time. What are the use cases for CDC? For example:
- A fulfillment enterprise can track newly booked-in stock and automatically fulfill backorders.
- An enterprise can track account creation and modification to trigger automatic CRM updates.
- An online shop can react to updates in exchange rates to automatically recalculate prices for different currencies.
- A compliance department can automatically detect when certain user accounts or any other item in the database does not comply with a legal framework.
- In the simplest form, organizations can synchronize different databases in real time. (For that particular case, YugabyteDB already offers a feature called xCluster, an asynchronous cluster-to-cluster replication.)
The possibilities are endless and can be applied to pretty much any business problem. The gist here is: reacting to changes in real time.
YugabyteDB is a horizontally scalable, 100% open-source distributed SQL database providing Cassandra-compatible (YCQL) and PostgreSQL-compatible (YSQL) APIs. For the PostgreSQL API, YugabyteDB uses the actual PostgreSQL 11.2 engine. With plain PostgreSQL, a company could use the LISTEN/NOTIFY functionality for reactivity within the database, and a CDC solution like Debezium for outside-of-the-database integration.
YugabyteDB currently does not support LISTEN/NOTIFY, but it comes with built-in support for CDC. CDC operates on a lower level than that PostgreSQL functionality, but it is a very good foundation on which to build a general-purpose LISTEN/NOTIFY alternative. The downside is that the data always leaves the cluster in order to be processed.
§commercial plug
If your company is looking for someone who can help you migrate your database to the cloud, or your company needs a solid partner who knows a thing or two about YugabyteDB, reach out to Radek at radek.gruchalski@klarrio.com.
At Klarrio, we do all sorts of amazing stuff with all sorts of technologies. Bleeding edge, cloud, hybrid, but also boring and old school. We’re not chasing the new shiny. The best tool for the job.
Get in touch to find out more.
§analysis of a YugabyteDB CDC client
I started my investigation by looking at the YugabyteDB CDC Java client, yb-cdc. The client is part of the YugabyteDB source code repository and can be found here[1]. The code footprint is pretty small, which makes it easier to identify the required steps.
The Java client comes with six classes:
org.yb.cdc.CmdLineOptions
org.yb.cdc.LogClient
org.yb.cdc.LogConnector
org.yb.cdc.Main
org.yb.cdc.OutputClient
org.yb.cdc.Poller
We can safely ignore CmdLineOptions, Main, LogClient, and OutputClient; they’re just boilerplate needed to have a functional Java program. OutputClient is an interface a real Java program needs to implement to receive changes from the Poller instance. LogClient is a default implementation of OutputClient. That leaves us with two classes of real significance.
The LogConnector class contains the YugabyteDB client and performs the setup of a CDC session. The Poller performs the data capture.
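Before diving into those two classes, here is a rough Go rendering of the OutputClient/LogClient pair described above; the ChangeRecord struct and the method names are my own illustrative stand-ins, not part of any YugabyteDB API:

```go
package main

import "log"

// ChangeRecord is an illustrative stand-in for a single CDC record handed
// to the consumer by a poller; the real payload is a protobuf message.
type ChangeRecord struct {
	TabletID  string
	Operation string
	Key       string
	Changes   map[string]string
}

// OutputClient plays the role of org.yb.cdc.OutputClient: anything that
// wants to consume changes implements this interface and gets called by
// the pollers.
type OutputClient interface {
	HandleChange(record ChangeRecord) error
}

// LogOutputClient plays the role of org.yb.cdc.LogClient: a trivial
// default implementation that just logs every record it receives.
type LogOutputClient struct{}

func (l *LogOutputClient) HandleChange(record ChangeRecord) error {
	log.Printf("tablet=%s op=%s key=%s changes=%v",
		record.TabletID, record.Operation, record.Key, record.Changes)
	return nil
}

func main() {
	var out OutputClient = &LogOutputClient{}
	_ = out.HandleChange(ChangeRecord{
		TabletID:  "tablet-1",
		Operation: "write",
		Key:       "5",
		Changes:   map[string]string{"region_description": "test"},
	})
}
```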
§the connector
Let’s look at the LogConnector. There’s only a constructor and a run() method. The constructor does the following:
- Creates YugabyteDB async and sync clients (this is an implementation detail of how the Java client works).
- Lists all tables. In the YugabyteDB RPC API world, this call returns all tables for all databases in a YugabyteDB cluster. To find out more about the YugabyteDB RPC API, read this article[2]. The response is a protobuf ListTablesResponsePB message:
|
|
The id is a table ID, name is a table name, and the NamespaceIdentifierPB namespace contains the information about the database the table belongs to. The NamespaceIdentifierPB message looks like this:
|
|
The id is the namespace ID, name is the database name, and database_type will hold the value YQL_DATABASE_PGSQL for YSQL tables.
- Once all tables are listed, the LogConnector filters the retrieved table infos for the required table, indicated with the table_name command-line argument. This argument has the form <keyspace>.<table-name>; the client splits it on the . and uses the pair to find the table’s internal table ID (see the sketch after this list).
- Lists cluster tablet servers.
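Here is that sketch: a minimal Go version of the filtering step. ListedTable and FindTableID are hypothetical helpers standing in for whatever the RPC client returns for the listed tables; this is not the real client API.

```go
package connector

import (
	"fmt"
	"strings"
)

// ListedTable is a hypothetical, flattened view of one ListTablesResponsePB
// entry: the table ID, the table name, and the namespace (database) it
// belongs to.
type ListedTable struct {
	ID        string
	Name      string
	Namespace string
}

// FindTableID splits the <keyspace>.<table-name> argument and scans the
// listed tables for a matching namespace/name pair, returning the internal
// table ID the CDC stream will be created for.
func FindTableID(tables []ListedTable, arg string) (string, error) {
	parts := strings.SplitN(arg, ".", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("expected <keyspace>.<table-name>, got %q", arg)
	}
	keyspace, name := parts[0], parts[1]
	for _, t := range tables {
		if t.Namespace == keyspace && t.Name == name {
			return t.ID, nil
		}
	}
	return "", fmt.Errorf("table %q not found", arg)
}
```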
At this point, the LogConnector is connected and has verified that the table for which the CDC consumer is to be created exists. Once run() is executed, the connector will:
- Check if a CDC stream ID is given; if not, create a CDC stream using the CreateCDCStreamRequestPB message defined in master/master_replication.proto.
- Find all tablets for the table using the GetTableLocationsRequestPB message defined in master/master_client.proto.
- For each tablet, create a Poller (a rough Go sketch of this flow follows below).
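The promised sketch of the run() flow. The streamClient interface is a hypothetical abstraction over the two RPCs mentioned above; the real Go RPC client exposes these calls differently.

```go
package connector

import "context"

// streamClient is a hypothetical abstraction over the two RPCs run() needs;
// the real Go RPC client exposes these calls differently.
type streamClient interface {
	// CreateCDCStream corresponds to the CreateCDCStreamRequestPB call.
	CreateCDCStream(ctx context.Context, tableID string) (string, error)
	// ListTabletIDs corresponds to the GetTableLocationsRequestPB call.
	ListTabletIDs(ctx context.Context, tableID string) ([]string, error)
}

// StartPollers mirrors LogConnector.run(): make sure a CDC stream exists,
// find all tablets of the table, and start one poller per tablet.
func StartPollers(ctx context.Context, client streamClient, tableID, streamID string,
	poll func(ctx context.Context, streamID, tabletID string)) error {
	if streamID == "" {
		// No stream ID passed on the command line: create a new stream.
		created, err := client.CreateCDCStream(ctx, tableID)
		if err != nil {
			return err
		}
		streamID = created
	}
	tabletIDs, err := client.ListTabletIDs(ctx, tableID)
	if err != nil {
		return err
	}
	for _, tabletID := range tabletIDs {
		go poll(ctx, streamID, tabletID) // one Poller instance per tablet
	}
	return nil
}
```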
§the poller
The Poller class is a little bit more complex than the connector, but what it does is rather simple to understand. The work starts by calling the poll() method.
- Using a randomly selected TServer address, the poller sends the GetChangesRequestPB message to the CDC service. The message definition is in cdc/cdc_service.proto:
|
|
The request requires a stream_id, a tablet_id, and the checkpoint metadata. The checkpoint metadata definition is:
|
|
where op_id is defined in util/opid.proto:
|
|
- Once the response arrives, the response data propagates to the Void doHandlePoll(GetChangesResponse getChangesResponse) method.
- doHandlePoll processes individual records from the response, updates the Poller’s internal term and index, and executes poll() once again.
Thus, the Poller continuously polls for new data, and each tablet has its own Poller instance.
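That per-tablet loop can be sketched in Go as follows. The request, response, and checkpoint shapes below only mirror the fields described in the prose (stream ID, tablet ID, and a term/index pair); they are not the generated protobuf types, and changesClient is a hypothetical wrapper around the GetChanges RPC.

```go
package poller

import (
	"context"
	"time"
)

// Checkpoint mirrors the op_id from util/opid.proto as described above:
// a Raft term and an index within that term.
type Checkpoint struct {
	Term  int64
	Index int64
}

// ChangeRecord and ChangesResponse are illustrative stand-ins for the
// GetChanges response payload: a batch of records plus the checkpoint
// to resume from.
type ChangeRecord struct {
	Operation string
	Key       []byte
	Changes   map[string][]byte
}

type ChangesResponse struct {
	Records    []ChangeRecord
	Checkpoint Checkpoint
}

// changesClient is a hypothetical wrapper around the GetChangesRequestPB
// RPC: given a stream, a tablet, and the last seen checkpoint, return the
// next batch of changes.
type changesClient interface {
	GetChanges(ctx context.Context, streamID, tabletID string, from Checkpoint) (*ChangesResponse, error)
}

// Poll mirrors Poller.poll()/doHandlePoll(): request changes, hand every
// record to the handler, advance the checkpoint, and poll again.
func Poll(ctx context.Context, client changesClient, streamID, tabletID string,
	handle func(ChangeRecord) error) error {
	var checkpoint Checkpoint
	for {
		resp, err := client.GetChanges(ctx, streamID, tabletID, checkpoint)
		if err != nil {
			return err
		}
		for _, record := range resp.Records {
			if err := handle(record); err != nil {
				return err
			}
		}
		// Resume from where this batch left off (the term/index pair).
		checkpoint = resp.Checkpoint
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(200 * time.Millisecond): // simple pacing between polls
		}
	}
}
```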
§building a CDC solution from scratch
To verify this knowledge, I’m going to build a very simple CDC client. Instead of Java, I’m using Go with the Go RPC client[3].
You can find the CDC client here[4].
§the YugabyteDB cluster
To be able to test the solution, I need a YugabyteDB cluster with some data in it. I’m going to use the YugabyteDB multi-tenant PaaS demo[5], which I created a while ago.
My environment looks like this:
|
|
On Linux, Docker Compose needs to be installed separately from Docker. Instructions here[6].
|
|
This will take a few seconds to settle. There will be:
- three master servers,
- three TServers,
- an Envoy proxy in front of TServers.
Let’s verify that everything is working. In another terminal:
|
|
should produce something similar to:
Master UUID RPC Host/Port State Role
44dec826f77148929c12e4afd2ead389 yb-master-n1:7100 ALIVE FOLLOWER
b8db32a7f9f549c2bdbd824a07d173fb yb-master-n2:7100 ALIVE LEADER
1f549268a41f4759ab821042f3f2490c yb-master-n3:7100 ALIVE FOLLOWER
Your UUIDs will be different and your cluster may have a different leader. Let’s look at our TServers; again, the TServer UUIDs will be different:
|
|
Tablet Server UUID RPC Host/Port Heartbeat delay Status Reads/s Writes/s Uptime SST total size SST uncomp size SST #files Memory
ce5dbc4bdc574b33b52313f0ea8bf89d yb-tserver-shared-2:9100 0.43s ALIVE 0.00 0.00 157 0 B 0 B 0 45.09 MB
49e47380d5cc4e53b8991d5f4034b596 yb-tserver-shared-1:9100 0.43s ALIVE 0.00 0.00 157 0 B 0 B 0 45.09 MB
e313e4dd6cd1410189fa73562a47fba4 yb-tserver-shared-3:9100 0.43s ALIVE 0.00 0.00 158 0 B 0 B 0 44.04 MB
§load the data into the cluster
To connect via YSQL, you have to go through Envoy. Docker Compose exposes Envoy to your host on port 35432. We can try this with:
|
|
The password is yugabyte.
The output is:
table_name
----------------
pg_default_acl
(1 row)
Let’s create a database and load the data:
|
|
Don’t worry about the tenant1 suffix; it doesn’t matter. Let’s check what tables we have in our database:
|
|
table_name
------------------------
categories
customer_demographics
customers
customer_customer_demo
employees
suppliers
products
region
shippers
orders
territories
employee_territories
order_details
us_states
(14 rows)
|
|
count
-------
91
(1 row)
The database is initialized and ready to work with.
§run the minimal client
Time to build and run the minimal CDC client:
|
|
and run it against the cluster:
|
|
After a short moment, the CDC data should start streaming to the terminal. This initial output is the data inserted into the database during the initial import.
Note: the network used by the cluster is called yb-dbnet, and that’s the network we run the CDC container in.
§CDC payloads
Let’s ignore the initial data streamed to us and focus on subsequent changes. While the cluster and the CDC client are running, execute the following query:
|
|
The CDC program is going to output a JSON object similar to:
|
|
There’s going to be one JSON object for each individual tablet. What can we infer from this data?
- records.*.operation is the operation type, one of:
|
|
In this case, the operation was a write followed by an apply.
- records.*.key contains the row key for which the write happened; that’s where we can find our 5. The cmVnaW9uX2lk is a base64-encoded region_id. The key is an array of objects, which suggests that there may be multiple entries when the table has a composite key.
- records.*.changes is an array of objects with a key and a value.
- The key of each change is the column name; if we base64-decode the cmVnaW9uX2Rlc2NyaXB0aW9u value, we get region_description, which is the column name where the textual value goes.
- The value of each individual change contains a typed value for the column. This is probably where the largest chunk of processing would go in a real application (to discover the value type, it could probably be correlated by loading the table schema beforehand using the GetTableSchemaRequestPB message defined in master/master_ddl.proto). In our case, the base64-decoded dGVzdA== is test, exactly what we have inserted (see the decoding example after this list).
- The complete changeset contains the partition information; combined with the tablet ID of each individual change, it would be possible to react only to the changes happening within a specific geographical location; find out more[7].
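As mentioned in the list above, the column names and values arrive base64-encoded, so a consumer has to decode them before doing anything useful. Here is a minimal, runnable Go example of that decoding step, using a hand-trimmed change entry shaped like the JSON above; the field names are an approximation of my client’s output, not a stable schema.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
)

// change models one entry of records.*.changes: a base64-encoded column
// name and a typed value (reduced here to a string for brevity).
type change struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

func mustDecode(s string) string {
	b, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		log.Fatal(err)
	}
	return string(b)
}

func main() {
	// A trimmed-down stand-in for one CDC change as printed by the client.
	raw := `[{"key":"cmVnaW9uX2Rlc2NyaXB0aW9u","value":"dGVzdA=="}]`

	var changes []change
	if err := json.Unmarshal([]byte(raw), &changes); err != nil {
		log.Fatal(err)
	}
	for _, c := range changes {
		// Prints: region_description = test
		fmt.Printf("%s = %s\n", mustDecode(c.Key), mustDecode(c.Value))
	}
}
```

Running it prints region_description = test, the same column name and value we inserted.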
Okay, let’s see what happens when we issue an update:
|
|
Results in JSON similar to:
|
|
Again, one object per tablet. This time we have four records: a write, an apply, a write, and an apply again. We can see that our new value is returned here; the bmV3IHZhbHVl is a base64-encoded new value string.
Also, it seems that for each column, we get a full log of subsequent changes! That’s pretty cool.
Let’s try a delete:
|
|
Which will output the following, for each tablet:
|
|
The delete resulted in six records, so clearly the history of changes for each key is persisted. The two new operation types are 2 and 3: delete and apply, respectively.
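A consumer that only cares about the current state of each row has to fold that history in order. Here is a small Go sketch of such folding, with the record shape again being my own simplification of the CDC payload:

```go
package main

import "fmt"

// record is a simplified CDC record: an operation, a row key, and the
// column values carried by the change (already base64-decoded).
type record struct {
	Operation string // "write", "delete", "apply", ...
	Key       string
	Changes   map[string]string
}

// applyHistory folds an ordered change history into the current state of
// each row: writes merge column values, deletes drop the row, and "apply"
// markers are ignored for this purpose.
func applyHistory(history []record) map[string]map[string]string {
	state := map[string]map[string]string{}
	for _, r := range history {
		switch r.Operation {
		case "write":
			if state[r.Key] == nil {
				state[r.Key] = map[string]string{}
			}
			for column, value := range r.Changes {
				state[r.Key][column] = value
			}
		case "delete":
			delete(state, r.Key)
		}
	}
	return state
}

func main() {
	history := []record{
		{Operation: "write", Key: "5", Changes: map[string]string{"region_description": "test"}},
		{Operation: "apply", Key: "5"},
		{Operation: "write", Key: "5", Changes: map[string]string{"region_description": "new value"}},
		{Operation: "apply", Key: "5"},
		{Operation: "delete", Key: "5"},
		{Operation: "apply", Key: "5"},
	}
	fmt.Println(applyHistory(history)) // map[] - the row ends up deleted
}
```

Feeding it the write/apply/write/apply/delete/apply sequence from above ends with an empty state, because the final operation removed the row.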
What’s more, looking at the protobuf object reveals some interesting detail:
|
|
It seems like we may be getting more insight into what we had before the change and what we have after the change: the before, after, and changes fields. However, as the comments say, it’s not supported yet.
§it’s powerful
CDC in YugabyteDB is very straightforward to approach and a great foundation for integrating distributed SQL with real-time pipelines. CDC data can be easily ingested and transformed with Kafka or another data streaming solution to power all sorts of business applications inside an enterprise.
Having an out-of-the-box possibility to fuse distributed SQL and real-time streaming data is a superpower with unlimited applications in today’s data-driven world.
That’s it for today: dipping the toes into YugabyteDB CDC.