YugabyteDB CDC SDK beta, a high-level overview

an overview of the all-new CDC SDK beta

Shortly after I published the previous article on YugabyteDB CDC[1], the amazing Yugabyte team released version 2.13 of the database with a beta implementation of the new change data capture SDK. Before diving into the new SDK, let’s quickly recap the first implementation.

This is a living article. Updates will be listed here.

§first CDC version: xcluster

With the first implementation, we can create and delete JSON-based streams, where each stream is created for exactly one table. Each individual stream delivers messages for three types of database events: inserts (WRITE and APPLY), updates (WRITE and APPLY) and deletes (DELETE and APPLY).

// https://github.com/yugabyte/yugabyte-db/blob/v2.11.2/src/yb/cdc/cdc_service.proto#L165
message CDCRecordPB {
  enum OperationType {
    WRITE = 1;
    DELETE = 2;
    APPLY = 3;
  }
  // ...

For the majority of use cases this might be sufficient, but certain things aren’t possible:

  • No truncate support.
  • No DDL related events.
  • No transaction indication.

With the xcluster implementation, the CDC service delivers messages on the tablet level. Each individual insert, update, or delete results in multiple messages: one for a tablet leader and one for each follower.

This results in plenty of duplication on the consumer side and in the operational complexity of tracking the source of truth for each change. This first CDC implementation hands the complexity of the architecture over to the consumer. Instead of working just on the row (entity) level, the consumer must also understand how the cluster is deployed and replicated.

The new CDC SDK addresses a large number of those problems.

§CDC SDK

Let’s start the CDC SDK investigation by looking at the CDC 2.13 service definition:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L47
service CDCService {
  rpc CreateCDCStream (CreateCDCStreamRequestPB) returns (CreateCDCStreamResponsePB);
  rpc DeleteCDCStream (DeleteCDCStreamRequestPB) returns (DeleteCDCStreamResponsePB);
  rpc ListTablets (ListTabletsRequestPB) returns (ListTabletsResponsePB);
  rpc GetChanges (GetChangesRequestPB) returns (GetChangesResponsePB);
  rpc GetCheckpoint (GetCheckpointRequestPB) returns (GetCheckpointResponsePB);
  rpc UpdateCdcReplicatedIndex (UpdateCdcReplicatedIndexRequestPB)
      returns (UpdateCdcReplicatedIndexResponsePB);
  rpc BootstrapProducer (BootstrapProducerRequestPB) returns (BootstrapProducerResponsePB);
  rpc GetLatestEntryOpId (GetLatestEntryOpIdRequestPB) returns (GetLatestEntryOpIdResponsePB);
  rpc GetCDCDBStreamInfo (GetCDCDBStreamInfoRequestPB) returns (GetCDCDBStreamInfoResponsePB);
  rpc SetCDCCheckpoint (SetCDCCheckpointRequestPB) returns (SetCDCCheckpointResponsePB) {
    option (yb.rpc.trivial) = true;
  };
}

This doesn’t look much different from the previous version. There are a couple of new operations and some request and response types contain new SDK-specific fields.

For example, there is a single CreateCDCStream method taking the CreateCDCStreamRequestPB argument known from the previous incarnation. However, the request argument now carries some new options.

It’s obvious that one needs to know the correct set of parameters to work with the SDK. A reference implementation would be handy. As it happens, there is one: the YugabyteDB Debezium connector[2].

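The most relevant pieces of the connector code:

  • YugabyteDBConnector.validateTServerConnection(...)[3]
  • YugabyteDBConnector.fetchTabletList(...)[4]
  • YugabyteDBStreamingChangeEventSource.getChanges2(...)[5]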

§creating a stream

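To create a CDC SDK stream, called a database stream in 2.13, we have to call the CreateCDCStream operation with an exact combination of CreateCDCStreamRequestPB parameters. Hints are in two places:

  • the currently undocumented yb-admin create_change_data_stream operation[6],
  • the ClusterAdminClient::CreateCDCSDKDBStream method in the YugabyteDB source code[7].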

To create a database stream, the request needs to contain:

  • A database name (namespace).
  • Record type CDCRecordType.CHANGE: defines what each individual record contains:
    • CHANGE: changed values only; it appears that this is the required value,
    • AFTER: the entire row after the change,
    • ALL: the before and after value of the row.
  • Record format must be CDCRecordFormat.PROTO: data is in the protobuf format.
  • Source type must be CDCRequestSource.CDCSDK,
    • other values:
      • XCLUSTER: simple CDC form.
      • 2DC: two datacenter deployments in YugabyteDB leverage change data capture at the core.
  • Checkpoint type must be IMPLICIT or EXPLICIT: when IMPLICIT, the CDC SDK manages checkpoints automatically.
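
The combination of parameters listed above can be captured in a small validation helper. This is a minimal Python sketch, not the YugabyteDB client API: the StreamRequest class and its field names are hypothetical stand-ins for the CreateCDCStreamRequestPB fields, and the rules only mirror the list above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StreamRequest:
    """Hypothetical stand-in for CreateCDCStreamRequestPB (field names are assumptions)."""
    namespace_name: str
    record_type: str = "CHANGE"
    record_format: str = "PROTO"
    source_type: str = "CDCSDK"
    checkpoint_type: str = "IMPLICIT"


def validate_db_stream_request(req: StreamRequest) -> List[str]:
    """Return the rules from the list above that the request violates."""
    problems = []
    if not req.namespace_name:
        problems.append("a database name (namespace) is required")
    if req.record_type != "CHANGE":
        problems.append("record type should be CHANGE")
    if req.record_format != "PROTO":
        problems.append("record format must be PROTO")
    if req.source_type != "CDCSDK":
        problems.append("source type must be CDCSDK")
    if req.checkpoint_type not in ("IMPLICIT", "EXPLICIT"):
        problems.append("checkpoint type must be IMPLICIT or EXPLICIT")
    return problems
```

An empty result means the request matches the combination described above; anything else names the offending parameter before the RPC is even attempted.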

If the stream has been created successfully, the ID of the stream will be in the CreateCDCStreamResponsePB.db_stream_id field:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L124
message CreateCDCStreamResponsePB {
  optional CDCErrorPB error = 1;
  optional bytes stream_id = 2;
  optional bytes db_stream_id = 3;
}

§consuming the stream

As with the original CDC, to consume a stream, we have to call the GetChanges operation. The GetChangesRequestPB type looks like this in 2.13:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L178
message GetChangesRequestPB {
  optional bytes stream_id = 1;
  // Tablet to get the changes for.
  optional bytes tablet_id = 2;

  // Checkpoint to start reading from (exclusive).
  // Start reading from the first record after this checkpoint.
  optional CDCCheckpointPB from_checkpoint = 3;

  // Maximum records to read.
  optional uint32 max_records = 4;

  // Whether the caller knows the tablet address or needs to use us as a proxy.
  optional bool serve_as_proxy = 5 [default = true];

  optional bytes db_stream_id = 6;

  optional bytes table_id = 7;

  optional CDCSDKCheckpointPB from_cdc_sdk_checkpoint = 8;
}

Most interesting fields:

  • db_stream_id: takes the database stream ID returned in CreateCDCStreamResponsePB.db_stream_id; we can see that the original CDC is still supported with the stream_id field.
  • from_cdc_sdk_checkpoint: CDC SDK streams require this new checkpoint format.

The table_id field does not appear to be used.

To consume changes from the CDC SDK database stream, we have to execute a GetChanges request with the following configuration:

  • GetChangesRequestPB.db_stream_id: contains the database stream ID.
  • GetChangesRequestPB.from_cdc_sdk_checkpoint: contains the checkpoint. An empty checkpoint is a checkpoint instance with all values set to their defaults; using it makes the consumer receive all changes from the beginning of the stream.
  • GetChangesRequestPB.tablet_id: tablet ID is required.
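
The three required fields can be put together as in the Python sketch below. It builds a plain dictionary rather than a real protobuf message: the SdkCheckpoint class and the get_changes_request helper are hypothetical names introduced for illustration, and only the field names come from the GetChangesRequestPB and CDCSDKCheckpointPB definitions.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class SdkCheckpoint:
    """Mirrors the CDCSDKCheckpointPB fields; the defaults form an 'empty' checkpoint."""
    term: int = 0
    index: int = 0
    key: bytes = b""
    write_id: int = 0
    snapshot_time: int = 0


def get_changes_request(db_stream_id: bytes, tablet_id: bytes,
                        checkpoint: Optional[SdkCheckpoint] = None) -> dict:
    """Build the field set of a GetChanges request for a database stream (illustrative only)."""
    if not db_stream_id:
        raise ValueError("db_stream_id is required")
    if not tablet_id:
        raise ValueError("tablet_id is required")
    return {
        "db_stream_id": db_stream_id,
        "tablet_id": tablet_id,
        # No checkpoint given: start from the beginning of the stream.
        "from_cdc_sdk_checkpoint": asdict(checkpoint or SdkCheckpoint()),
    }
```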

§discovering tablet IDs to consume changes from

The implication of GetChangesRequestPB.tablet_id being required is that we have to find all tablet IDs for each table of a database.

Fortunately, the Debezium connector does this already. We can use the YugabyteDBConnector.fetchTabletList(...)[4] method to reverse engineer this step.

It boils down to listing all tables using the master service ListTables request and filtering for PGSQL relations, where the relation type isn’t INDEX_TABLE_RELATION or SYSTEM_TABLE_RELATION (effectively, USER_TABLE_RELATION only).

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/master/master_types.proto#L22
enum RelationType {
  SYSTEM_TABLE_RELATION = 1;
  USER_TABLE_RELATION = 2;
  INDEX_TABLE_RELATION = 3;
}

For each discovered table, we list its tablets using the master service GetTableLocations RPC call. As a result, we have a list of all tablet IDs we want to consume from.

There will be one consumer per tablet.

We can, of course, consume changes for specific tables (or tablets) only. To do so, we can skip tablets we don’t want to consume from.
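
The filtering described in this section can be sketched as follows. The example works on plain Python data instead of calling the master service: the dictionary keys (id, name, table_type, relation_type), the "PGSQL" marker and the tablets_by_table mapping are assumptions standing in for the ListTables and GetTableLocations responses.

```python
from enum import IntEnum


class RelationType(IntEnum):
    # Values copied from master_types.proto shown above.
    SYSTEM_TABLE_RELATION = 1
    USER_TABLE_RELATION = 2
    INDEX_TABLE_RELATION = 3


def tablets_to_consume(tables, tablets_by_table, wanted_tables=None):
    """Return tablet IDs of PGSQL user tables, optionally limited to wanted table names.

    tables: iterable of dicts shaped like a (hypothetical) ListTables result.
    tablets_by_table: table id -> list of tablet ids, as GetTableLocations would report.
    """
    result = []
    for table in tables:
        if table["table_type"] != "PGSQL":
            continue  # skip non-YSQL tables
        if table["relation_type"] != RelationType.USER_TABLE_RELATION:
            continue  # skip system tables and indexes
        if wanted_tables is not None and table["name"] not in wanted_tables:
            continue  # consume only the tables we care about
        result.extend(tablets_by_table.get(table["id"], []))
    return result
```

Each returned tablet ID would then get its own consumer.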

§the checkpoint

The old checkpoint type contains term and index fields delivered in an OpIdPB message. The term most likely refers to the Raft term of the tablet, and the index is most likely the Raft log entry index of the tablet.

The CDC SDK introduces a new checkpoint type. The new checkpoint type doesn’t use the OpId anymore. It defines term and index fields directly, next to additional properties. The protobuf type:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L162
message CDCSDKCheckpointPB {
  optional int64 term = 1;
  optional int64 index = 2;
  // write_id, key which is the reverse_index_iterator are used to resume from partially
  // streamed intents when the number of intents to be streamed are more than the intent max batch
  // size
  optional bytes key = 3;
  optional int32 write_id = 4 [default = 0];
  // snapshot_time is used in the context of bootstrap process
  optional uint64 snapshot_time = 5;
}

The write_id is the order of changes within a multi-statement transaction and does not make sense when key is null.

I’m not sure what snapshot_time is for. It’s something to investigate further; snapshot handling is here[8].

§comparing checkpoints

Checkpoints can be compared by term, index and write_id. These properties should be compared in order:

  • Higher term implies higher checkpoint, regardless of the index values.
  • If term is equal, higher index indicates a newer event.
  • write_id makes sense only within a transaction, higher write_id implies a newer event within a single transaction.
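
The ordering rules above translate into a lexicographic comparison. The sketch below is my own illustration in Python, not code from YugabyteDB; the Checkpoint type is a hypothetical helper, and using write_id as the final tie-breaker assumes that two checkpoints with equal term and index belong to the same transaction.

```python
from typing import NamedTuple


class Checkpoint(NamedTuple):
    term: int
    index: int
    write_id: int = 0


def compare_checkpoints(a: Checkpoint, b: Checkpoint) -> int:
    """Return -1 if a is older than b, 0 if they are equal, 1 if a is newer."""
    # Tuples compare element by element: term first, then index, then write_id.
    key_a = (a.term, a.index, a.write_id)
    key_b = (b.term, b.index, b.write_id)
    return (key_a > key_b) - (key_a < key_b)
```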

§checkpoint management

The CDC service manages checkpoints by itself when the CDC checkpoint type is IMPLICIT. Only very specific use cases would require the client to manage checkpoints manually, in the EXPLICIT checkpoint type mode.

For example, a client buffering records before processing them might require fine-grained checkpoint control.

It’s not clear to me how to put a CDC stream in a mode where the CDC checkpoint can be set. Every time I attempted to call SetCDCCheckpoint on a stream created with the EXPLICIT setting, the YugabyteDB TServers crashed.

Weird, considering that the Java client tests appear to be testing this functionality[9].

§checkpoint management vs consumer offset management

I kind of made up the term consumer offset management here. It comes from the Apache Kafka world, and there’s no reference to it anywhere in the YugabyteDB documentation. A single stream ID+tablet ID combination can have multiple CDC consumers sourcing events from it, and that’s perfectly fine.

For example:

  • Consumer A performs audits.
  • Consumer B observes selected tables and rewrites specific updates to an internal ERP system.
  • Consumer C observes individual columns for secret values and raises alerts when those values appear.

Each of those consumers is a standalone program with its own lifecycle. They are started and stopped independently. Each of them needs to have its own last processed checkpoint (a consumer offset) stored somewhere. It’s clear that the SetCDCCheckpoint operation isn’t designed to perform this role because it does not have a notion of a consumer ID.

Instead, a consumer should process changes received from GetChanges. When finished, or when it is ready to commit a batch of processed changes, it should store the term and index for a stream ID+tablet ID combination. Next to those values, it needs to store a consumer name. The name should be unique to the consumer application but shared between different starts of the same consumer instance.

I’m not sure if my assumptions are correct, but I would definitely try this approach if I was to build anything on the YugabyteDB CDC SDK. I would probably store checkpoints by myself in a dedicated table. The table could have the following schema:

create table consumer_offsets (
  streamID bytea,
  tabletID bytea,
  consumerID varchar(64),
  term bigint,
  index bigint
);
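
To show how such a table could be used, here is a minimal Python sketch of a per-consumer offset store. It uses the standard library sqlite3 module purely so the example is self-contained; the OffsetStore class, the snake_case column names, the log_index column (renamed to stay clear of the SQL keyword) and the added primary key are all assumptions of this sketch, not part of the schema above or of YugabyteDB.

```python
import sqlite3


class OffsetStore:
    """Stores the last processed (term, index) per stream, tablet and consumer."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        conn.execute(
            """create table if not exists consumer_offsets (
                 stream_id blob, tablet_id blob, consumer_id text,
                 term integer, log_index integer,
                 primary key (stream_id, tablet_id, consumer_id))"""
        )

    def save(self, stream_id, tablet_id, consumer_id, term, index):
        # The connection context manager commits on success, rolls back on error.
        with self.conn:
            self.conn.execute(
                "insert or replace into consumer_offsets values (?, ?, ?, ?, ?)",
                (stream_id, tablet_id, consumer_id, term, index),
            )

    def load(self, stream_id, tablet_id, consumer_id):
        row = self.conn.execute(
            "select term, log_index from consumer_offsets "
            "where stream_id = ? and tablet_id = ? and consumer_id = ?",
            (stream_id, tablet_id, consumer_id),
        ).fetchone()
        # An unknown consumer starts from the empty checkpoint.
        return tuple(row) if row is not None else (0, 0)
```

Because the consumer ID is part of the key, the audit, ERP and alerting consumers from the example above can each advance at their own pace on the same stream and tablet.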

§processing changes

The meat of every CDC consumer is change processing. A response from the GetChanges operation for a database stream contains an array of CDCSDKProtoRecordPB items. Each of those items has the following structure:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L355
message CDCSDKProtoRecordPB {
  optional RowMessage row_message = 1;

  // Op id information to identify duplicate entries
  optional CDCSDKOpIdPB cdc_sdk_op_id = 2;
}

where a row message is:

// https://github.com/yugabyte/yugabyte-db/blob/v2.13.0/src/yb/cdc/cdc_service.proto#L319
message RowMessage {
  enum Op {
    UNKNOWN  = -1;
    INSERT   = 0;
    UPDATE   = 1;
    DELETE   = 2;
    BEGIN    = 3;
    COMMIT   = 4;
    DDL      = 5;
    TRUNCATE = 6;
    READ     = 7;
  }

  optional bytes  transaction_id = 1;
  optional uint64 commit_time = 2;
  optional string table = 3;
  optional Op op = 4;
  repeated DatumMessagePB new_tuple = 5;
  repeated DatumMessagePB old_tuple = 6;
  repeated TypeInfo new_typeinfo = 7;

  // Schema information used in case of DDL
  optional CDCSDKSchemaPB schema = 8;

  // Schema version
  optional uint32 schema_version = 9;

  // New table name, used in the case of rename table
  optional string new_table_name = 10;

  optional string pgschema_name = 11;

  // truncate info
  optional yb.tserver.TruncateRequestPB truncate_request_info = 13;
}

The inline Op identifies the type of the operation. This is where the CDC SDK shows its biggest difference from the original CDC solution. The CDC SDK provides additional context for DDL events, table truncation events, and transaction state. The YugabyteDBStreamingChangeEventSource.getChanges2(...) Debezium connector method mentioned earlier is a reference for event processing.

There are three groups of messages:

  • Transactional messages: Op.BEGIN and Op.COMMIT.
  • DDL messages: Op.DDL.
  • DML messages: Op.INSERT, Op.UPDATE, Op.DELETE and Op.TRUNCATE.

I was not able to trigger anything resulting in the Op.READ message type.
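
The grouping above maps directly onto the numeric Op values, which is handy when reading the JSON dumps later in this article. A minimal Python sketch; the enum values are copied from the RowMessage definition, while message_group and the group names are hypothetical helpers of mine.

```python
from enum import IntEnum


class Op(IntEnum):
    UNKNOWN = -1
    INSERT = 0
    UPDATE = 1
    DELETE = 2
    BEGIN = 3
    COMMIT = 4
    DDL = 5
    TRUNCATE = 6
    READ = 7


TRANSACTIONAL = {Op.BEGIN, Op.COMMIT}
DML = {Op.INSERT, Op.UPDATE, Op.DELETE, Op.TRUNCATE}


def message_group(op_value: int) -> str:
    """Classify a RowMessage.op value into one of the three groups above."""
    op = Op(op_value)  # raises ValueError for values outside the enum
    if op in TRANSACTIONAL:
        return "transactional"
    if op is Op.DDL:
        return "ddl"
    if op in DML:
        return "dml"
    return "other"  # UNKNOWN and READ
```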

§DDL messages

Here’s an example of a DDL message emitted after creating a table:

{
  "checkpoint": {
    "op_id": {
      "term": 2,
      "index": 3
    }
  },
  "cdc_sdk_proto_records": [
    {
      "row_message": {
        "table": "region",
        "op": 5,
        "schema": {
          "column_info": [
            {
              "name": "region_id",
              "type": {
                "main": 2
              },
              "is_key": true,
              "is_hash_key": true,
              "is_nullable": false,
              "oid": 21
            },
            {
              "name": "region_description",
              "type": {
                "main": 5
              },
              "is_key": false,
              "is_hash_key": false,
              "is_nullable": true,
              "oid": 1042
            }
          ],
          "tab_info": {
            "default_time_to_live": 0,
            "num_tablets": 3,
            "is_ysql_catalog_table": false
          }
        },
        "schema_version": 0,
        "pgschema_name": "public"
      }
    }
  ],
  "cdc_sdk_checkpoint": {
    "term": 2,
    "index": 3,
    "write_id": 0,
    "snapshot_time": 0
  }
}
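
A consumer will typically want to turn such a DDL message into a compact table description. The Python sketch below does that for a response rendered as JSON like the one above; describe_ddl and the shape of its result are my own illustration, and the sample is a trimmed copy of the message shown above.

```python
import json

DDL_OP = 5  # RowMessage.Op.DDL


def describe_ddl(response: dict) -> list:
    """Return one summary per DDL record found in a JSON-rendered GetChanges response."""
    summaries = []
    for record in response.get("cdc_sdk_proto_records", []):
        message = record.get("row_message", {})
        if message.get("op") != DDL_OP:
            continue
        columns = message.get("schema", {}).get("column_info", [])
        summaries.append({
            "table": f'{message.get("pgschema_name", "")}.{message["table"]}',
            "columns": [c["name"] for c in columns],
            "key_columns": [c["name"] for c in columns if c.get("is_key")],
        })
    return summaries


SAMPLE = json.loads("""
{"cdc_sdk_proto_records": [{"row_message": {
  "table": "region", "op": 5, "pgschema_name": "public",
  "schema": {"column_info": [
    {"name": "region_id", "is_key": true, "oid": 21},
    {"name": "region_description", "is_key": false, "oid": 1042}]}}}]}
""")
```

Calling describe_ddl(SAMPLE) reports the public.region table with region_id as its only key column.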

§DML messages

Let’s have a look at the data modification events in the following order: insert, update, delete, and truncate. The cluster setup and configuration are described here[10].

  • insert: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "insert into region values (1, 'test')":
{
  "checkpoint": {
    "op_id": {
      "term": 1,
      "index": 4
    }
  },
  "cdc_sdk_proto_records": [
    {
      "row_message": {
        "transaction_id": "NTEwMzRlZTMtMjVkYS00YWRjLWJlYjItNjBmZjcwNjc4YTMz",
        "table": "region",
        "op": 3
      }
    },
    {
      "row_message": {
        "transaction_id": "NTEwMzRlZTMtMjVkYS00YWRjLWJlYjItNjBmZjcwNjc4YTMz",
        "table": "region",
        "op": 0,
        "new_tuple": [
          {
            "column_name": "region_id",
            "column_type": 21,
            "Datum": {
              "DatumInt32": 1
            }
          },
          {
            "column_name": "region_description",
            "column_type": 1042,
            "Datum": {
              "DatumString": "test"
            }
          }
        ],
        "old_tuple": [
          {
            "Datum": null
          },
          {
            "Datum": null
          }
        ],
        "pgschema_name": "public"
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 4,
        "write_id": 1,
        "write_id_key": "eFEDTuMl2krcvrJg/3BnijPcf/6GQXF3Ly5/wFQ="
      }
    },
    {
      "row_message": {
        "transaction_id": "NTEwMzRlZTMtMjVkYS00YWRjLWJlYjItNjBmZjcwNjc4YTMz",
        "table": "region",
        "op": 4
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 4,
        "write_id": 0
      }
    }
  ],
  "cdc_sdk_checkpoint": {
    "term": 1,
    "index": 4,
    "write_id": 0
  }
}

Even a single statement executed with psql -c runs in its own implicit transaction. This gives us the opportunity to see what transactional messages look like.

We have three messages: Op.BEGIN, Op.INSERT, Op.COMMIT.

Op.BEGIN and Op.COMMIT should be treated as boundaries for advancing the consumer offset: the offset should be advanced only after an Op.COMMIT message has been processed. These boundaries are useful when new values should become visible only once the whole batch of changes enclosed in the transaction has been applied.

The Op.INSERT message new_tuple field contains inserted values. old_tuple contains no values, which is to be expected.
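
The idea of advancing the offset only on Op.COMMIT can be sketched as a small generator that buffers changes between the transaction boundaries. This is a Python illustration over the JSON-shaped records shown above, under the assumption that records outside a BEGIN/COMMIT pair (DDL, for example) are handled elsewhere; committed_transactions is a hypothetical helper.

```python
BEGIN, COMMIT = 3, 4  # RowMessage.Op values


def committed_transactions(records):
    """Yield (changes, commit_op_id) for every transaction closed by a COMMIT record.

    Changes of a transaction that has not been committed yet are never yielded,
    so the caller can safely store its offset after handling each yielded batch.
    """
    pending = None
    for record in records:
        message = record["row_message"]
        op = message["op"]
        if op == BEGIN:
            pending = []
        elif op == COMMIT:
            if pending is not None:
                yield pending, record.get("cdc_sdk_op_id")
            pending = None
        elif pending is not None:
            pending.append(message)
        # Records outside BEGIN/COMMIT are ignored in this sketch.
```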

  • update: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "update region set region_description='updated' where region_id=1":
{
  "checkpoint": {
    "op_id": {
      "term": 1,
      "index": 6
    }
  },
  "cdc_sdk_proto_records": [
    {
      "row_message": {
        "transaction_id": "ZDk1NTA2NDQtNThmMC00N2JiLWExNDEtMTI2OTdjZWE1ODRi",
        "table": "region",
        "op": 3
      }
    },
    {
      "row_message": {
        "transaction_id": "ZDk1NTA2NDQtNThmMC00N2JiLWExNDEtMTI2OTdjZWE1ODRi",
        "table": "region",
        "op": 1,
        "new_tuple": [
          {
            "column_name": "region_id",
            "column_type": 21,
            "Datum": {
              "DatumInt32": 1
            }
          },
          {
            "column_name": "region_description",
            "column_type": 1042,
            "Datum": {
              "DatumString": "updated"
            }
          }
        ],
        "old_tuple": [
          {
            "Datum": null
          },
          {
            "Datum": null
          }
        ],
        "pgschema_name": "public"
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 6,
        "write_id": 0,
        "write_id_key": "eNlVBkRY8Ee7oUESaXzqWEvcf/6GQXXRDYl/tQ=="
      }
    },
    {
      "row_message": {
        "transaction_id": "ZDk1NTA2NDQtNThmMC00N2JiLWExNDEtMTI2OTdjZWE1ODRi",
        "table": "region",
        "op": 4
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 6,
        "write_id": 0
      }
    }
  ],
  "cdc_sdk_checkpoint": {
    "term": 1,
    "index": 6,
    "write_id": 0
  }
}

Again, a statement has been wrapped in a transaction so we have received three messages.

The new_tuple field contains values after the update. old_tuple does not contain any data; this is most likely the effect of CDCRecordType: CHANGE.

  • delete: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "delete from region where region_id=1":
{
  "checkpoint": {
    "op_id": {
      "term": 1,
      "index": 8
    }
  },
  "cdc_sdk_proto_records": [
    {
      "row_message": {
        "transaction_id": "NTQyMGRhYjUtYTJhMS00Y2FlLTk3OWMtMWUxMWJmMzgyNDMx",
        "table": "region",
        "op": 3
      }
    },
    {
      "row_message": {
        "transaction_id": "NTQyMGRhYjUtYTJhMS00Y2FlLTk3OWMtMWUxMWJmMzgyNDMx",
        "table": "region",
        "op": 2,
        "new_tuple": [
          {
            "Datum": null
          }
        ],
        "old_tuple": [
          {
            "column_name": "region_id",
            "column_type": 21,
            "Datum": {
              "DatumInt32": 1
            }
          }
        ],
        "pgschema_name": "public"
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 8,
        "write_id": 0,
        "write_id_key": "eFQg2rWioUyul5weEb84JDHcf/6GQXoUJj9/tQ=="
      }
    },
    {
      "row_message": {
        "transaction_id": "NTQyMGRhYjUtYTJhMS00Y2FlLTk3OWMtMWUxMWJmMzgyNDMx",
        "table": "region",
        "op": 4
      },
      "cdc_sdk_op_id": {
        "term": 1,
        "index": 8,
        "write_id": 0
      }
    }
  ],
  "cdc_sdk_checkpoint": {
    "term": 1,
    "index": 8,
    "write_id": 0
  }
}

Yet again, the statement is wrapped in a transaction. We can see that the new_tuple is empty, which makes sense. old_tuple contains the value of the primary key of the deleted row.
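
Putting the insert, update and delete messages together, a consumer can maintain a local copy of a table. The following Python sketch applies the JSON-shaped row messages shown above to an in-memory dictionary. It assumes a single-column primary key and that every populated Datum holds exactly one typed member, as in the samples; tuple_to_dict and apply_change are hypothetical helpers.

```python
INSERT, UPDATE, DELETE = 0, 1, 2  # RowMessage.Op values


def tuple_to_dict(datums):
    """Flatten new_tuple/old_tuple entries into {column_name: value}, skipping null Datums."""
    row = {}
    for datum in datums or []:
        value = datum.get("Datum")
        if value is None or "column_name" not in datum:
            continue
        # A populated Datum looks like {"DatumInt32": 1}; take its only member.
        row[datum["column_name"]] = next(iter(value.values()))
    return row


def apply_change(table: dict, message: dict, key_column: str) -> None:
    """Apply one DML row message to a local replica keyed by the primary key value."""
    op = message["op"]
    if op in (INSERT, UPDATE):
        values = tuple_to_dict(message.get("new_tuple"))
        # With record type CHANGE an update carries changed values only, so merge.
        table.setdefault(values[key_column], {}).update(values)
    elif op == DELETE:
        key = tuple_to_dict(message.get("old_tuple"))[key_column]
        table.pop(key, None)
```

Replaying the three messages above in order leaves the replica empty again, which matches the state of the region table after the delete.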

  • truncate: psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "truncate table region cascade":

This statement does not seem to be handled properly on the database level yet. Issuing a truncate statement breaks the stream (the consumer is not able to get changes anymore), and TServers log errors like these:

yb-tserver-shared-3    | [libprotobuf FATAL google/protobuf/reflection_ops.cc:59] CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3    | libc++abi: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3    | [libprotobuf FATAL google/protobuf/reflection_ops.cc:59] CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)
yb-tserver-shared-3    | libc++abi: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: (to->GetDescriptor()) == (descriptor): Tried to merge messages of different types (merge yb.tablet.TruncatePB to yb.tserver.TruncateRequestPB)

§no primary key, no CDC?

All those tests were done on a table with a primary key. Naturally, I wanted to check if it is possible to retrieve changes from a table without a primary key:

psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "create table test_table(a text, b text)"
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "insert into test_table values ('test value 1a', 'test value 1b')"
psql "host=localhost port=35432 user=yugabyte dbname=cdctest" -c "insert into test_table values ('test value 2a', 'test value 2b')"

Trying to get changes for tablets of that table returns a CDC RPC error similar to:

  reason=
  | CDC rpc error: code: 10 (INVALID_REQUEST), status: 4 (INVALID_ARGUMENT)
  | 	message: Tablet ID d17e6b58a79f418dbae5e95978bf2f59 is not part of stream ID 2861d4b6f25d49f7982bcfb2592b54f0
  | 	source: ../../ent/src/yb/cdc/cdc_service.cc@457

This would imply that it’s impossible to use CDC SDK on tables without a primary key. And indeed, this part of the CDC service[11]:

    // internally if any of the table doesn't have a primary key, then do not create
    // a CDC stream ID for that table
    if (!YsqlTableHasPrimaryKey(table->schema())) {
      LOG(WARNING) << "Skipping CDC stream creation on " << table->name().table_name()
                   << " because it does not have a primary key";
      continue;
    }

confirms that. The original, simple CDC has the same restriction.

§changes on the entity level

Another major difference from the first CDC implementation is that change events are per row (entity). There’s no longer an event per tablet. Internally, the CDC service locates the tablet leader and delivers events from the leader only.

The order of events reflects the order of operations on a row. The arrival of an event implies that all previous operations on that row have been committed.

§consuming from latest operation

Sometimes we might want to skip consuming historical changes and consume from the latest entry only. There are two possibilities, depending on how the database stream has been created:

  • CDC checkpoint type IMPLICIT: this is the default behaviour when reusing stream IDs.
  • CDC checkpoint type EXPLICIT: use GetCheckpoint to discover the latest saved checkpoint. A value of term: 0, index: 0 might indicate that SetCDCCheckpoint has never been called; consider using GetLatestEntryOpId in that case.
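
The decision for the EXPLICIT case can be written down as a tiny helper. This Python sketch only encodes the guess made above, that a saved checkpoint of term 0 and index 0 means no checkpoint has been stored yet; the two arguments stand for the results of GetCheckpoint and GetLatestEntryOpId, and resume_point is a hypothetical name.

```python
from typing import Tuple

OpId = Tuple[int, int]  # (term, index)


def resume_point(saved: OpId, latest: OpId) -> OpId:
    """Pick where an EXPLICIT-mode consumer should start reading from.

    saved: the value reported by GetCheckpoint.
    latest: the value reported by GetLatestEntryOpId.
    """
    if saved == (0, 0):
        # Assumption: nothing was ever stored, so skip history and start at the tip.
        return latest
    return saved
```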

§reference go client

My reference CDC SDK client can be found in this GitHub repository[12].

§summary

It’s clear that the CDC SDK is a higher-level solution than the first CDC implementation.

Current state:

  • Does what it says on the tin.
  • Has rough edges and requires some clarifications:
    • Fully functional support for truncate would be great to have.
    • No support for tables without a primary key. This would be great to have; it would enable some very interesting use cases for continuous backup/restore.
    • What’s the checkpoint’s snapshot_time? What can it be used for?
    • Why do TServers crash when executing SetCDCCheckpoint in EXPLICIT mode?

YugabyteDB CDC is one of the features to track. The documentation in GitHub[13] suggests some pretty cool features coming.


  1. YugabyteDB change data capture 

  2. YugabyteDB Debezium connector 

  3. YugabyteDB Debezium connector: YugabyteDBConnector.validateTServerConnection(...) 

  4. YugabyteDB Debezium connector: YugabyteDBConnector.fetchTabletList(...) 

  5. YugabyteDB Debezium connector: YugabyteDBStreamingChangeEventSource.getChanges2(...) 

  6. Currently undocumented yb-admin create_change_data_stream operation 

  7. YugabyteDB source code: ClusterAdminClient::CreateCDCSDKDBStream 

  8. snapshot handling in YugabyteDB CDC SDK producer 

  9. Java client checkpoint handling tests 

  10. YugabyteDB change data capture: cluster configuration 

  11. YugabyteDB source code: do not create a CDC stream if table doesn’t have a primary key 

  12. YugabyteDB CDC test golang client 

  13. 2.13 DocDB CDC documentation