YugabyteDB change data capture

real-time systems + distributed SQL

19th of March, 2022:
Interested in YugabyteDB CDC? YugabyteDB 2.13 comes with an all-new beta of a CDC SDK: see YugabyteDB CDC SDK beta, a high level overview.

Today I’m going to look at change data capture (CDC) in YugabyteDB. Change data capture is a data integration method based on the identification, capture, and delivery of data changes. Enterprises can use this method to identify what data changed in the database and to act on those changes in real time. What are the use cases for CDC? For example:

  • A fulfillment company can track newly booked-in stock and automatically fulfill backorders.
  • An enterprise can track account creation and modification to trigger automatic CRM updates.
  • An online shop can react to updates in exchange rates to automatically recalculate prices for different currencies.
  • A compliance department can automatically detect when certain user accounts or any other item in the database does not comply with a legal framework.
  • In the simplest form, organizations can synchronize different databases in real time. For plain database-to-database replication, however, YugabyteDB offers a dedicated feature called xCluster: asynchronous cluster-to-cluster replication.

The possibilities are endless and apply to pretty much every business problem. The gist here is: reacting to changes in real time.

YugabyteDB is a horizontally scalable, 100% open-source distributed SQL database providing Cassandra-compatible (YCQL) and PostgreSQL-compatible (YSQL) APIs. For the PostgreSQL API, YugabyteDB uses the actual PostgreSQL 11.2 engine. With plain PostgreSQL, a company could use the LISTEN/NOTIFY functionality for reactivity within the database, and a CDC solution like Debezium for outside-of-the-database integration.

YugabyteDB currently does not support LISTEN/NOTIFY, but it comes with built-in support for CDC. CDC operates on a lower level than that PostgreSQL functionality, but it’s a very good foundation for building a general-purpose LISTEN/NOTIFY alternative. The downside is that the data always has to leave the cluster in order to be processed.

§commercial plug

If your company is looking for someone who can help you migrate your database to the cloud, or your company needs a solid partner who knows a thing or two about YugabyteDB, reach out to Radek at radek.gruchalski@klarrio.com.

At Klarrio, we do all sorts of amazing stuff with all sorts of technologies: bleeding edge, cloud, hybrid, but also boring and old school. We’re not chasing the new shiny; we pick the best tool for the job.

Get in touch to find out more.

§analysis of a YugabyteDB CDC client

I started my investigation by looking at the YugabyteDB CDC Java client, yb-cdc. The client is part of the YugabyteDB source code repository and can be found here[1]. The code footprint is pretty small, which makes it easier to identify the required steps.

The Java client comes with six classes:

  • org.yb.cdc.CmdLineOptions
  • org.yb.cdc.LogClient
  • org.yb.cdc.LogConnector
  • org.yb.cdc.Main
  • org.yb.cdc.OutputClient
  • org.yb.cdc.Poller

We can safely ignore CmdLineOptions, Main, LogClient, and OutputClient; they’re mostly boilerplate needed for a functional Java program. OutputClient is the interface a real Java program needs to implement to receive changes from a Poller instance, and LogClient is its default implementation. That leaves us with two classes of real significance.

The LogConnector class contains the YugabyteDB client and performs the setup of a CDC session. The Poller performs the data capture.
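
As a point of reference for the Go client built later in this post, the consumer contract could be expressed in Go as the minimal sketch below. This is an illustrative analogy only, not an interface defined by the Java client or by the Go client; the Record struct is a hypothetical, trimmed-down stand-in for CDCRecordPB.

// Record is a hypothetical, trimmed-down stand-in for the CDCRecordPB
// message a poller hands over for every captured change.
type Record struct {
	Time      uint64
	Operation int32
}

// OutputClient mirrors the role of org.yb.cdc.OutputClient: anything that
// wants to receive captured changes implements this single callback.
type OutputClient interface {
	HandleRecord(tabletID string, record Record) error
}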

§the connector

Let’s look at the LogConnector. There’s only a constructor and a run() method. The constructor does the following:

  • Creates YugabyteDB async and sync clients (this is an implementation detail of how the Java client works).
  • Lists all tables:

In the YugabyteDB RPC API world, this call returns all tables for all databases in a YugabyteDB cluster. To find out more about the YugabyteDB RPC API, read this article[2].

The response is a protobuf ListTablesResponsePB message:

message ListTablesResponsePB {
  // The error, if an error occurred with this request.
  optional MasterErrorPB error = 1;

  message TableInfo {
    required bytes id = 1;
    required string name = 2;
    optional TableType table_type = 3;
    optional NamespaceIdentifierPB namespace = 4;
    optional RelationType relation_type = 5 [default = USER_TABLE_RELATION];
    optional SysTablesEntryPB.State state = 6;
  }

  repeated TableInfo tables = 2;
}

The id is the table ID, name is the table name, and the namespace field (a NamespaceIdentifierPB) contains information about the database the table belongs to. The NamespaceIdentifierPB message looks like this:

message NamespaceIdentifierPB {
  // The namespace ID to fetch info.
  optional bytes id = 1;

  // The namespace name to fetch info.
  optional string name = 2;

  // Database type.
  optional YQLDatabase database_type = 3 [ default = YQL_DATABASE_CQL ];
}

The id is the namespace ID, name is the database name, and database_type holds the value YQL_DATABASE_PGSQL for YSQL tables.

  • Once all tables are listed, the LogConnector filters the retrieved table infos for the table indicated by the table_name command-line argument. The argument has the form <keyspace>.<table-name>; the client splits it on the . and uses the pair to find the table’s internal ID (a sketch of this lookup follows this list).
  • Lists cluster tablet servers.
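
To make the lookup concrete, here is a minimal Go sketch of that filtering step. The TableInfo and NamespaceIdentifier structs are hand-written simplifications of the ListTablesResponsePB messages shown above (not the generated protobuf types), and findTableID is a hypothetical helper, not part of any client library.

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-ins for ListTablesResponsePB.TableInfo and
// NamespaceIdentifierPB; the real generated types carry more fields.
type NamespaceIdentifier struct {
	Name         string
	DatabaseType string // e.g. "YQL_DATABASE_PGSQL" for YSQL tables
}

type TableInfo struct {
	ID        []byte
	Name      string
	Namespace NamespaceIdentifier
}

// findTableID mimics what the LogConnector does with the table_name argument:
// split "<keyspace>.<table-name>" and match the pair against the listed tables.
func findTableID(tables []TableInfo, arg string) ([]byte, error) {
	parts := strings.SplitN(arg, ".", 2)
	if len(parts) != 2 {
		return nil, fmt.Errorf("expected <keyspace>.<table-name>, got %q", arg)
	}
	keyspace, tableName := parts[0], parts[1]
	for _, t := range tables {
		if t.Namespace.DatabaseType != "YQL_DATABASE_PGSQL" {
			continue // only interested in YSQL tables here
		}
		if t.Namespace.Name == keyspace && t.Name == tableName {
			return t.ID, nil
		}
	}
	return nil, fmt.Errorf("table %q not found in keyspace %q", tableName, keyspace)
}

func main() {
	tables := []TableInfo{
		{ID: []byte("some-table-id"), Name: "region",
			Namespace: NamespaceIdentifier{Name: "cdctest", DatabaseType: "YQL_DATABASE_PGSQL"}},
	}
	id, err := findTableID(tables, "cdctest.region")
	fmt.Println(string(id), err)
}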

At this point, the LogConnector is connected and has verified that the table for which the CDC consumer is to be created exists. Once run() is executed, the connector will:

  • Check if a CDC stream ID is given; if not, create a CDC stream using the CreateCDCStreamRequestPB message defined in master/master_replication.proto.
  • Find all tablets for the table using the GetTableLocationsRequestPB message defined in master/master_client.proto.
  • For each tablet, create a Poller (a rough Go sketch of this sequence follows the list).
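
Expressed in Go, that run() sequence could be sketched roughly as below. The cdcClient interface and its method names are hypothetical placeholders for the underlying RPC calls, not the API of the Java or Go client libraries; a package declaration and a concrete poll function are assumed to exist around this fragment.

// cdcClient is a hypothetical abstraction over the two RPC calls run() needs;
// the method names are placeholders, not a real client library API.
type cdcClient interface {
	CreateCDCStream(tableID []byte) (streamID []byte, err error)
	GetTableLocations(tableID []byte) (tabletIDs [][]byte, err error)
}

// runConnector mirrors LogConnector.run(): ensure a CDC stream exists,
// list the tablets of the table, and start one poller per tablet.
func runConnector(client cdcClient, tableID, streamID []byte,
	poll func(streamID, tabletID []byte)) error {
	if len(streamID) == 0 {
		// No stream ID given: create one (CreateCDCStreamRequestPB).
		created, err := client.CreateCDCStream(tableID)
		if err != nil {
			return err
		}
		streamID = created
	}
	// Find all tablets for the table (GetTableLocationsRequestPB).
	tabletIDs, err := client.GetTableLocations(tableID)
	if err != nil {
		return err
	}
	// One Poller per tablet.
	for _, tabletID := range tabletIDs {
		go poll(streamID, tabletID)
	}
	return nil
}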

§the poller

The Poller class is a little more complex than the connector, but what it does is rather simple to understand. The work starts with a call to the poll() method.

  • Using a randomly selected TServer address, the poller sends a GetChangesRequestPB message to the CDC service.

The message definition is in cdc/cdc_service.proto:

message GetChangesRequestPB {
  optional bytes stream_id = 1;
  // Tablet to get the changes for.
  optional bytes tablet_id = 2;

  // Checkpoint to start reading from (exclusive).
  // Start reading from the first record after this checkpoint.
  optional CDCCheckpointPB from_checkpoint = 3;

  // Maximum records to read.
  optional uint32 max_records = 4;

  // Whether the caller knows the tablet address or needs to use us as a proxy.
  optional bool serve_as_proxy = 5 [default = true];
}

The request requires a stream_id, a tablet_id, and the checkpoint metadata. The checkpoint metadata definition is:

message CDCCheckpointPB {
  optional OpIdPB op_id = 1;
}

where op_id is defined in util/opid.proto:

message OpIdPB {
  // The term of an operation or the leader's sequence id.
  required int64 term = 1;
  required int64 index = 2;
}
  • Once a response arrives, its data propagates to the Void doHandlePoll(GetChangesResponse getChangesResponse) method.
  • doHandlePoll processes the individual records from the response, updates the Poller’s internal term and index, and executes poll() once again.

Thus, the Poller continuously polls for new data and each tablet has its own Poller instance.
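
Putting this together, a single tablet’s poll loop could be sketched in Go roughly as follows, reusing the hypothetical Record and OutputClient types from the earlier sketch. The getChanges callback and the changesResponse shape are placeholders for the GetChangesRequestPB/GetChangesResponsePB round trip, not a real client API; the checkpoint handling is the part that mirrors the Java Poller.

// checkpoint mirrors CDCCheckpointPB/OpIdPB: the (term, index) pair of the
// last record that has already been consumed.
type checkpoint struct {
	term  int64
	index int64
}

// changesResponse is a trimmed-down, hypothetical stand-in for
// GetChangesResponsePB: a batch of records plus the new checkpoint.
type changesResponse struct {
	records    []Record
	checkpoint checkpoint
}

// pollTablet keeps asking for changes after the last seen checkpoint, hands
// every record to the output client, and repeats - just like the
// poll()/doHandlePoll() cycle in the Java client.
func pollTablet(
	getChanges func(streamID, tabletID []byte, from checkpoint) (changesResponse, error),
	out OutputClient,
	streamID, tabletID []byte,
) error {
	var from checkpoint
	for {
		resp, err := getChanges(streamID, tabletID, from)
		if err != nil {
			return err
		}
		for _, record := range resp.records {
			if err := out.HandleRecord(string(tabletID), record); err != nil {
				return err
			}
		}
		// Advance the checkpoint; the next request reads from the first
		// record after it (the checkpoint is exclusive).
		from = resp.checkpoint
	}
}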

§building a CDC solution from scratch

To verify this knowledge, I’m going to build a very simple CDC client. Instead of Java, I’ll use Go with the Go RPC client[3].

You can find the CDC client here[4].

§the YugabyteDB cluster

To be able to test the solution, I need a YugabyteDB cluster with some data in it. I’m going to use the YugabyteDB multi-tenant PaaS demo[5], which I created a while ago.

My environment looks like this:

$ docker-compose -v
Docker Compose version v2.2.3
$ docker -v
Docker version 20.10.12, build e91ed57

On Linux, Docker Compose needs to be installed separately from Docker. Instructions here[6].

cd /tmp
git clone https://github.com/radekg/yugabyte-db-multi-tenant-paas-demo.git
cd yugabyte-db-multi-tenant-paas-demo
# build the Docker image used by the infrastructure
make docker-image-upstream
# point to an env file
export DC_ENV_FILE=.env
# start the cluster
docker compose --env-file "$(pwd)/${DC_ENV_FILE}" \
  -f compose-masters.yml \
  -f compose-tservers-shared.yml up

This will take a few seconds to settle. There will be:

  • three master servers,
  • three TServers,
  • an Envoy proxy in front of TServers.

Let’s verify that everything is working. In another terminal:

docker exec -ti yb-master-n1 /bin/bash -c \
    'yb-admin -master_addresses yb-master-n1:7100,yb-master-n2:7100,yb-master-n3:7100 list_all_masters'

should produce something similar to:

Master UUID                      	RPC Host/Port        	State    	Role
44dec826f77148929c12e4afd2ead389 	yb-master-n1:7100    	ALIVE    	FOLLOWER
b8db32a7f9f549c2bdbd824a07d173fb 	yb-master-n2:7100    	ALIVE    	LEADER
1f549268a41f4759ab821042f3f2490c 	yb-master-n3:7100    	ALIVE    	FOLLOWER

Your UUIDs will be different and your cluster may have a different leader. Let’s look at our TServers; again, the TServer UUIDs will be different:

docker exec -ti yb-master-n1 /bin/bash -c \
    'yb-admin -master_addresses yb-master-n1:7100,yb-master-n2:7100,yb-master-n3:7100 list_all_tablet_servers'
Tablet Server UUID               RPC Host/Port Heartbeat delay Status   Reads/s  Writes/s Uptime   SST total size  SST uncomp size SST #files      Memory
ce5dbc4bdc574b33b52313f0ea8bf89d yb-tserver-shared-2:9100 0.43s           ALIVE    0.00     0.00     157      0 B             0 B             0               45.09 MB
49e47380d5cc4e53b8991d5f4034b596 yb-tserver-shared-1:9100 0.43s           ALIVE    0.00     0.00     157      0 B             0 B             0               45.09 MB
e313e4dd6cd1410189fa73562a47fba4 yb-tserver-shared-3:9100 0.43s           ALIVE    0.00     0.00     158      0 B             0 B             0               44.04 MB

§load the data into the cluster

To connect via YSQL, you have to go through Envoy. Docker Compose exposes Envoy to your host on port 35432. We can try it with:

psql "host=localhost port=35432 user=yugabyte dbname=yugabyte" \
  -c "select table_name from information_schema.tables limit 1"

The password is yugabyte.

The output is:

   table_name
----------------
 pg_default_acl
(1 row)

Let’s create a database and load the data:

psql "host=localhost port=35432 user=yugabyte dbname=yugabyte" \
  -c "create database cdctest"
# the database name changes here:
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -f sql-init-northwind-tenant1.sql
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -f sql-init-northwind-data-tenant1.sql

Don’t worry about the tenant1 suffix. It doesn’t matter. Let’s check what tables we have in our database:

psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -c "select table_name from information_schema.tables where table_schema='public'"
       table_name
------------------------
 categories
 customer_demographics
 customers
 customer_customer_demo
 employees
 suppliers
 products
 region
 shippers
 orders
 territories
 employee_territories
 order_details
 us_states
(14 rows)
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -c "select count(*) from customers"
 count
-------
    91
(1 row)

The database is initialized and ready to work with.

§run the minimal client

Time to build and run the minimal CDC client:

cd /tmp
git clone https://github.com/radekg/yugabyte-db-cdctest.git
cd yugabyte-db-cdctest/
docker build -t local/yugabyte-db-cdctest:latest .

and run it against the cluster:

docker run --net=yb-dbnet --rm \
  -ti local/yugabyte-db-cdctest:latest \
  --database=cdctest \
  --table=region \
  --masters=yb-master-n1:7100,yb-master-n2:7100,yb-master-n3:7100

After a short moment, the CDC data should start streaming to the terminal. This initial output is the data inserted into the database during the import.

Note: the network used by the cluster is called yb-dbnet and that’s the network we run the CDC container in.

§CDC payloads

Let’s ignore the initial data streamed to us and focus on subsequent changes. While the cluster and the CDC client are running, execute the following query:

psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -c "insert into region values (5, 'test')"

The CDC program is going to output a JSON object similar to:

{
  "records": [
    {
      "time": 6740254233675632640,
      "operation": 1,
      "key": [
        {
          "key": "cmVnaW9uX2lk",
          "value": {
            "Value": {
              "Int16Value": 5
            }
          }
        }
      ],
      "changes": [
        {
          "key": "cmVnaW9uX2Rlc2NyaXB0aW9u",
          "value": {
            "Value": {
              "StringValue": "dGVzdA=="
            }
          }
        }
      ],
      "transaction_state": {
        "transaction_id": "5f3eR+OKSbWaOYDWE+FRIQ==",
        "tablets": [
          "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
        ]
      }
    },
    {
      "time": 6740254233712037888,
      "operation": 3,
      "transaction_state": {
        "transaction_id": "5f3eR+OKSbWaOYDWE+FRIQ==",
        "commit_hybrid_time": 6740254233699799040
      },
      "partition": {
        "partition_key_start": "VVU=",
        "partition_key_end": "qqo="
      }
    }
  ],
  "checkpoint": {
    "op_id": {
      "term": 2,
      "index": 61
    }
  }
}

There’s going to be one JSON object per tablet. What can we infer from this data?

  • records.*.operation is the operation type, one of:
type CDCRecordPB_OperationType int32

const (
	CDCRecordPB_WRITE  CDCRecordPB_OperationType = 1
	CDCRecordPB_DELETE CDCRecordPB_OperationType = 2
	CDCRecordPB_APPLY  CDCRecordPB_OperationType = 3
)

In this case, the operation was a write followed by an apply.

  • records.*.key contains the row key for which the write happened; that’s where we find our 5. The cmVnaW9uX2lk is a base64-encoded region_id. The key is an array of objects, which suggests there may be multiple entries when the table has a composite key.
  • records.*.changes is an array of objects, each with a key and a value:
    • the key of each change is the column name; if we base64-decode the cmVnaW9uX2Rlc2NyaXB0aW9u value, we get region_description, which is the column the textual value goes to,
    • the value of each change contains a typed value for the column. This is probably where the largest chunk of processing would go in a real application: to discover the value type, it could be correlated with the table schema loaded beforehand using the GetTableSchemaRequestPB message defined in master/master_ddl.proto. In our case, the base64-decoded dGVzdA== is test - exactly what we inserted (a minimal decoding sketch follows this list).
  • The complete changeset contains the partition information; combined with the tablet ID of each individual change, it would be possible to react only to changes happening within a specific geographical location; find out more[7].
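
As a quick sanity check, the base64-encoded keys and values in the JSON above can be decoded with a few lines of Go. The structs below model only the handful of fields used here, not the full payload; the expected output is shown in the comments.

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// keyValue models a single key/value pair from the CDC JSON output;
// the inner Value carries exactly one of the typed fields.
type keyValue struct {
	Key   string `json:"key"`
	Value struct {
		Value struct {
			Int16Value  int32  `json:"Int16Value"`
			StringValue string `json:"StringValue"`
		} `json:"Value"`
	} `json:"value"`
}

type record struct {
	Operation int        `json:"operation"`
	Key       []keyValue `json:"key"`
	Changes   []keyValue `json:"changes"`
}

type payload struct {
	Records []record `json:"records"`
}

// decode base64-decodes a value; errors are ignored for brevity.
func decode(b64 string) string {
	raw, _ := base64.StdEncoding.DecodeString(b64)
	return string(raw)
}

func main() {
	doc := `{"records":[{"operation":1,
	  "key":[{"key":"cmVnaW9uX2lk","value":{"Value":{"Int16Value":5}}}],
	  "changes":[{"key":"cmVnaW9uX2Rlc2NyaXB0aW9u","value":{"Value":{"StringValue":"dGVzdA=="}}}]}]}`
	var p payload
	if err := json.Unmarshal([]byte(doc), &p); err != nil {
		panic(err)
	}
	for _, r := range p.Records {
		fmt.Println("operation:", r.Operation)
		for _, k := range r.Key {
			fmt.Println("key column:", decode(k.Key), "=", k.Value.Value.Int16Value)
		}
		for _, c := range r.Changes {
			fmt.Println("changed column:", decode(c.Key), "=", decode(c.Value.Value.StringValue))
		}
	}
	// Output:
	// operation: 1
	// key column: region_id = 5
	// changed column: region_description = test
}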

Okay, let’s see what happens when we issue an update:

psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -c "update region set region_description='new value' where region_id=5"

This results in JSON similar to:

{
   "records":[
      {
         "time":6740254233675632640,
         "operation":1,
         "key":[
            {
               "key":"cmVnaW9uX2lk",
               "value":{
                  "Value":{
                     "Int16Value":5
                  }
               }
            }
         ],
         "changes":[
            {
               "key":"cmVnaW9uX2Rlc2NyaXB0aW9u",
               "value":{
                  "Value":{
                     "StringValue":"dGVzdA=="
                  }
               }
            }
         ],
         "transaction_state":{
            "transaction_id":"5f3eR+OKSbWaOYDWE+FRIQ==",
            "tablets":[
               "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
            ]
         }
      },
      {
         "time":6740254233712037888,
         "operation":3,
         "transaction_state":{
            "transaction_id":"5f3eR+OKSbWaOYDWE+FRIQ==",
            "commit_hybrid_time":6740254233699799040
         },
         "partition":{
            "partition_key_start":"VVU=",
            "partition_key_end":"qqo="
         }
      },
      {
         "time":6740257126013730816,
         "operation":1,
         "key":[
            {
               "key":"cmVnaW9uX2lk",
               "value":{
                  "Value":{
                     "Int16Value":5
                  }
               }
            }
         ],
         "changes":[
            {
               "key":"cmVnaW9uX2Rlc2NyaXB0aW9u",
               "value":{
                  "Value":{
                     "StringValue":"bmV3IHZhbHVl"
                  }
               }
            }
         ],
         "transaction_state":{
            "transaction_id":"B7Ewh2S6RmW7T9R0kx/JSw==",
            "tablets":[
               "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
            ]
         }
      },
      {
         "time":6740257126053367808,
         "operation":3,
         "transaction_state":{
            "transaction_id":"B7Ewh2S6RmW7T9R0kx/JSw==",
            "commit_hybrid_time":6740257126043394048
         },
         "partition":{
            "partition_key_start":"VVU=",
            "partition_key_end":"qqo="
         }
      }
   ],
   "checkpoint":{
      "op_id":{
         "term":2,
         "index":63
      }
   }
}

Again, one object per tablet. This time we have four records: a write, an apply, another write, and another apply. We can see that our new value is returned here; bmV3IHZhbHVl is the base64-encoded new value string.

Also, it seems that for each column, we get a full log of subsequent changes! That’s pretty cool.

Let’s try a delete:

psql "host=localhost port=35432 user=yugabyte dbname=cdctest" \
  -c "delete from region where region_id=5"

This will output the following, for each tablet:

{
   "records":[
      {
         "time":6740254233675632640,
         "operation":1,
         "key":[
            {
               "key":"cmVnaW9uX2lk",
               "value":{
                  "Value":{
                     "Int16Value":5
                  }
               }
            }
         ],
         "changes":[
            {
               "key":"cmVnaW9uX2Rlc2NyaXB0aW9u",
               "value":{
                  "Value":{
                     "StringValue":"dGVzdA=="
                  }
               }
            }
         ],
         "transaction_state":{
            "transaction_id":"5f3eR+OKSbWaOYDWE+FRIQ==",
            "tablets":[
               "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
            ]
         }
      },
      {
         "time":6740254233712037888,
         "operation":3,
         "transaction_state":{
            "transaction_id":"5f3eR+OKSbWaOYDWE+FRIQ==",
            "commit_hybrid_time":6740254233699799040
         },
         "partition":{
            "partition_key_start":"VVU=",
            "partition_key_end":"qqo="
         }
      },
      {
         "time":6740257126013730816,
         "operation":1,
         "key":[
            {
               "key":"cmVnaW9uX2lk",
               "value":{
                  "Value":{
                     "Int16Value":5
                  }
               }
            }
         ],
         "changes":[
            {
               "key":"cmVnaW9uX2Rlc2NyaXB0aW9u",
               "value":{
                  "Value":{
                     "StringValue":"bmV3IHZhbHVl"
                  }
               }
            }
         ],
         "transaction_state":{
            "transaction_id":"B7Ewh2S6RmW7T9R0kx/JSw==",
            "tablets":[
               "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
            ]
         }
      },
      {
         "time":6740257126053367808,
         "operation":3,
         "transaction_state":{
            "transaction_id":"B7Ewh2S6RmW7T9R0kx/JSw==",
            "commit_hybrid_time":6740257126043394048
         },
         "partition":{
            "partition_key_start":"VVU=",
            "partition_key_end":"qqo="
         }
      },
      {
         "time":6740259770935541760,
         "operation":2,
         "key":[
            {
               "key":"cmVnaW9uX2lk",
               "value":{
                  "Value":{
                     "Int16Value":5
                  }
               }
            }
         ],
         "transaction_state":{
            "transaction_id":"w/OzDpk5TQSZrSOR2lz6yg==",
            "tablets":[
               "MzczZDRmM2E5N2JhNDk5Mzk1ZWI5NzRlNWQyOGM3NGU="
            ]
         }
      },
      {
         "time":6740259771120664576,
         "operation":3,
         "transaction_state":{
            "transaction_id":"w/OzDpk5TQSZrSOR2lz6yg==",
            "commit_hybrid_time":6740259771113287680
         },
         "partition":{
            "partition_key_start":"VVU=",
            "partition_key_end":"qqo="
         }
      }
   ],
   "checkpoint":{
      "op_id":{
         "term":2,
         "index":65
      }
   }
}

The delete resulted in six records, so clearly the history of changes for each key is persisted. The last two records carry operation types 2 and 3: a delete followed by an apply.

Looking at the CDCRecordPB protobuf definition reveals more interesting detail:

message CDCRecordPB {
  enum OperationType {
    WRITE = 1;
    DELETE = 2;
    APPLY = 3;
    SPLIT_OP = 4;
  }
  optional uint64 time = 1;
  optional OperationType operation = 2;

  // Primary key of the record that changed
  repeated KeyValuePairPB key = 3;

  // Key-value pairs (column_name : value) of changes / before record / after record
  repeated KeyValuePairPB changes = 4;
  repeated KeyValuePairPB before = 5;  // NOT CURRENTLY USED
  repeated KeyValuePairPB after = 6;   // NOT CURRENTLY USED

  optional tablet.TransactionStatePB transaction_state = 7;

  // If transaction_state.status is APPLYING, then partitions will contain the partition for the
  // tablet being polled.
  optional PartitionPB partition = 9;

  // If operation type is a SPLIT_OP, then include the split request.
  optional tablet.SplitTabletRequestPB split_tablet_request = 10;
}

It seems we may eventually get more insight into what we had before the change and what we have after it - the changes, before, and after fields. However, as the comments say, before and after are not currently used.

§it’s powerful

YugabyteDB CDC is very straightforward to approach and a great foundation for integrating distributed SQL with real-time pipelines. CDC data can be easily ingested and transformed with Kafka or another data streaming solution to power all sorts of business applications inside an enterprise.

Having an out-of-the-box possibility to fuse distributed SQL and real-time streaming data is a superpower with unlimited applications in today’s data-driven world.

That’s it for today: a first dip of the toes into YugabyteDB CDC.