Shortly after I published the previous article on YugabyteDB CDC[1], the amazing Yugabyte team released version 2.13 of the database with a beta implementation of the new change data capture SDK. Before diving into the new SDK, let’s quickly recap the first implementation.
This is a living article. Updates will be listed here.
§first CDC version: xcluster
With the first implementation, we can create and delete JSON-based streams, where each stream is created for exactly one table. Each individual stream delivers messages for three types of database events: inserts (WRITE and APPLY), updates (WRITE and APPLY), and deletes (DELETE and APPLY).
|
|
For the majority of use cases this might be sufficient, but certain operations aren’t possible:
- No truncate support.
- No DDL-related events.
- No transaction indication.
With the xcluster implementation, the CDC service delivers messages on the tablet level. Each individual insert, update, or delete results in multiple messages: one for a tablet leader and one for each follower.
This results in plenty of duplication on the consumer side and the operational complexity of tracking the source of truth for changes. This first CDC implementation hands the complexity of the architecture over to the consumer. Instead of working just on the row (entity) level, the consumer must also understand how the cluster is deployed and replicated.
The new CDC SDK addresses a large number of those problems.
§CDC SDK
Let’s start the CDC SDK investigation by looking at the CDC 2.13 service definition:
|
|
This doesn’t look much different from the previous version. There are a couple of new operations and some request and response types contain new SDK-specific fields.
For example, there is a single CreateCDCStream method taking the CreateCDCStreamRequestPB argument known from the previous incarnation. However, the request argument contains some new capabilities.
It’s obvious that one needs to know the correct set of parameters to work with the SDK. A reference implementation would be handy. As it happens, there is one: the YugabyteDB Debezium connector[2].
The most relevant pieces of the connector code:
- YugabyteDBConnector.validateTServerConnection(...)[3]: handles stream creation and setup.
- YugabyteDBConnector.fetchTabletList(...)[4]: fetches the list of tablet IDs to consume changes for.
- YugabyteDBStreamingChangeEventSource.getChanges2(...)[5]: handles CDC SDK rows returned from the GetChanges operation.
§creating a stream
To create a CDC SDK stream, called a database stream in 2.13, we have to call the CreateCDCStream operation with an exact combination of CreateCDCStreamRequestPB parameters. Hints are in two places:
- YugabyteDB Debezium connector: calls an underlying Java client with required parameters.
- Currently undocumented yb-admin create_change_data_stream operation[6]; this one calls into the ClusterAdminClient::CreateCDCSDKDBStream[7] operation.
To create a database stream, the request needs to contain:
- A database name (namespace).
- Record type CDCRecordType.CHANGE: defines what each individual record contains:
  - CHANGE: changed values only; it appears that this is the required value,
  - AFTER: the entire row after the change,
  - ALL: the before and after value of the row.
- Record format must be CDCRecordFormat.PROTO: data is in the protobuf format.
- Source type must be CDCRequestSource.CDCSDK; other values:
  - XCLUSTER: simple CDC form.
  - 2DC: two datacenter deployments in YugabyteDB leverage change data capture at the core.
- Checkpoint type must be IMPLICIT or EXPLICIT: when IMPLICIT, CDC SDK automatically manages checkpoints.
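As a minimal Go sketch of assembling these parameters: the CreateStreamRequest type below is mine, a hypothetical mirror of the CreateCDCStreamRequestPB fields described above, not the actual YugabyteDB API.

```go
package main

import "fmt"

// CreateStreamRequest is a hypothetical mirror of the relevant
// CreateCDCStreamRequestPB fields discussed above.
type CreateStreamRequest struct {
	NamespaceName  string // database (namespace) name
	RecordType     string // CHANGE, AFTER or ALL
	RecordFormat   string // must be PROTO for the CDC SDK
	SourceType     string // must be CDCSDK; XCLUSTER and 2DC are the other values
	CheckpointType string // IMPLICIT or EXPLICIT
}

func main() {
	req := CreateStreamRequest{
		NamespaceName:  "cdctest",
		RecordType:     "CHANGE", // changed values only; appears to be required
		RecordFormat:   "PROTO",
		SourceType:     "CDCSDK",
		CheckpointType: "IMPLICIT", // let the CDC SDK manage checkpoints
	}
	fmt.Printf("%+v\n", req)
}
```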
If the stream has been successfully created, the ID of the stream will be in the CreateCDCStreamResponsePB.db_stream_id field:
|
|
§consuming the stream
As with the original CDC, to consume a stream we have to call the GetChanges operation. The GetChangesRequestPB type looks like this in 2.13:
|
|
Most interesting fields:
- db_stream_id: takes the database stream ID returned in CreateCDCStreamResponsePB.db_stream_id; we can see that the original CDC is still supported with the stream_id field.
- from_cdc_sdk_checkpoint: CDC SDK streams require this new checkpoint format.
The table_id field does not appear to be used.
To consume changes from the CDC SDK database stream, we have to execute a GetChanges request with the following configuration:
- GetChangesRequestPB.db_stream_id: contains the database stream ID.
- GetChangesRequestPB.from_cdc_sdk_checkpoint: contains the checkpoint; an empty checkpoint is an instance of a checkpoint with all values set to defaults, which will make a consumer receive all changes from the beginning of the stream.
- GetChangesRequestPB.tablet_id: the tablet ID is required.
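Sketched in Go, under the same caveat that these are hypothetical stand-ins for the protobuf types rather than the real API, a first GetChanges call with an empty checkpoint could look like this (the example IDs reappear later in the article):

```go
package main

import "fmt"

// Checkpoint is a hypothetical mirror of the CDC SDK checkpoint fields.
type Checkpoint struct {
	Term, Index int64
	Key         []byte
	WriteID     int32
}

// GetChangesRequest is a hypothetical mirror of GetChangesRequestPB.
type GetChangesRequest struct {
	DBStreamID string     // from CreateCDCStreamResponsePB.db_stream_id
	TabletID   string     // required; one consumer per tablet
	From       Checkpoint // zero value = consume from the beginning of the stream
}

func main() {
	req := GetChangesRequest{
		DBStreamID: "2861d4b6f25d49f7982bcfb2592b54f0", // example stream ID
		TabletID:   "d17e6b58a79f418dbae5e95978bf2f59", // example tablet ID
		// From is left as the zero-value checkpoint:
		// the consumer receives all changes from the start of the stream.
	}
	fmt.Printf("%+v\n", req)
}
```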
§discovering tablet IDs to consume changes from
The implication of GetChangesRequestPB.tablet_id being required is that we have to find all tablet IDs for each table of a database.
Fortunately, the Debezium connector does this already. We can use the YugabyteDBConnector.fetchTabletList(...)[4] method to reverse engineer this step.
It boils down to listing all tables using the master service ListTables request and filtering for PGSQL relations where the relation type isn’t INDEX_TABLE_RELATION or SYSTEM_TABLE_RELATION (effectively, USER_TABLE_RELATION only).
|
|
For each discovered table, we list its tablets using the master service GetTableLocations RPC call. As a result, we have a list of all tablet IDs we want to consume from.
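The discovery loop could be sketched in Go like this; the masterClient interface is my hypothetical wrapper around the ListTables and GetTableLocations master service RPCs:

```go
package cdcdiscovery

import "fmt"

// masterClient is a hypothetical wrapper over the master service RPCs
// used for discovery.
type masterClient interface {
	ListTables() ([]tableInfo, error)                   // master ListTables RPC
	GetTableLocations(tableID string) ([]string, error) // tablet IDs per table
}

type tableInfo struct {
	ID           string
	RelationType string // USER_TABLE_RELATION, INDEX_TABLE_RELATION, ...
}

// discoverTablets returns the tablet IDs of all user tables in the database.
func discoverTablets(m masterClient) ([]string, error) {
	tables, err := m.ListTables()
	if err != nil {
		return nil, err
	}
	var tablets []string
	for _, t := range tables {
		// Skip indexes and system tables; effectively USER_TABLE_RELATION only.
		if t.RelationType != "USER_TABLE_RELATION" {
			continue
		}
		ids, err := m.GetTableLocations(t.ID)
		if err != nil {
			return nil, fmt.Errorf("locations for table %s: %w", t.ID, err)
		}
		tablets = append(tablets, ids...)
	}
	return tablets, nil
}
```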
There will be one consumer per tablet.
We can, of course, consume changes for specific tables (or tablets) only. To do so, we can skip tablets we don’t want to consume from.
§the checkpoint
The old checkpoint type contains term and index fields delivered in an OpIdPB message. The term most likely refers to the Raft voter term of the tablet; the index is most likely a Raft log entry index of the tablet.
The CDC SDK introduces a new checkpoint type. The new checkpoint type doesn’t use the OpId anymore. It defines term and index fields directly, next to additional properties. The protobuf type:
|
|
The write_id is the order of changes within a multi-statement transaction and does not make sense when key is null.
I’m not sure what snapshot_time is for. Something to investigate further; snapshot handling is here[8].
§comparing checkpoints
Checkpoints can be compared by term, index and write_id. These properties should be compared in order:
- A higher term implies a higher checkpoint, regardless of the index values.
- If term is equal, a higher index indicates a newer event.
- write_id makes sense only within a transaction; a higher write_id implies a newer event within a single transaction.
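A comparison function encoding these rules might look like this in Go; the struct is my convenience mirror of the checkpoint fields, not the generated protobuf type:

```go
package cdccheckpoint

// Checkpoint mirrors the CDC SDK checkpoint fields relevant for ordering.
type Checkpoint struct {
	Term    int64
	Index   int64
	WriteID int32 // only meaningful within a single transaction
}

// Compare returns -1 if a is older than b, 1 if newer, 0 if equal.
// sameTransaction tells whether both checkpoints belong to one transaction,
// the only case where WriteID participates in the ordering.
func Compare(a, b Checkpoint, sameTransaction bool) int {
	switch {
	case a.Term != b.Term: // higher term wins regardless of index
		return sign(a.Term - b.Term)
	case a.Index != b.Index: // equal terms: higher index is newer
		return sign(a.Index - b.Index)
	case sameTransaction && a.WriteID != b.WriteID:
		return sign(int64(a.WriteID) - int64(b.WriteID))
	default:
		return 0
	}
}

func sign(v int64) int {
	switch {
	case v < 0:
		return -1
	case v > 0:
		return 1
	default:
		return 0
	}
}
```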
§checkpoint management
The CDC service manages checkpoints by itself when the CDC checkpoint type is IMPLICIT. Only very specific use cases would require the client to manage checkpoints manually, in the EXPLICIT checkpoint type mode.
For example, a client buffering records before processing them might require fine-grained checkpoint control.
It’s not clear to me how to put a CDC stream in a mode where the CDC checkpoint can be set. Every time I attempted calling SetCDCCheckpoint for an EXPLICIT setting, YugabyteDB TServers crashed.
Weird, considering that the Java client tests appear to be testing this functionality[9].
§checkpoint management vs consumer offset management
So I kind of made the consumer offset management term up here. It comes from the Apache Kafka world and there’s no reference to it anywhere in the YugabyteDB documentation. A single stream ID+tablet ID combination can have multiple CDC consumers sourcing events from it, and that’s perfectly fine.
For example:
- Consumer A performs audits.
- Consumer B observes selected tables and rewrites specific updates to an internal ERP system.
- Consumer C observes individual columns for secret values and raises alerts when those values appear.
Each of those consumers is a standalone program with its own lifecycle. They are started and stopped independently. Each of them needs to have its own last processed checkpoint (a consumer offset) stored somewhere. It’s clear that the SetCDCCheckpoint operation isn’t designed to perform this role because it does not have a notion of a consumer ID.
Instead, a consumer should process changes received from GetChanges. When finished, or when it is ready to commit a batch of processed changes, it should store the term and index for a stream ID+tablet ID combination. Next to those values, it needs to store a consumer name. The name should be unique to the consumer application but shared between different starts of the same consumer instance.
I’m not sure if my assumptions are correct, but I would definitely try this approach if I were to build anything on the YugabyteDB CDC SDK. I would probably store checkpoints myself in a dedicated table. The table could have the following schema:
|
|
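A sketch of persisting such an offset with database/sql follows; the table and column names are mine, and the upsert assumes a YSQL/PostgreSQL-compatible target:

```go
package cdcoffsets

import "database/sql"

// Hypothetical schema for consumer-managed offsets:
//
//   CREATE TABLE cdc_consumer_offsets (
//     consumer_name    text NOT NULL,
//     stream_id        text NOT NULL,
//     tablet_id        text NOT NULL,
//     checkpoint_term  bigint NOT NULL,
//     checkpoint_index bigint NOT NULL,
//     PRIMARY KEY (consumer_name, stream_id, tablet_id)
//   );

// SaveOffset stores the last processed checkpoint for a consumer name and
// a stream ID+tablet ID combination, inserting or updating as needed.
func SaveOffset(db *sql.DB, consumer, streamID, tabletID string, term, index int64) error {
	_, err := db.Exec(`
		INSERT INTO cdc_consumer_offsets
			(consumer_name, stream_id, tablet_id, checkpoint_term, checkpoint_index)
		VALUES ($1, $2, $3, $4, $5)
		ON CONFLICT (consumer_name, stream_id, tablet_id)
		DO UPDATE SET checkpoint_term = EXCLUDED.checkpoint_term,
		              checkpoint_index = EXCLUDED.checkpoint_index`,
		consumer, streamID, tabletID, term, index)
	return err
}
```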
§processing changes
The meat of every CDC consumer is the change processing. A response from the GetChanges operation for a database stream returns an array of CDCSDKProtoRecordPB items. Each of those items has the following structure:
|
|
where a row message is:
|
|
The inline Op identifies the type of the operation. This is where the CDC SDK shows its biggest difference from the original CDC solution. The CDC SDK provides additional context for DDL events, table truncation events, and transaction state. The YugabyteDBStreamingChangeEventSource.getChanges2(...) Debezium connector method mentioned earlier is a reference for event processing.
There are three groups of messages:
- Transactional messages: Op.BEGIN and Op.COMMIT.
- DDL messages: Op.DDL.
- DML messages: Op.INSERT, Op.UPDATE, Op.DELETE and Op.TRUNCATE.
I was not able to trigger anything resulting in the Op.READ message type.
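A consumer loop could dispatch on the operation type along these lines; the Record and Op types are hypothetical stand-ins for the protobuf messages:

```go
package cdcconsumer

import "log"

// Op mirrors the CDC SDK operation types described above.
type Op int

const (
	OpBegin Op = iota
	OpCommit
	OpDDL
	OpInsert
	OpUpdate
	OpDelete
	OpTruncate
)

// Record is a stand-in for the row message of CDCSDKProtoRecordPB.
type Record struct {
	Op       Op
	NewTuple map[string]interface{}
	OldTuple map[string]interface{}
}

func handle(rec Record) {
	switch rec.Op {
	case OpBegin, OpCommit: // transactional messages
		log.Printf("transaction boundary: %v", rec.Op)
	case OpDDL: // DDL messages carry schema context
		log.Printf("schema change")
	case OpInsert, OpUpdate, OpDelete, OpTruncate: // DML messages
		log.Printf("row change: new=%v old=%v", rec.NewTuple, rec.OldTuple)
	}
}
```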
§DDL messages
Here’s an example of a DDL message emitted after creating a table:
|
|
§DML messages
Let’s have a look at the data modification events in the following order: insert, update, delete, and truncate. Find out how the cluster is started and configured[10].
insert: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "insert into region values (1, 'test')":
|
|
The psql client wraps statements in a transaction. This gives us the opportunity to see what transactional messages look like.
We have three messages: Op.BEGIN, Op.INSERT, Op.COMMIT.
Op.BEGIN and Op.COMMIT should be used as the boundary of consumer offset acceptance. The consumer offset should be advanced only after an Op.COMMIT message is processed. These boundaries could be useful in scenarios where a new value should become visible only after the whole batch of changes enclosed in the transaction has been processed.
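One way to respect that boundary, sketched with hypothetical types: buffer records between Op.BEGIN and Op.COMMIT, and only apply the batch and advance the offset once the commit arrives.

```go
package cdcbuffer

// Record and Checkpoint are hypothetical stand-ins for the CDC SDK messages.
type Record struct{ /* row change payload */ }

type Checkpoint struct{ Term, Index int64 }

type txnBuffer struct {
	pending []Record
}

func (b *txnBuffer) onBegin()          { b.pending = b.pending[:0] }
func (b *txnBuffer) onChange(r Record) { b.pending = append(b.pending, r) }

// onCommit applies the buffered batch and only then advances the consumer
// offset; a crash before the offset is saved replays the whole transaction.
func (b *txnBuffer) onCommit(cp Checkpoint, apply func([]Record) error, save func(Checkpoint) error) error {
	if err := apply(b.pending); err != nil {
		return err
	}
	return save(cp)
}
```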
The Op.INSERT message’s new_tuple field contains the inserted values. old_tuple contains no values, which is to be expected.
update: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "update region set region_description='updated' where region_id=1":
|
|
Again, a statement has been wrapped in a transaction so we have received three messages.
The new_tuple field contains values after the update. old_tuple does not contain any data; this is most likely the effect of CDCRecordType: CHANGE.
delete: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "delete from region where region_id=1":
|
|
Yet again, the statement is wrapped in a transaction. We can see that new_tuple is empty, which makes sense. old_tuple contains the value of the primary key of the deleted item.
truncate: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "truncate table region cascade":
This statement does not seem to be handled properly on the database level yet. Issuing a truncate statement breaks the stream (the consumer is not able to get changes anymore), and TServers log errors like these:
yb-tserver-shared-3 | [libprotobuf FATAL google/protobuf/reflection_ops.cc:59] CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3 | libc++abi: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
§no primary key, no CDC?
All those tests were done on a table with a primary key. Naturally, I wanted to check if it is possible to retrieve changes from a table without a primary key:
|
|
Trying to get changes for tablets of that table returns a CDC RPC error similar to:
reason=
| CDC rpc error: code: 10 (INVALID_REQUEST), status: 4 (INVALID_ARGUMENT)
| message: Tablet ID d17e6b58a79f418dbae5e95978bf2f59 is not part of stream ID 2861d4b6f25d49f7982bcfb2592b54f0
| source: ../../ent/src/yb/cdc/cdc_service.cc@457
This would imply that it’s impossible to use the CDC SDK on tables without a primary key. And indeed, this part of the CDC service[11]:
|
|
confirms that. The original, simple CDC has the same restriction.
§changes on the entity level
Another major difference from the first CDC implementation is that change events are per row (entity). There’s no longer an event per tablet replica. The internal CDC service implementation locates the tablet leader and delivers the tablet leader’s events only.
The order of events reflects the order of operations on a row. The arrival of an event implies that all previous operations on that row have been committed.
§consuming from latest operation
Sometimes we might want to skip consuming historical changes and consume from the latest entry only. There are two possibilities, depending on how the database stream has been created:
- CDC checkpoint type IMPLICIT: this is the default behaviour when reusing stream IDs.
- CDC checkpoint type EXPLICIT: use GetCheckpoint to discover the latest saved checkpoint; a value of term: 0, index: 0 might indicate that no SetCDCCheckpoint has been called, so consider using GetLatestEntryOpId in that case.
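For the EXPLICIT case, that decision logic could be sketched like this; GetCheckpoint and GetLatestEntryOpId are the operations named above, wrapped in a hypothetical client interface of my own making:

```go
package cdcstart

// cdcClient is a hypothetical wrapper over the CDC service operations
// mentioned above.
type cdcClient interface {
	GetCheckpoint(streamID, tabletID string) (term, index int64, err error)
	GetLatestEntryOpId(tabletID string) (term, index int64, err error)
}

// startingPoint picks the checkpoint to resume an EXPLICIT-mode consumer from.
func startingPoint(c cdcClient, streamID, tabletID string) (int64, int64, error) {
	term, index, err := c.GetCheckpoint(streamID, tabletID)
	if err != nil {
		return 0, 0, err
	}
	// term: 0, index: 0 may mean SetCDCCheckpoint has never been called;
	// fall back to the latest entry so historical changes are skipped.
	if term == 0 && index == 0 {
		return c.GetLatestEntryOpId(tabletID)
	}
	return term, index, nil
}
```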
§reference go client
My reference CDC SDK client can be found in this GitHub repository[12].
§summary
It’s clear that the CDC SDK is a higher level solution than the first CDC implementation.
Current state:
- Does what it says on the tin.
- Has rough edges and requires some clarifications:
  - Fully functional support for truncate would be great to have.
  - No support for tables without a primary key; enabling it would open up some very interesting use cases for continuous backup/restore.
  - What’s the checkpoint’s snapshot_time? What can it be used for?
  - Why do TServers crash when executing SetCDCCheckpoint in EXPLICIT mode?
YugabyteDB CDC is one of the features to track. The documentation in GitHub[13] suggests some pretty cool features coming.
- [3] YugabyteDB Debezium connector: YugabyteDBConnector.validateTServerConnection(...)
- [4] YugabyteDB Debezium connector: YugabyteDBConnector.fetchTabletList(...)
- [5] YugabyteDB Debezium connector: YugabyteDBStreamingChangeEventSource.getChanges2(...)
- [6] Currently undocumented yb-admin create_change_data_stream operation
- [7] YugabyteDB source code: ClusterAdminClient::CreateCDCSDKDBStream
- [11] YugabyteDB source code: do not create a CDC stream if table doesn’t have a primary key