Shortly after I published the previous article on YugabyteDB CDC, the amazing Yugabyte team released version 2.13 of the database with a beta implementation of the new change data capture SDK. Before diving into the new SDK, let’s quickly recap the first implementation.
This is a living article. Updates will be listed here.
§first CDC version: xcluster
With the first implementation, we can create and delete JSON-based streams, where each stream is created for exactly one table. Each individual stream delivers messages for three types of database events: inserts, updates, and deletes.
For the majority of use cases this might be sufficient, but certain operations aren’t possible:
- No DDL related events.
- No transaction indication.
With the xcluster implementation, the CDC service delivers messages on the tablet level. Each individual insert, update, or delete results in multiple messages: one for a tablet leader and one for each follower.
This results in plenty of duplication on the consumer side and the operational complexity of tracking the source of truth for changes. The first CDC implementation hands over the complexity of the architecture to the consumer. Instead of working just on the row (entity) level, the consumer must also understand how the cluster is deployed and replicated.
The new CDC SDK addresses a large number of those problems.
Let’s start the CDC SDK investigation by looking at the CDC 2.13 service definition:
This doesn’t look much different from the previous version. There are a couple of new operations and some request and response types contain new SDK-specific fields.
For example, there is a single CreateCDCStream method taking the CreateCDCStreamRequestPB argument known from the previous incarnation. However, the request argument contains some new capabilities.
The most relevant pieces of the connector code:
- YugabyteDBConnector.validateTServerConnection(...): handles stream creation and setup.
- YugabyteDBConnector.fetchTabletList(...): fetches the list of tablet IDs to consume changes for.
- YugabyteDBStreamingChangeEventSource.getChanges2(...): handles CDC SDK rows returned from GetChanges.
§creating a stream
To create a CDC SDK stream, called a database stream in 2.13, we have to call the CreateCDCStream operation with an exact combination of CreateCDCStreamRequestPB parameters. Hints are in two places:
- YugabyteDB Debezium connector: calls an underlying Java client with the required parameters.
- The currently undocumented yb-admin create_change_data_stream operation, which calls into the same underlying implementation.
To create a database stream, the request needs to contain:
- A database name (namespace).
- Record type: defines what each individual record contains:
  - CDCRecordType.CHANGE: changed values only; it appears that this is the required value.
  - CDCRecordType.AFTER: the entire row after the change.
  - CDCRecordType.ALL: the before and after values of the row.
- Record format must be CDCRecordFormat.PROTO: data is in the protobuf format.
- Source type must be the new CDC SDK source type; other values:
  - XCLUSTER: simple CDC form.
  - 2DC: two datacenter deployments in YugabyteDB leverage change data capture at the core.
- Checkpoint type must be IMPLICIT: CDC SDK automatically manages checkpoints.
If the stream has been successfully created, the ID of the stream will be returned in CreateCDCStreamResponsePB.db_stream_id.
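To make the required parameter combination concrete, here is a small Go sketch. The struct, constructor, and string values are my own (the real field and enum names live in YugabyteDB’s protobuf definitions, and the CDC SDK source type value is an assumption):

```go
package main

import "fmt"

// CreateStreamRequest is a hypothetical mirror of the CreateCDCStreamRequestPB
// fields discussed above; it only illustrates the required combination.
type CreateStreamRequest struct {
	NamespaceName  string // database (namespace) name
	RecordType     string // CHANGE | AFTER | ALL
	RecordFormat   string // PROTO is required
	SourceType     string // assumed CDC SDK value; XCLUSTER and 2DC are the others
	CheckpointType string // IMPLICIT | EXPLICIT
}

// NewDatabaseStreamRequest applies the combination described above:
// CHANGE records, PROTO format, IMPLICIT checkpoints.
func NewDatabaseStreamRequest(namespace string) CreateStreamRequest {
	return CreateStreamRequest{
		NamespaceName:  namespace,
		RecordType:     "CHANGE",
		RecordFormat:   "PROTO",
		SourceType:     "CDCSDK", // assumption: the CDC SDK source type value
		CheckpointType: "IMPLICIT",
	}
}

func main() {
	fmt.Printf("%+v\n", NewDatabaseStreamRequest("cdctest"))
}
```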
§consuming the stream
As with the original CDC, to consume a stream we have to call the GetChanges operation. The GetChangesRequestPB type looks like this in 2.13:
Most interesting fields:
- db_stream_id: takes the database stream ID returned in CreateCDCStreamResponsePB.db_stream_id; we can see that the original CDC is still supported as well.
- from_cdc_sdk_checkpoint: CDC SDK streams require this new checkpoint format.
- table_id: this field does not appear to be used.
To consume changes from the CDC SDK database stream, we have to execute a GetChanges request with the following configuration:
- GetChangesRequestPB.db_stream_id: contains the database stream ID.
- GetChangesRequestPB.from_cdc_sdk_checkpoint: contains the checkpoint; an empty checkpoint is an instance of a checkpoint with all values set to defaults, which makes a consumer receive all changes from the beginning of the stream.
- GetChangesRequestPB.tablet_id: the tablet ID is required.
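A sketch of building such a request in Go; the types and the helper are hypothetical mirrors of the protobuf fields, not a real client API. The key point is that the zero-value checkpoint is the "consume from the beginning" checkpoint:

```go
package main

import "fmt"

// SDKCheckpoint is a hypothetical mirror of the CDC SDK checkpoint fields.
type SDKCheckpoint struct {
	Term, Index int64
	WriteID     int32
}

// GetChangesRequest is a hypothetical mirror of the GetChangesRequestPB
// fields we actually need to set.
type GetChangesRequest struct {
	DBStreamID     string
	TabletID       string
	FromCheckpoint SDKCheckpoint
}

// FromStreamStart builds a request with an "empty" checkpoint: all fields
// at their defaults, which makes GetChanges replay the stream from the start.
func FromStreamStart(streamID, tabletID string) GetChangesRequest {
	return GetChangesRequest{DBStreamID: streamID, TabletID: tabletID}
}

func main() {
	req := FromStreamStart("stream-1", "tablet-a")
	fmt.Println(req.FromCheckpoint == SDKCheckpoint{}) // true: zero-value checkpoint
}
```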
§discovering tablet IDs to consume changes from
The implication of the GetChangesRequestPB.tablet_id being required is that we have to find all tablet IDs for each table of a database.
It boils down to listing all tables using the master service ListTables request and filtering for PGSQL relations, skipping relation types we aren’t interested in (such as system relations).
For each discovered table, we list its tablets using the master service GetTableLocations RPC call. As a result, we have a list of all tablet IDs we want to consume from.
There will be one consumer per tablet.
We can, of course, consume changes for specific tables (or tablets) only. To do so, we can skip tablets we don’t want to consume from.
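The discovery flow above can be sketched in Go. The interface and types below are my own simplified stand-ins for the master RPCs, not the real client signatures:

```go
package main

import "fmt"

// Master is a hypothetical, minimal view of the two master RPCs used for
// discovery: ListTables and GetTableLocations.
type Master interface {
	ListTables() []Table
	GetTableLocations(tableID string) []string // returns tablet IDs
}

// Table carries just enough information to filter relations.
type Table struct {
	ID       string
	IsPGSQL  bool
	IsSystem bool // relation types we skip
}

// DiscoverTablets lists tables, keeps user PGSQL relations, and collects
// all tablet IDs to consume from; one consumer per returned tablet ID.
func DiscoverTablets(m Master) []string {
	var tablets []string
	for _, t := range m.ListTables() {
		if !t.IsPGSQL || t.IsSystem {
			continue
		}
		tablets = append(tablets, m.GetTableLocations(t.ID)...)
	}
	return tablets
}

// fakeMaster lets us exercise the flow without a cluster.
type fakeMaster struct{}

func (fakeMaster) ListTables() []Table {
	return []Table{
		{ID: "region", IsPGSQL: true},
		{ID: "pg_class", IsPGSQL: true, IsSystem: true}, // filtered out
	}
}

func (fakeMaster) GetTableLocations(tableID string) []string {
	return []string{tableID + "-tablet-1", tableID + "-tablet-2"}
}

func main() {
	fmt.Println(DiscoverTablets(fakeMaster{})) // [region-tablet-1 region-tablet-2]
}
```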
The old checkpoint type contains term and index fields delivered in an OpIdPB message. The term most likely refers to the Raft voter term of the tablet, while the index is most likely a Raft log entry index of the tablet.
The CDC SDK introduces a new checkpoint type. The new checkpoint type doesn’t use the OpId anymore. It defines the term and index fields directly, next to additional properties. The protobuf type:
write_id is the order of changes within a multi-statement transaction and does not make sense outside of a transaction.
Checkpoints can be compared by term, index, and write_id. These properties should be compared in order:
- A higher term implies a higher checkpoint, regardless of the index.
- When the term is equal, a higher index indicates a newer event.
- write_id makes sense only within a transaction; a higher write_id implies a newer event within a single transaction.
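The ordering above translates directly into a comparison function. A Go sketch (the struct is my own mirror of the comparable checkpoint fields):

```go
package main

import "fmt"

// Checkpoint mirrors the comparable part of the CDC SDK checkpoint.
type Checkpoint struct {
	Term, Index int64
	WriteID     int32 // only meaningful within a single transaction
}

// Compare returns -1, 0, or 1 following the ordering described above:
// term first, then index, then (within one transaction) write_id.
func Compare(a, b Checkpoint) int {
	if a.Term != b.Term {
		if a.Term > b.Term {
			return 1
		}
		return -1
	}
	if a.Index != b.Index {
		if a.Index > b.Index {
			return 1
		}
		return -1
	}
	if a.WriteID != b.WriteID {
		if a.WriteID > b.WriteID {
			return 1
		}
		return -1
	}
	return 0
}

func main() {
	newer := Checkpoint{Term: 3, Index: 1}
	older := Checkpoint{Term: 2, Index: 500}
	fmt.Println(Compare(newer, older)) // 1: a higher term wins regardless of index
}
```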
The CDC service manages checkpoints by itself when the CDC checkpoint type is IMPLICIT. Only very specific use cases would require the client to manage checkpoints manually, in the EXPLICIT checkpoint type mode.
For example, a client buffering records before processing them might require fine-grained checkpoint control.
It’s not clear to me how to put a CDC stream in a mode where the CDC checkpoint can be set. Every time I attempted calling SetCDCCheckpoint for an EXPLICIT setting, YugabyteDB TServers crashed. Weird, considering that the Java client tests appear to be testing this functionality.
§checkpoint management vs consumer offset management
So I kind of made the consumer offset management term up here. It’s a term from the Apache Kafka world, and there’s no reference to it anywhere in the YugabyteDB documentation. A single stream ID+tablet ID combination can have multiple CDC consumers sourcing events from it, and that’s perfectly fine. For example:
- Consumer A performs audits.
- Consumer B observes selected tables and rewrites specific updates to an internal ERP system.
- Consumer C observes individual columns for secret values and raises alerts when those values appear.
Each of those consumers is a standalone program with its own lifecycle. They are started and stopped independently. Each of them needs to have its own last processed checkpoint (a consumer offset) stored somewhere. It’s clear that the SetCDCCheckpoint operation isn’t designed to perform this role because it has no notion of a consumer ID.
Instead, a consumer should process changes received from GetChanges. When finished, or when it is ready to commit a batch of processed changes, it should store the term and index for a stream ID+tablet ID combination. Next to those values, it needs to store a consumer name. The name should be unique to the consumer application but shared between different starts of the same consumer instance.
I’m not sure if my assumptions are correct, but I would definitely try this approach if I were to build anything on the YugabyteDB CDC SDK. I would probably store checkpoints myself in a dedicated table keyed by the consumer name, stream ID, and tablet ID.
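A sketch of what such an offset store could look like. The table DDL and all names are my own invention; the Go map stands in for the table to show the keying and the commit/lookup operations:

```go
package main

import "fmt"

// The YSQL table I would use (all names are my own invention).
const offsetTableDDL = `
create table if not exists cdc_consumer_offsets (
  consumer_name varchar(255) not null,
  stream_id     varchar(64)  not null,
  tablet_id     varchar(64)  not null,
  term          bigint       not null,
  idx           bigint       not null,
  primary key (consumer_name, stream_id, tablet_id)
);`

type offsetKey struct{ consumer, streamID, tabletID string }
type offset struct{ term, index int64 }

// OffsetStore keeps the last processed checkpoint per
// consumer+stream+tablet; an in-memory stand-in for the table above.
type OffsetStore map[offsetKey]offset

// Commit records the last processed term and index for this consumer.
func (s OffsetStore) Commit(consumer, streamID, tabletID string, term, index int64) {
	s[offsetKey{consumer, streamID, tabletID}] = offset{term, index}
}

// Last returns the stored checkpoint; the zero value means
// "start from the beginning of the stream".
func (s OffsetStore) Last(consumer, streamID, tabletID string) (term, index int64) {
	o := s[offsetKey{consumer, streamID, tabletID}]
	return o.term, o.index
}

func main() {
	store := OffsetStore{}
	store.Commit("audit-consumer", "stream-1", "tablet-a", 2, 41)
	term, index := store.Last("audit-consumer", "stream-1", "tablet-a")
	fmt.Println(term, index) // 2 41
}
```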
The meat of every CDC consumer is the change processing. A response from the GetChanges operation for a database stream returns an array of CDCSDKProtoRecordPB. Each of those items has the following structure:
where a row message is:
Op identifies the type of the operation. This is where the CDC SDK shows its biggest difference from the original CDC solution. The CDC SDK provides additional context for DDL events, table truncation events, and transaction state. The YugabyteDBStreamingChangeEventSource.getChanges2(...) Debezium connector method mentioned earlier is a reference for event processing.
There are three groups of messages:
- Transactional messages:
- DDL messages:
- DML messages:
I was not able to trigger anything resulting in the Op.READ message type.
Here’s an example of a DDL message emitted after creating a table:
Let’s have a look at the data modification events in the following order: insert, update, delete, truncate. Find out how the cluster is started and configured.
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "insert into region values (1, 'test')":
The psql client wraps statements in a transaction. This gives us the opportunity to see what transactional messages look like.
We have three messages:
Op.COMMIT should be used as the boundary of consumer offset acceptance: the consumer offset should be advanced only after an Op.COMMIT message is processed. This could be useful in scenarios where a new value should only become visible after the whole batch of changes enclosed in the transaction has been processed.
The new_tuple field contains the inserted values, while old_tuple contains no values, which is to be expected.
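The commit-boundary rule can be sketched as a small consumer loop. The Record and Op shapes are my own simplification (the article names Op.COMMIT; Op.BEGIN and Op.INSERT are assumed names for the other transactional and DML messages):

```go
package main

import "fmt"

// Op is a simplified stand-in for the CDC SDK operation type.
type Op string

const (
	OpBegin  Op = "BEGIN"  // assumed name for the transaction-start message
	OpInsert Op = "INSERT" // assumed name for the DML message
	OpCommit Op = "COMMIT"
)

// Record is a minimal view of a CDC SDK row message.
type Record struct {
	Op          Op
	Term, Index int64
}

// Consumer buffers records and advances its stored offset only when a
// COMMIT record arrives, so a partially consumed transaction is never
// marked as processed.
type Consumer struct {
	buffer         []Record
	committedTerm  int64
	committedIndex int64
}

func (c *Consumer) Handle(r Record) {
	c.buffer = append(c.buffer, r)
	if r.Op == OpCommit {
		// Process the whole buffered transaction here, then advance the offset.
		c.committedTerm, c.committedIndex = r.Term, r.Index
		c.buffer = c.buffer[:0]
	}
}

func main() {
	c := &Consumer{}
	c.Handle(Record{Op: OpBegin, Term: 1, Index: 10})
	c.Handle(Record{Op: OpInsert, Term: 1, Index: 11})
	fmt.Println(c.committedIndex) // 0: no COMMIT seen yet
	c.Handle(Record{Op: OpCommit, Term: 1, Index: 12})
	fmt.Println(c.committedIndex) // 12
}
```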
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "update region set region_description='updated' where region_id=1":
Again, a statement has been wrapped in a transaction so we have received three messages.
The new_tuple field contains values after the update. old_tuple does not contain any data; this is most likely the effect of the CHANGE record type, which delivers changed values only.
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "delete from region where region_id=1":
Yet again, the statement is wrapped in a transaction. We can see that the new_tuple is empty, which makes sense, while old_tuple contains the value of the primary key of the deleted item.
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "truncate table region cascade":
This statement does not seem to be handled properly on the database level yet. Issuing a truncate statement breaks the stream (the consumer is not able to get changes anymore), and TServers log errors like these:
yb-tserver-shared-3 | [libprotobuf FATAL google/protobuf/reflection_ops.cc:59] CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3 | libc++abi: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3 | [libprotobuf FATAL google/protobuf/reflection_ops.cc:59] CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3 | libc++abi: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
§no primary key, no CDC?
All those tests were done on a table with a primary key. Naturally, I wanted to check if it is possible to retrieve changes from a table without a primary key:
Trying to get changes for tablets of that table returns a CDC RPC error similar to:
reason= | CDC rpc error: code: 10 (INVALID_REQUEST), status: 4 (INVALID_ARGUMENT) | message: Tablet ID d17e6b58a79f418dbae5e95978bf2f59 is not part of stream ID 2861d4b6f25d49f7982bcfb2592b54f0 | source: ../../ent/src/yb/cdc/cdc_service.cc@457
The CDC service source confirms that. The original, simple CDC has the same restriction.
§changes on the entity level
Another major difference from the first CDC implementation is that change events are per row (entity). There’s no longer an event per tablet. The internal CDC service implementation locates a tablet leader and redelivers tablet leader events only.
The order of events reflects the order of operations on a row. The arrival of an event implies that all previous operations on that row have been committed.
§consuming from latest operation
Sometimes we might want to skip consuming historical changes and consume from the latest entry only. There are two possibilities, depending on how the database stream has been created:
- CDC checkpoint type IMPLICIT: this is the default behaviour when reusing stream IDs.
- CDC checkpoint type EXPLICIT: use GetCheckpoint to discover the latest saved checkpoint; a value of term: 0, index: 0 might indicate that no SetCDCCheckpoint has been called, so consider using GetLatestEntryOpId in that case.
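The fallback logic for the EXPLICIT case can be sketched like this in Go; the interface is a hypothetical stand-in for the GetCheckpoint and GetLatestEntryOpId RPCs, not a real client API:

```go
package main

import "fmt"

// Checkpoint holds the comparable checkpoint fields.
type Checkpoint struct{ Term, Index int64 }

// CheckpointSource is a hypothetical stand-in for the GetCheckpoint and
// GetLatestEntryOpId RPCs mentioned above.
type CheckpointSource interface {
	GetCheckpoint(streamID, tabletID string) Checkpoint
	GetLatestEntryOpId(tabletID string) Checkpoint
}

// StartingCheckpoint picks where to begin consuming: the saved checkpoint if
// one was ever set, otherwise the latest entry so historical changes are skipped.
func StartingCheckpoint(src CheckpointSource, streamID, tabletID string) Checkpoint {
	cp := src.GetCheckpoint(streamID, tabletID)
	if cp.Term == 0 && cp.Index == 0 {
		// term: 0, index: 0 may mean SetCDCCheckpoint was never called.
		return src.GetLatestEntryOpId(tabletID)
	}
	return cp
}

// fakeSource exercises the logic without a cluster.
type fakeSource struct{ saved Checkpoint }

func (f fakeSource) GetCheckpoint(streamID, tabletID string) Checkpoint { return f.saved }
func (f fakeSource) GetLatestEntryOpId(tabletID string) Checkpoint {
	return Checkpoint{Term: 7, Index: 1200}
}

func main() {
	fresh := fakeSource{} // nothing saved yet: falls back to the latest entry
	fmt.Println(StartingCheckpoint(fresh, "s", "t")) // {7 1200}
}
```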
§reference go client
It’s clear that the CDC SDK is a higher level solution than the first CDC implementation.
- Does what it says on the tin.
- Has rough edges and requires some clarifications:
- Fully functional support for truncate would be great to have.
- No support for tables without a primary key. This would be great to have; it would enable some very interesting use cases for continuous backup/restore.
- What’s the checkpoint’s snapshot_time? What can it be used for?
- Why do TServers crash when executing SetCDCCheckpoint with the EXPLICIT checkpoint type?