Changefeed Messages

On this page Carat arrow pointing down

Changefeeds generate and emit messages (on a per-key basis) as changes happen to the rows in watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different options (e.g., format=avro).

This page describes the format and behavior of changefeed messages. You will find the following information on this page:

Note:

Changefeed types are not fully integrated with user-defined composite types. Running changefeeds with user-defined composite types is in Preview. Certain changefeed types do not support user-defined composite types. Refer to the change data capture Known Limitations for more detail.

Responses

By default, changefeed messages emitted to a sink contain keys and values of the watched table rows that have changed. The message will contain the following fields depending on the type of emitted change and the options you specified to create the changefeed:

  • Key: An array composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
  • Value:
    • One of four possible top-level fields:
      • after, which contains the state of the row after the update (or null for DELETEs).
      • updated, which contains the updated timestamp.
      • resolved, which is emitted for records representing resolved timestamps. These records do not include an after value since they only function as checkpoints.
      • before, which contains the state of the row before an update. Changefeeds must use the diff option with the default wrapped envelope to emit the before field. When a row did not previously have any data, the before field will emit null.
    • For INSERT and UPDATE, the current state of the row inserted or updated.
    • For DELETE, null.
Note:

If you use the envelope option to alter the changefeed message fields, your messages may not contain one or more of the values noted in the preceding list. As an example, when emitting to a Kafka sink, you can limit messages to just the changed key value by using the envelope option set to key_only. For more detail, refer to Message envelopes.

For example, changefeeds emitting to a sink will have the default message format:

Statement Response
INSERT INTO office_dogs VALUES (1, 'Petee'); JSON: [1] {"after": {"id": 1, "name": "Petee"}}
Avro: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}
DELETE FROM office_dogs WHERE name = 'Petee' JSON: [1] {"after": null}
Avro: {"id":{"long":1}} {"after":null}

When a changefeed targets a table with multiple column families, the family name is appended to the table name as part of the topic. Refer to Tables with columns families in changefeeds for guidance.

For webhook sinks, the response format arrives as a batch of changefeed messages with a payload and length.

{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2}

Webhook message batching is subject to the same key ordering guarantee as other sinks. Therefore, as messages are batched, you will not receive two batches at the same time with overlapping keys. You may receive a single batch containing multiple messages about one key, because ordering is maintained for a single key within its batch.

Refer to changefeed files for more detail on the file naming format for Enterprise changefeeds.

Message envelopes

The envelope defines the structure of a changefeed message. You can use the envelope option to manipulate the changefeed envelope. The values that the envelope option accepts are compatible with different changefeed sinks, and the structure of the message will vary depending on the sink.

Note:

Changefeeds created with EXPERIMENTAL CHANGEFEED FOR or CREATE CHANGEFEED with no sink specified (sinkless changefeeds) produce messages without the envelope metadata fields of changefeeds emitting to sinks.

The following sections provide examples of changefeed messages that are emitted when you specify each of the supported envelope options. Other changefeed options can affect the message envelope and what messages are emitted. Therefore, the examples are a guide for what you can expect when only the envelope option is specified.

wrapped

wrapped is the default envelope structure for changefeed messages. This envelope contains an array of the primary key (or the key as part of the message metadata), a top-level field for the type of message, and the current state of the row (or null for deleted rows).

The message envelope contains a primary key array when your changefeed is emitting to a sink that does not have a message key as part of its protocol, (e.g., cloud storage, webhook sinks, or Google Pub/Sub). By default, messages emitted to Kafka sinks do not have the primary key array, because the key is part of the message metadata. If you would like messages emitted to Kafka sinks to contain a primary key array, you can use the key_in_value option. Refer to the following message outputs for examples of this.

  • Cloud storage sink:

    CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud';
    
    {"after": {"city": "seattle", "creation_time": "2019-01-02T03:04:05", "current_location": "86359 Jeffrey Ranch", "ext": {"color": "yellow"}, "id": "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8", "owner_id": "570a3d70-a3d7-4c00-8000-000000000011", "status": "in_use", "type": "scooter"}, "key": ["seattle", "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8"]}
    
  • Kafka sink:

    • Default when envelope=wrapped or envelope is not specified:

      CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka';
      
      {"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "24315 Elizabeth Mountains", "ext": {"color": "yellow"}, "id": "dadc1c0b-30f0-4c8b-bd16-046c8612bbea", "owner_id": "034075b6-5380-4996-a267-5a129781f4d3", "status": "in_use", "type": "scooter"}}
      
    • Kafka sink message with key_in_value provided:

      CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH key_in_value, envelope=wrapped;
      
      {"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "46227 Jeremy Haven Suite 92", "ext": {"brand": "Schwinn", "color": "red"}, "id": "298cc7a0-de6b-4659-ae57-eaa2de9d99c3", "owner_id": "beda1202-63f7-41d2-aa35-ee3a835679d1", "status": "in_use", "type": "bike"}, "key": ["washington dc", "298cc7a0-de6b-4659-ae57-eaa2de9d99c3"]}
      

wrapped and diff

To include a before field in the changefeed message that contains the state of a row before an update in the changefeed message, use the diff option with wrapped:

CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wrapped;
{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "2 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "before": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "5 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "key": ["seattle", "f6c02fe0-a4e0-476d-a3b7-91934d15dce2"]}

bare

bare removes the after key from the changefeed message and stores any metadata in a crdb field. When used with avro format, record will replace the after key.

  • Cloud storage sink:

    CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud' WITH envelope=bare;
    
    {"__crdb__": {"key": ["washington dc", "cd48e501-e86d-4019-9923-2fc9a964b264"]}, "city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "87247 Diane Park", "ext": {"brand": "Fuji", "color": "yellow"}, "id": "cd48e501-e86d-4019-9923-2fc9a964b264", "owner_id": "a616ce61-ade4-43d2-9aab-0e3b24a9aa9a", "status": "available", "type": "bike"}
    

CDC queries use envelope=bare message format by default. The bare message envelope places the output of the SELECT clause at the top level of the message instead of under an "after" key. When there is additional information that the changefeed is sending, such as updated or resolved timestamps, the messages will include a crdb field containing this information.

  • In CDC queries:

    • A changefeed containing a SELECT clause without any additional options:

      CREATE CHANGEFEED INTO 'external://kafka' AS SELECT city, type FROM movr.vehicles;
      
      {"city": "los angeles", "type": "skateboard"}
      
    • A changefeed containing a SELECT clause with the topic_in_value option specified:

      CREATE CHANGEFEED INTO 'external://kafka' WITH topic_in_value AS SELECT city, type FROM movr.vehicles;
      
      {"__crdb__": {"topic": "vehicles"}, "city": "los angeles", "type": "skateboard"}
      

key_only

key_only emits only the key and no value, which is faster if you only need to know the key of the changed row. This envelope option is only supported for Kafka sinks or sinkless changefeeds.

  • Kafka sink:

    CREATE CHANGEFEED FOR TABLE users INTO 'external://kafka' WITH envelope=key_only;
    
    ["boston", "22222222-2222-4200-8000-000000000002"]
    
    Note:

    It is necessary to set up a Kafka consumer to display the key because the key is part of the metadata in Kafka messages, rather than in its own field. When you start a Kafka consumer, you can use --property print.key=true to have the key print in the changefeed message.

  • Sinkless changefeeds:

    CREATE CHANGEFEED FOR TABLE users WITH envelope=key_only;
    
    {"key":"[\"seattle\", \"fff726cc-13b3-475f-ad92-a21cafee5d3f\"]","table":"users","value":""}
    

row

row emits the row without any additional metadata fields in the message. This envelope option is only supported for Kafka sinks or sinkless changefeeds. row does not support avro format—if you are using avro, refer to the bare envelope option.

  • Kafka sink:

    CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH envelope=row;
    
    {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "85551 Moore Mountains Apt. 47", "ext": {"color": "red"}, "id": "d3b37607-1e9f-4e25-b772-efb9374b08e3", "owner_id": "4f26b516-f13f-4136-83e1-2ea1ae151c20", "status": "available", "type": "skateboard"}
    

Ordering and delivery guarantees

Changefeeds provide the following guarantees for message delivery to changefeed sinks:

Note:

Changefeeds do not support total message ordering or transactional ordering of messages.

Per-key ordering

Changefeeds provide a per-key ordering guarantee for the first emission of a message to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a new change for that row at an earlier timestamp.

For example, a changefeed can emit updates to rows A at timestamp T1, B at T2, and C at T3 in any order.

When there are updates to rows A at T1, B at T2, and A at T3, the changefeed will always emit A at T3 (for the first time) after emitting A at T1 (for the first time). However, A at T3 could precede or follow B at T2, because there is no timestamp ordering between keys.

Under some circumstances, a changefeed will emit duplicate messages of row updates. Changefeeds can emit duplicate messages in any order.

As an example, you run the following sequence of SQL statements to create a changefeed:

  1. Create a table:

    icon/buttons/copy
    CREATE TABLE employees (
        id INT PRIMARY KEY,
        name STRING,
        office STRING
    );
    
  2. Create a changefeed targeting the employees table:

    icon/buttons/copy
    CREATE CHANGEFEED FOR TABLE employees INTO 'external://sink' WITH updated;
    
  3. Insert and update values in employees:

    icon/buttons/copy
    INSERT INTO employees VALUES (1, 'Terry', 'new york city');
    INSERT INTO employees VALUES (2, 'Alex', 'los angeles');
    UPDATE employees SET name = 'Terri' WHERE id = 1;
    INSERT INTO employees VALUES (3, 'Ash', 'london');
    UPDATE employees SET name = 'Terrence' WHERE id = 1;
    UPDATE employees SET office = 'new york city' WHERE id = 2;
    INSERT INTO employees VALUES (4, 'Danny', 'los angeles');
    INSERT INTO employees VALUES (5, 'Robbie', 'london');
    
    Note:

    In a transaction, if a row is modified more than once in the same transaction, the changefeed will only emit the last change.

  4. The sink will receive messages of the inserted rows emitted per timestamp:

    {"after": {"id": 1, "name": "Terry", "office": "new york city"}, "key": [1], "updated": "1701102296662969433.0000000000"}
    {"after": {"id": 1, "name": "Terri", "office": "new york city"}, "key": [1], "updated": "1701102311425045162.0000000000"}
    {"after": {"id": 2, "name": "Alex", "office": "los angeles"}, "key": [2], "updated": "1701102305519323705.0000000000"}
    {"after": {"id": 3, "name": "Ash", "office": "london"}, "key": [3], "updated": "1701102316388801052.0000000000"}
    {"after": {"id": 1, "name": "Terrence", "office": "new york city"}, "key": [1], "updated": "1701102320607990564.0000000000"}
    {"after": {"id": 2, "name": "Alex", "office": "new york city"}, "key": [2], "updated": "1701102325724272373.0000000000"}
    {"after": {"id": 5, "name": "Robbie", "office": "london"}, "key": [5], "updated": "1701102330377135318.0000000000"}
    {"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"}
    

    The messages received at the sink are in order by timestamp for each key. Here, the update for key [1] is emitted before the insertion of key [2] even though the timestamp for the update to key [1] is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will be in the correct timestamp order. However, if a changefeed starts to re-emit messages after the last checkpoint, it may not emit all duplicate messages between the first duplicate message and new updates to the table. For details on when changefeeds might re-emit messages, refer to Duplicate messages.

    The updated option adds an updated timestamp to each emitted row. You can also use the resolved option to emit a resolved timestamp message to each Kafka partition, or to a separate file at a cloud storage sink. A resolved timestamp guarantees that no (previously unseen) rows with a lower update timestamp will be emitted on that partition.

    Note:

    Depending on the workload, you can use resolved timestamp notifications on every Kafka partition to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures. Use the resolved timestamp to see every row that changed at a certain time.

Define a key column

Typically, changefeeds that emit to Kafka sinks shard rows between Kafka partitions using the row's primary key, which is hashed. The resulting hash remains the same and ensures a row will always emit to the same Kafka partition.

In some cases, you may want to specify another column in a table as the key by using the key_column option, which will determine the partition your messages will emit to. However, if you implement key_column with a changefeed, consider that other columns may have arbitrary values that change. As a result, the same row (i.e., by primary key) may emit to any partition at the sink based upon the column value. A changefeed with a key_column specified will still maintain per-key and at-least-once delivery guarantees.

To confirm that messages may emit the same row to different partitions when an arbitrary column is used, you must include the unordered option:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE employees INTO 'external://kafka-sink'
    WITH key_column='office', unordered;

At-least-once delivery

Changefeeds also provide an at-least-once delivery guarantee, which means that each version of a row will be emitted once. Under some infrequent conditions a changefeed will emit duplicate messages. This happens when the changefeed was not able to emit all messages before reaching a checkpoint. As a result, it may re-emit some or all of the messages starting from the previous checkpoint to ensure that every message is delivered at least once, which could lead to some messages being delivered more than once.

Refer to Duplicate messages for causes of messages repeating at the sink.

For example, the checkpoints and changefeed pauses marked in this output show how messages may be duplicated, but always delivered:

{"after": {"id": 1, "name": "Terry", "office": "new york city"}, "key": [1], "updated": "1701102296662969433.0000000000"}
{"after": {"id": 1, "name": "Terri", "office": "new york city"}, "key": [1], "updated": "1701102311425045162.0000000000"}
{"after": {"id": 2, "name": "Alex", "office": "los angeles"}, "key": [2], "updated": "1701102305519323705.0000000000"}

[checkpoint]

{"after": {"id": 3, "name": "Ash", "office": "london"}, "key": [3], "updated": "1701102316388801052.0000000000"}
{"after": {"id": 1, "name": "Terrence", "office": "new york city"}, "key": [1], "updated": "1701102320607990564.0000000000"}

[changefeed pauses before the next checkpoint was reached]

[changefeed resumes and re-emits the messages after the previous checkpoint to ensure the sink received the messages]

{"after": {"id": 3, "name": "Ash", "office": "london"}, "key": [3], "updated": "1701102316388801052.0000000000"}
{"after": {"id": 1, "name": "Terrence", "office": "new york city"}, "key": [1], "updated": "1701102320607990564.0000000000"}
{"after": {"id": 2, "name": "Alex", "office": "new york city"}, "key": [2], "updated": "1701102325724272373.0000000000"}

[changefeed continues to emit new events]

{"after": {"id": 5, "name": "Robbie", "office": "london"}, "key": [5], "updated": "1701102330377135318.0000000000"}
{"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"}

[checkpoint]

In this example, with duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to Per-key ordering.)

Warning:

The first time a message is delivered, it will be in the correct timestamp order, which follows the per-key ordering guarantee. However, when there are duplicate messages, the changefeed may not re-emit every row update. As a result, there may be gaps in a sequence of duplicate messages for a key.

To compare two different rows for happens-before, compare the updated timestamp. This works across anything in the same cluster (tables, nodes, etc.).

The complexity with timestamps is necessary because CockroachDB supports transactions that can affect any part of the cluster, and it is not possible to horizontally divide the transaction log into independent changefeeds. For more information about this, read our blog post on CDC.

Note:

When changes happen to a column that is part of a composite key, the changefeed will produce a delete message and then an insert message.

Delete messages

Deleting a row will result in a changefeed outputting the primary key of the deleted row and a null value. For example, with default options, deleting the row with primary key 5 will output:

[5] {"after": null}

In some unusual situations you may receive a delete message for a row without first seeing an insert message. For example, if an attempt is made to delete a row that does not exist, you may or may not get a delete message because the changefeed behavior is undefined to allow for optimizations at the storage layer. Similarly, if there are multiple writes to a row within a single transaction, only the last one will propagate to a changefeed. This means that creating and deleting a row within the same transaction will never result in an insert message, but may result in a delete message.

Resolved messages

When you create a changefeed with the resolved option, the changefeed will emit resolved timestamp messages in a format dependent on the connected sink. The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an earlier update timestamp will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a checkpoint.

When you specify the resolved option at changefeed creation, the job's coordinating node will send the resolved timestamp to each endpoint at the sink. For example, each Kafka partition will receive a resolved timestamp message, or a cloud storage sink will receive a resolved timestamp file.

There are three different ways to configure resolved timestamp messages:

  • If you do not specify the resolved option at all, then the changefeed coordinator node will not send resolved timestamp messages.
  • If you include WITH resolved in your changefeed creation statement without specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away.

    icon/buttons/copy
    CREATE CHANGEFEED FOR TABLE ... WITH resolved;
    
  • If you specify a duration like WITH resolved={duration}, the changefeed will use it as the minimum duration between resolved messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example:

    icon/buttons/copy
    CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s;
    

Resolved timestamp frequency

The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the min_checkpoint_frequency option and flush frequency (if flushing is configurable for your sink).

The min_checkpoint_frequency option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the 30s default, then you must set min_checkpoint_frequency to at least the desired resolved timestamp frequency. For example:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s;

When you configure the min_checkpoint_frequency and resolved options, there can be a tradeoff between changefeed message latency and cluster CPU usage.

  • Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster.
  • Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink.

For example, you can set min_checkpoint_frequency and resolved to 0s so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than 0s between checkpoints, this will help to reduce the overhead on the cluster.

Duplicate messages

Under some circumstances, changefeeds will emit duplicate messages to ensure the sink is receiving each message at least once. The following can cause or increase duplicate messages:

  • The changefeed job encounters an error and pauses, or is manually paused.
  • A node in the cluster restarts or fails.
  • The changefeed job has the min_checkpoint_frequency option set, which can potentially increase duplicate messages.
  • A target table undergoes a schema change. Schema changes may also cause the changefeed to emit the whole target table. Refer to Schema changes for detail on duplicates in this case.

A changefeed job cannot confirm that a message has been received by the sink unless the changefeed has reached a checkpoint. As a changefeed job runs, each node will send checkpoint progress to the job's coordinator node. These progress reports allow the coordinator to update the high-water mark timestamp confirming that all changes before (or at) the timestamp have been emitted.

When a changefeed must pause and then resume, it will return to the last checkpoint (A), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that were not confirmed in the next checkpoint. The changefeed may not re-emit every message, but it will ensure each change is emitted at least once.

How checkpoints will re-emit messages when a changefeed pauses. The changefeed returns to the last checkpoint and potentially sends duplicate messages.

Changefeed encounters an error

By default, changefeeds treat errors as retryable except for some specific terminal errors. When a changefeed encounters a retryable or non-retryable error, the job will pause until a successful retry or you resume the job once the error is solved. This can cause duplicate messages at the sink as the changefeed returns to the last checkpoint.

We recommend monitoring for changefeed retry errors and failures. Refer to the Monitor and Debug Changefeeds page.

Note:

A sink's batching behavior can increase the number of duplicate messages. For example, if Kafka receives a batch of N messages and successfully saves N-1 of them, the changefeed job only knows that the batch failed, not which message failed to commit. As a result, the changefeed job will resend the full batch of messages, which means all but one of the messages are duplicates. For Kafka sinks, reducing the batch size with kafka_sink_config may help to reduce the number of duplicate messages at the sink.

Refer to the Changefeed Sinks page for details on sink batching configuration.

Node restarts

When a node restarts, the changefeed will emit duplicates since the last checkpoint. During a rolling restart of nodes, a changefeed can fall behind as it tries to catch up during each node restart. For example, as part of a rolling upgrade or cluster maintenance, a node may drain every 5 minutes and the changefeed job checkpoints every 5 minutes.

To prevent the changefeed from falling too far behind, pause changefeed jobs before performing rolling node restarts.

min_checkpoint_frequency option

The min_checkpoint_frequency option controls how often nodes flush their progress to the coordinating changefeed node. Therefore, changefeeds will wait for at least the min_checkpoint_frequency duration before flushing to the sink. If a changefeed pauses and then resumes, the min_checkpoint_frequency duration is the amount of time that the changefeed will need to catch up since its previous checkpoint. During this catch-up time, you could receive duplicate messages.

Schema Changes

In v22.1, CockroachDB introduced the declarative schema changer. When schema changes happen that use the declarative schema changer by default, changefeeds will not emit duplicate records for the table that is being altered. It will only emit a copy of the table using the new schema. Refer to Schema changes with column backfill for examples of this.

Avro schema changes

To ensure that the Avro schemas that CockroachDB publishes will work with the schema compatibility rules used by the Confluent schema registry, CockroachDB emits all fields in Avro as nullable unions. This ensures that Avro and Confluent consider the schemas to be both backward- and forward-compatible, because the Confluent Schema Registry has a different set of rules than Avro for schemas to be backward- and forward-compatible.

The original CockroachDB column definition is also included within a doc field __crdb__ in the schema. This allows CockroachDB to distinguish between a NOT NULL CockroachDB column and a NULL CockroachDB column.

Warning:

Schema validation tools should ignore the __crdb__ field. This is an internal CockroachDB schema type description that may change between CockroachDB versions.

Schema changes with column backfill

When schema changes with column backfill (e.g., adding a column with a default, adding a stored computed column, adding a NOT NULL column, dropping a column) are made to watched rows, CockroachDB emits a copy of the table using the new schema.

Note:

Schema changes that do not use the declarative schema changer by default will trigger a changefeed to emit a copy of the table being altered as well as a copy of the table using the new schema. For a list of supported schema changes, refer to the Declarative schema changer section.

The following example demonstrates the messages you will receive after creating a changefeed and then applying a schema change to the watched table:

icon/buttons/copy
CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);
icon/buttons/copy
INSERT INTO office_dogs VALUES
   (1, 'Petee H'),
   (2, 'Carl'),
   (3, 'Ernie');
icon/buttons/copy
CREATE CHANGEFEED FOR TABLE office_dogs INTO 'external://cloud';

You receive each of the rows at the sink:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}

For example, add a column to the watched table:

icon/buttons/copy
ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

After the schema change, the changefeed will emit a copy of the table with the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

If the schema change does not use the declarative schema changer by default, the changefeed will emit a copy of the altered table and a copy of the table using the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"}  # Duplicate
[2] {"id": 2, "name": "Carl"}     # Duplicate
[3] {"id": 3, "name": "Ernie"}    # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

To prevent the changefeed from emitting a copy of the table with the new schema, use the schema_change_policy = nobackfill option. In the preceding two output blocks, the new schema messages that include the "likes_treats" column will not emit.

Refer to the CREATE CHANGEFEED option table for detail on the schema_change_policy option. You can also use the schema_change_events option to define the type of schema change event that triggers the behavior specified in schema_change_policy.

Note:

As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.

Filtering changefeed messages

There are several ways to define messages, filter different types of message, or prevent all changefeed messages from emitting to the sink. The following sections outline configurable settings and SQL syntax to handle different use cases.

Prevent changefeeds from emitting row-level TTL deletes

Use the ttl_disable_changefeed_replication table storage parameter to prevent changefeeds from sending DELETE messages issued by row-level TTL jobs for a table. Include the storage parameter when you create or alter the table. For example:

icon/buttons/copy
CREATE TABLE tbl (
  id UUID PRIMARY KEY default gen_random_uuid(),
  value TEXT
) WITH (ttl_expire_after = '3 weeks', ttl_job_cron = '@daily', ttl_disable_changefeed_replication = 'true');
icon/buttons/copy
ALTER TABLE events SET (ttl_expire_after = '1 year', ttl_disable_changefeed_replication = 'true');

You can also widen the scope to the cluster by setting the sql.ttl.changefeed_replication.disabled cluster setting to true. This will prevent changefeeds from emitting deletes issued by all TTL jobs on a cluster.

If you want to have a changefeed ignore the storage parameter or cluster setting that disables changefeed replication, you can set the changefeed option ignore_disable_changefeed_replication to true:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE table_name INTO 'external://changefeed-sink'
  WITH resolved, ignore_disable_changefeed_replication = true;

This is useful when you have multiple use cases for different changefeeds on the same table. For example, you have a table with a changefeed streaming changes to another database for analytics workflows in which you do not want to reflect row-level TTL deletes. Secondly, you have a changefeed on the same table for audit-logging purposes for which you need to persist every change through the changefeed.

Disable changefeeds from emitting messages

To prevent changefeeds from emitting messages for any changes (e.g., INSERT, UPDATE) issued to watched tables during that session, set the disable_changefeed_replication session variable to true.

Define the change data emitted to a sink

When you create a changefeed, use change data capture queries to define the change data emitted to your sink.

For example:

icon/buttons/copy
CREATE CHANGEFEED INTO 'scheme://sink-URI' WITH updated AS SELECT column, column FROM table;

For details on syntax and examples, refer to the Change Data Capture Queries page.

Message formats

By default, changefeeds emit messages in JSON format. You can use a different format by creating a changefeed with the format option and specifying one of the following:

  • json
  • csv
  • avro
  • parquet
Note:

Changefeed types are not fully integrated with user-defined composite types. Running changefeeds with user-defined composite types is in Preview. Certain changefeed types do not support user-defined composite types. Refer to the change data capture Known Limitations for more detail.

The following sections outline the limitations and type mapping for relevant formats.

Avro

The following sections provide information on Avro usage with CockroachDB changefeeds. Creating a changefeed using Avro is available in Core and Enterprise changefeeds with the confluent_schema_registry option.

Avro limitations

Below are clarifications for particular SQL types and values for Avro changefeeds:

  • Decimals must have precision specified.
  • BYTES (or its aliases BYTEA and BLOB) are often used to store machine-readable data. When you stream these types through a changefeed with format=avro, CockroachDB does not encode or change the data. However, Avro clients can often include escape sequences to present the data in a printable format, which can interfere with deserialization. A potential solution is to hex-encode BYTES values when initially inserting them into CockroachDB. This will ensure that Avro clients can consistently decode the hexadecimal. Note that hex-encoding values at insertion will increase record size.
  • BIT and VARBIT types are encoded as arrays of 64-bit integers.

    For efficiency, CockroachDB encodes BIT and VARBIT bitfield types as arrays of 64-bit integers. That is, base-2 (binary format) BIT and VARBIT data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian, therefore the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.

    For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be 1; representing the number of bits in the last value, the second integer will be the first 64 bits, the third integer will be bits 65–128, and the last integer will either be 0 or 9223372036854775808 (i.e., the integer with only the first bit set, or 1000000000000000000000000000000000000000000000000000000000000000 when base 2).

    This example is base-10 encoded into an array as follows:

    {"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
    

    For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number — that is, the most significant bits. So, in the example above this would be 1. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.

    In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:

    {"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
    

    To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:

    [8, 1111110111011011111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
    

    Next, you use the first element in the array to take the number of bits from the last base-2 element, 10111110. Finally, you append each of the base-2 numbers together — in the above array, the second, third, and truncated last element. This results in 136 bits, the original number of bits.

  • A changefeed in Avro format will not be able to serialize user-defined composite (tuple) types. #102903

Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type Avro Type Avro Logical Type
ARRAY ARRAY
BIT Array of LONG
BLOB BYTES
BOOL BOOLEAN
BYTEA BYTES
BYTES BYTES
COLLATE STRING
DATE INT DATE
DECIMAL STRING, BYTES DECIMAL
ENUMS STRING
FLOAT DOUBLE
INET STRING
INT LONG
INTERVAL STRING
JSONB STRING
STRING STRING
TIME LONG TIME-MICROS
TIMESTAMP LONG TIME-MICROS
TIMESTAMPTZ LONG TIME-MICROS
UUID STRING
VARBIT Array of LONG
Note:

The DECIMAL type is a union between Avro STRING and Avro DECIMAL types.

CSV

You can use the format=csv option to emit CSV format messages from your changefeed. However, there are the following limitations with this option:

Changefeeds emit the same CSV format as EXPORT. In v22.1, changefeeds emitted CSV data that wrapped some values in single quotes, which were not wrapped when exporting data with the EXPORT statement.

See Export Data with Changefeeds for detail on using changefeeds to export data from CockroachDB.

The following shows example CSV format output:

4ccccccc-cccc-4c00-8000-00000000000f,washington dc,Holly Williams,95153 Harvey Street Suite 5,2165526885
51eb851e-b851-4c00-8000-000000000010,washington dc,Ryan Hickman,21187 Dennis Village,1635328127
56242e0e-4935-4d21-a8cd-915f4002e53c,washington dc,Joshua Smith,80842 Edwards Bridge,1892482054
5707febd-0278-4e55-8715-adbb35f09759,washington dc,Preston Fisher,5603 David Mission Apt. 93,5802323725
576546de-d59c-429b-9251-be79472643d4,washington dc,Anna Underwood,81246 Lee Knoll,2838348371
596c1cf8-d59f-4ad6-9379-6aba82648ca9,washington dc,Gerald Good,59876 Wang Neck,6779715200
5d30f838-e24c-46cb-bb0c-4a5643ddc2b1,washington dc,Lawrence Lucas,67248 Robinson Way Apt. 46,6167017463
65c398b9-7cce-45c5-9a5b-9561569ae030,washington dc,Mr. Xavier Waters,85393 Diaz Camp,1783482816
7a78fb0b-d368-46f6-b530-f9c74c19ba25,washington dc,Christopher Owens,7460 Curtis Centers,1470959770
80696ab6-7ec9-4e55-afee-4f468478fe82,washington dc,Patricia Gibson,77436 Vaughn Ville,3271633253
93750763-f992-4018-8a11-bf15ebfecc06,washington dc,Alison Romero,15878 Grant Forks Suite 16,2742488244
9cc3f995-0a91-4612-a079-e81ca28257ab,washington dc,Corey Dunn,15958 Jenna Locks,2358457606
9efd7047-c5e5-4501-9fcd-cff2d27efc34,washington dc,Patricia Gray,16139 Nicholas Wells Suite 64,8935020269
a253a15c-8e0a-4d25-aa87-1a0839935005,washington dc,Samantha Lee,90429 Russell Coves,2990967825
a3081762-9841-4275-ad7a-75a7e8d5f69d,washington dc,Preston Fisher,5603 David Mission Apt. 93,5802323725
aebb80a6-eceb-4d10-9d9a-f26270188114,washington dc,Kenneth Miller,52393 Stephen Mill Apt. 7,3966083325

JSON

To distinguish between JSON NULL values and SQL NULL values in changefeed messages, you can use the encode_json_value_null_as_object option with CREATE CHANGEFEED. When you enable encode_json_value_null_as_object, JSON NULL values will emit as {"__crdb_json_null__": true}.

For example, the following test table has a primary key column and a JSONB column. The INSERT adds NULL values in each column. A changefeed without the option enabled will not distinguish between the SQL and JSON NULL values. With the encode_json_value_null_as_object option enabled, the changefeed emits the JSON NULL as {"__crdb_json_null__": true}:

icon/buttons/copy
CREATE TABLE test (id INT PRIMARY KEY, data JSONB);
icon/buttons/copy
INSERT INTO test VALUES (1, NULL), (2, 'null'::JSONB);
icon/buttons/copy
CREATE CHANGEFEED FOR TABLE test INTO 'external://sink';

Without the option enabled, it is not possible to distinguish the SQL and JSON NULL values in the emitted changefeed messages:

{"after": {"data": null, "id": 1}, "key": [1]}
{"after": {"data": null, "id": 2}, "key": [2]}
icon/buttons/copy
CREATE CHANGEFEED FOR TABLE test INTO 'external://sink' WITH encode_json_value_null_as_object;

With encode_json_value_null_as_object enabled, the changefeed will encode the JSON NULL value in the emitted messages:

{"after": {"data": null, "id": 1}, "key": [1]}
{"after": {"data": {"__crdb_json_null__": true}, "id": 2}, "key": [2]}

When encode_json_value_null_as_object is enabled, if the changefeed encounters the literal value {"__crdb_json_null__": true} in JSON, it will have the same representation as a JSON NULL value and a warning will be printed to the DEV logging channel.

See also


Yes No
On this page

Yes No