Create and Configure Changefeeds

On this page Carat arrow pointing down

Core and Enterprise changefeeds offer different levels of configurability. Enterprise changefeeds allow for active changefeed jobs to be paused, resumed, and canceled.

This page describes:

Before you create a changefeed

  1. Enable rangefeeds on CockroachDB Advanced and CockroachDB self-hosted. Refer to Enable rangefeeds for instructions.
  2. Decide on whether you will run an Enterprise or basic changefeed. Refer to the Overview page for a comparative capability table.
  3. Plan the number of changefeeds versus the number of tables to include in a single changefeed for your cluster. We recommend limiting the number of changefeeds per cluster to 80. Refer to System resources and running changefeeds and Recommendations for the number of target tables.
  4. Consider whether your Enterprise changefeed use case would be better served by change data capture queries that can filter data on a single table. CDC queries can improve the efficiency of changefeeds because the job will not need to encode as much change data.
  5. Read the Considerations section that provides information on changefeed interactions that could affect how you configure or run your changefeed.

Enable rangefeeds

Changefeeds connect to a long-lived request called a rangefeed, which pushes changes as they happen. This reduces the latency of row changes, as well as reduces transaction restarts on tables being watched by a changefeed for some workloads.

Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:

icon/buttons/copy
SET CLUSTER SETTING kv.rangefeed.enabled = true;

Any created changefeeds will error until this setting is enabled. If you are working on a CockroachDB Serverless cluster, the kv.rangefeed.enabled cluster setting is enabled by default.

Enabling rangefeeds has a small performance cost (about a 5–10% increase in write latencies), whether or not the rangefeed is being used in a changefeed. When kv.rangefeed.enabled is set to true, a small portion of the latency cost is caused by additional write event information that is sent to the Raft log and for replication. The remainder of the latency cost is incurred once a changefeed is running; the write event information is reconstructed and sent to an active rangefeed, which will push the event to the changefeed.

The kv.closed_timestamp.target_duration cluster setting can be used with changefeeds. Resolved timestamps will always be behind by at least the duration configured by this setting. However, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance. Refer to the Advanced Changefeed Confguration for more detail.

Mux rangefeeds

MuxRangefeed is a subsystem that improves the performance of rangefeeds with scale. It significantly reduces the overhead of running rangefeeds. Without MuxRangefeed enabled the number of RPC streams is proportional with the number of ranges in a table. For example, a large table could have tens of thousands of ranges. With MuxRangefeed enabled, this proportion improves so that the number of RPC streams is relative to the number of nodes in a cluster.

We recommend enabling its functionality with the changefeed.mux_rangefeed.enabled cluster setting. In v24.1 and later versions, MuxRangefeed is enabled by default.

Use the following workflow to enable MuxRangefeed:

  1. Enable the cluster setting:

    icon/buttons/copy
    SET CLUSTER SETTING changefeed.mux_rangefeed.enabled = true;
    
  2. After enabling the setting, pause the changefeed:

    icon/buttons/copy
    PAUSE JOB {job ID};
    

    You can use SHOW CHANGEFEED JOBS to retrieve the job ID.

  3. Resume the changefeed for the cluster setting to take effect:

    icon/buttons/copy
    RESUME JOB {job ID};
    

Recommendations for the number of target tables

When creating a changefeed, it's important to consider the number of changefeeds versus the number of tables to include in a single changefeed:

  • Changefeeds each have their own memory overhead, so every running changefeed will increase total memory usage.
  • Creating a single changefeed that will watch hundreds of tables can affect the performance of a changefeed by introducing coupling, where the performance of a target table affects the performance of the changefeed watching it. For example, any schema change on any of the tables will affect the entire changefeed's performance.

To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables. However, we do not recommend creating a single changefeed for watching hundreds of tables.

Cockroach Labs recommends monitoring your changefeeds to track retryable errors and protected timestamp usage. Refer to the Monitor and Debug Changefeeds page for more information.

System resources and running changefeeds

When you are running more than 10 changefeeds on a cluster, it is important to monitor the CPU usage. A larger cluster will be able to run more changefeeds concurrently compared to a smaller cluster with more limited resources.

Note:

We recommend limiting the number of changefeeds per cluster to 80.

To maintain a high number of changefeeds in your cluster:

  • Connect to different nodes to create each changefeed. The node on which you start the changefeed will become the coordinator node for the changefeed job. The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. As a result, this node will use more resources for the changefeed job. Refer to How does an Enterprise changefeed work? for more detail.
  • Consider logically grouping the target tables into one changefeed. When a changefeed pauses, it will stop emitting messages for the target tables. Grouping tables of related data into a single changefeed may make sense for your workload. However, we do not recommend watching hundreds of tables in a single changefeed. Refer to Garbage collection and changefeeds for more detail on protecting data from garbage collection when a changefeed is paused.

Considerations

  • If you require resolved message frequency under 30s, then you must set the min_checkpoint_frequency option to at least the desired resolved frequency.
  • Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed. If a table is truncated that a changefeed with on_error='pause' is watching, you will also need to start a new changefeed. Refer to the change data capture Known Limitations for more detail.
  • Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages can't send, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
  • When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.
  • After you restore from a full-cluster backup, changefeed jobs will not resume on the new cluster. It is necessary to manually create the changefeeds following the full-cluster restore.
  • As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.

The following Enterprise and Core sections outline how to create and configure each type of changefeed:

Configure a changefeed

An Enterprise changefeed streams row-level changes in a configurable format to a configurable sink (i.e., Kafka or a cloud storage sink). You can create, pause, resume, and cancel an Enterprise changefeed. For a step-by-step example connecting to a specific sink, see the Changefeed Examples page.

Create

To create an Enterprise changefeed:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE table_name, table_name2 INTO '{scheme}://{host}:{port}?{query_parameters}';
Note:

Parameters should always be URI-encoded before they are included the changefeed's URI, as they often contain special characters. Use Javascript's encodeURIComponent function or Go language's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.

When you create a changefeed without specifying a sink, CockroachDB sends the changefeed events to the SQL client. Consider the following regarding the display format in your SQL client:

  • If you do not define a display format, the CockroachDB SQL client will automatically use ndjson format.
  • If you specify a display format, the client will use that format (e.g., --format=csv).
  • If you set the client display format to ndjson and set the changefeed format to csv, you'll receive JSON format with CSV nested inside.
  • If you set the client display format to csv and set the changefeed format to json, you'll receive a comma-separated list of JSON values.

For more information, see CREATE CHANGEFEED.

Show

To show a list of Enterprise changefeed jobs:

icon/buttons/copy
SHOW CHANGEFEED JOBS;
    job_id             |                                                                                   description                                                                  | ...
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+ ...
  685724608744325121   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
  685723987509116929   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
(2 rows)

To show an individual Enterprise changefeed:

icon/buttons/copy
SHOW CHANGEFEED JOB {job_id};
        job_id       |                                     description                                      | user_name | status  |              running_status              |          created           |          started           | finished |          modified          |      high_water_timestamp      | error |    sink_uri    |  full_table_names   | topics | format
---------------------+--------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+----------------+---------------------+--------+----------
  866218332400680961 | CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://aws' WITH format = 'parquet' | root      | running | running: resolved=1684438482.937939878,0 | 2023-05-18 14:14:16.323465 | 2023-05-18 14:14:16.360245 | NULL     | 2023-05-18 19:35:16.120407 | 1684438482937939878.0000000000 |       | external://aws | {movr.public.users} | NULL   | parquet
(1 row)

All changefeed jobs will display regardless of if the job completed and when it completed. You can define a retention time and delete completed jobs by using the jobs.retention_time cluster setting.

You can filter the columns that SHOW CHANGEFEED JOBS displays using a SELECT statement:

icon/buttons/copy
SELECT job_id, sink_uri, status, format FROM [SHOW CHANGEFEED JOBS] WHERE job_id = 997306743028908033;
        job_id       |    sink_uri      | status   | format
---------------------+------------------+----------+---------
  997306743028908033 | external://kafka | running  | json

For more information, refer to SHOW CHANGEFEED JOB.

Pause

To pause an Enterprise changefeed:

icon/buttons/copy
PAUSE JOB job_id;

For more information, refer to PAUSE JOB.

Resume

To resume a paused Enterprise changefeed:

icon/buttons/copy
RESUME JOB job_id;

For more information, refer to RESUME JOB.

Cancel

To cancel an Enterprise changefeed:

icon/buttons/copy
CANCEL JOB job_id;

For more information, refer to CANCEL JOB.

Modify a changefeed

To modify an Enterprise changefeed, pause the job and then use:

ALTER CHANGEFEED job_id {ADD table DROP table SET option UNSET option};

You can add new table targets, remove them, set new changefeed options, and unset them.

For more information, see ALTER CHANGEFEED.

Configuring all changefeeds

It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.

To pause all running changefeeds:

icon/buttons/copy
PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));

This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.

To resume all running changefeeds:

icon/buttons/copy
RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));

This will resume the changefeeds and update the status for each of the changefeeds to running.

Create a changefeed

A basic changefeed streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled.

To create a basic changefeed:

icon/buttons/copy
EXPERIMENTAL CHANGEFEED FOR table_name;

For more information, see EXPERIMENTAL CHANGEFEED FOR.

Known limitations

See also


Yes No
On this page

Yes No