Create and Configure Changefeeds

Core and Enterprise changefeeds offer different levels of configurability. Enterprise changefeeds allow for active changefeed jobs to be paused, resumed, and canceled.

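This page describes what to do before you create a changefeed, how to enable rangefeeds, considerations that affect changefeed configuration, and how to create and configure each type of changefeed.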

Before you create a changefeed

  1. Enable rangefeeds on CockroachDB Advanced and CockroachDB self-hosted. Refer to Enable rangefeeds for instructions.
  2. Decide whether you will run an Enterprise or basic changefeed. Refer to the Overview page for a comparative capability table.
  3. Plan the number of changefeeds versus the number of tables to include in a single changefeed for your cluster. We recommend limiting the number of changefeeds per cluster to 80. Refer to System resources and running changefeeds and Recommendations for the number of target tables.
  4. Consider whether your Enterprise changefeed use case would be better served by change data capture queries that can filter data on a single table. CDC queries can improve the efficiency of changefeeds because the job will not need to encode as much change data.
  5. Read the following:
    • The Changefeed Best Practices reference for details on planning changefeeds, monitoring basics, and schema changes.
    • The Considerations section that provides information on changefeed interactions that could affect how you configure or run your changefeed.

Enable rangefeeds

Changefeeds connect to a long-lived request called a rangefeed, which pushes changes as they happen. This reduces the latency of row changes and, for some workloads, reduces transaction restarts on tables watched by a changefeed.

Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:

SET CLUSTER SETTING kv.rangefeed.enabled = true;

Any changefeed you create will return an error until this setting is enabled. If you are working on a CockroachDB Serverless cluster, the kv.rangefeed.enabled cluster setting is enabled by default.

Enabling rangefeeds has a small performance cost (about a 5–10% increase in write latencies), whether or not the rangefeed is being used in a changefeed. When kv.rangefeed.enabled is set to true, a small portion of the latency cost is caused by additional write event information that is sent to the Raft log and for replication. The remainder of the latency cost is incurred once a changefeed is running; the write event information is reconstructed and sent to an active rangefeed, which will push the event to the changefeed.

For further detail on performance-related configuration, refer to the Advanced Changefeed Configuration page.

Note:

MuxRangefeed is a subsystem that improves the performance of rangefeeds at scale. It is enabled by default in v24.1 and later versions.

Considerations

  • If you require resolved message frequency under 30s, then you must set the min_checkpoint_frequency option to at least the desired resolved frequency.
  • Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed. If a table watched by a changefeed with on_error='pause' is truncated, you will also need to start a new changefeed. Refer to the change data capture Known Limitations for more detail.
  • Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages cannot be sent, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
  • When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.
  • After you restore from a full-cluster backup, changefeed jobs will not resume on the new cluster. It is necessary to manually create the changefeeds following the full-cluster restore.
  • As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.

The following Enterprise and Core sections outline how to create and configure each type of changefeed:

Configure a changefeed

An Enterprise changefeed streams row-level changes in a configurable format to a sink. Refer to the Changefeed Sinks page for the supported sinks.

You can create, pause, resume, and cancel an Enterprise changefeed. For a step-by-step example connecting to a specific sink, see the Changefeed Examples page.

Create

To create an Enterprise changefeed:

CREATE CHANGEFEED FOR TABLE table_name, table_name2 INTO '{scheme}://{host}:{port}?{query_parameters}';

Note:

Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
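
As an illustration of the encoding step in another language, the following Python sketch percent-encodes each query parameter with the standard library's urllib.parse.quote before assembling a sink URI. The helper name build_sink_uri and the parameter name example_param are hypothetical and are not real changefeed options; substitute the parameters your sink expects.

```python
from urllib.parse import quote


def build_sink_uri(scheme: str, host: str, port: int, params: dict[str, str]) -> str:
    """Assemble a sink URI, percent-encoding every query parameter name and value.

    Hypothetical helper for illustration only; it is not part of CockroachDB.
    """
    # safe="" makes quote() also encode "/", which it leaves untouched by default.
    query = "&".join(
        f"{quote(name, safe='')}={quote(value, safe='')}"
        for name, value in params.items()
    )
    base = f"{scheme}://{host}:{port}"
    return f"{base}?{query}" if query else base


# "example_param" is a made-up parameter whose value contains special characters.
uri = build_sink_uri("kafka", "localhost", 9092, {"example_param": "a+b/c=d&e"})
print(uri)
```

The resulting string can then be placed inside the quotes after INTO in the CREATE CHANGEFEED statement.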

When you create a changefeed without specifying a sink, CockroachDB sends the changefeed events to the SQL client. Consider the following regarding the display format in your SQL client:

  • If you do not define a display format, the CockroachDB SQL client will automatically use ndjson format.
  • If you specify a display format, the client will use that format (e.g., --format=csv).
  • If you set the client display format to ndjson and set the changefeed format to csv, you'll receive JSON format with CSV nested inside.
  • If you set the client display format to csv and set the changefeed format to json, you'll receive a comma-separated list of JSON values.

For more information, see CREATE CHANGEFEED.

Show

To show a list of Enterprise changefeed jobs:

SHOW CHANGEFEED JOBS;
    job_id             |                                                                                   description                                                                  | ...
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+ ...
  685724608744325121   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
  685723987509116929   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
(2 rows)

To show an individual Enterprise changefeed:

SHOW CHANGEFEED JOB {job_id};
        job_id       |                                     description                                      | user_name | status  |              running_status              |          created           |          started           | finished |          modified          |      high_water_timestamp      | error |    sink_uri    |  full_table_names   | topics | format
---------------------+--------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+----------------+---------------------+--------+----------
  866218332400680961 | CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://aws' WITH format = 'parquet' | root      | running | running: resolved=1684438482.937939878,0 | 2023-05-18 14:14:16.323465 | 2023-05-18 14:14:16.360245 | NULL     | 2023-05-18 19:35:16.120407 | 1684438482937939878.0000000000 |       | external://aws | {movr.public.users} | NULL   | parquet
(1 row)
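
The high_water_timestamp value in this output can be hard to read at a glance. The following Python sketch converts it to a UTC time. It assumes that the integer part is nanoseconds since the Unix epoch and that the fractional part is a logical counter, which matches the resolved=1684438482.937939878,0 value shown in the running_status column. The function name parse_high_water is hypothetical.

```python
from datetime import datetime, timezone


def parse_high_water(ts: str) -> tuple[datetime, int]:
    """Split a high_water_timestamp string into (UTC wall time, logical counter).

    Assumes "<nanoseconds since epoch>.<logical counter>"; illustrative only.
    """
    wall_ns_text, _, logical_text = ts.partition(".")
    wall_ns = int(wall_ns_text)
    logical = int(logical_text or "0")
    # datetime has microsecond resolution, so the last three nanosecond digits are dropped.
    seconds, nanos = divmod(wall_ns, 1_000_000_000)
    wall = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )
    return wall, logical


wall, logical = parse_high_water("1684438482937939878.0000000000")
print(wall.isoformat(), logical)
```

Comparing the converted time with the current time gives a rough sense of how far the changefeed's high-water mark is behind.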

New in v24.3: SHOW CHANGEFEED JOBS will return all changefeed jobs from the last 12 hours. For more information on the retention of job details, refer to the Response section.

You can filter the columns that SHOW CHANGEFEED JOBS displays using a SELECT statement:

SELECT job_id, sink_uri, status, format FROM [SHOW CHANGEFEED JOBS] WHERE job_id = 997306743028908033;
        job_id       |    sink_uri      | status   | format
---------------------+------------------+----------+---------
  997306743028908033 | external://kafka | running  | json

For more information, refer to SHOW CHANGEFEED JOB.

Pause

To pause an Enterprise changefeed:

PAUSE JOB job_id;

For more information, refer to PAUSE JOB.

Resume

To resume a paused Enterprise changefeed:

RESUME JOB job_id;

For more information, refer to RESUME JOB.

Cancel

To cancel an Enterprise changefeed:

CANCEL JOB job_id;

For more information, refer to CANCEL JOB.

Modify a changefeed

To modify an Enterprise changefeed, pause the job and then use:

ALTER CHANGEFEED job_id {ADD table DROP table SET option UNSET option};

You can add new table targets, remove them, set new changefeed options, and unset them.

For more information, see ALTER CHANGEFEED.

Configuring all changefeeds

It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.

To pause all running changefeeds:

PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));

This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.

To resume all paused changefeeds:

RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));

This will resume the changefeeds and update the status for each of the changefeeds to running.

Create a changefeed

A basic changefeed streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled.

To create a basic changefeed:

EXPERIMENTAL CHANGEFEED FOR table_name;

For more information, see EXPERIMENTAL CHANGEFEED FOR.

Known limitations

  • Changefeed target options are limited to tables and column families. #73435
  • VPC Peering and AWS PrivateLink in CockroachDB Advanced clusters do not support connecting to a Kafka sink's internal IP addresses for changefeeds. To connect to a Kafka sink from CockroachDB Advanced, it is necessary to expose the Kafka cluster's external IP address and open ports with firewall rules to allow access from a CockroachDB Advanced cluster.

  • Webhook sinks only support HTTPS. Use the insecure_tls_skip_verify parameter when testing to disable certificate verification; however, this still requires HTTPS and certificates. #73431

  • Formats for changefeed messages are not supported by all changefeed sinks. Refer to the Changefeed Sinks page for details on compatible formats with each sink and the format option to specify a changefeed message format. #73432

  • Using the split_column_families and resolved options on the same changefeed will cause an error when using the following sinks: Kafka and Google Cloud Pub/Sub. Instead, use the individual FAMILY keyword to specify column families when creating a changefeed. #79452

  • Changefeed types are not fully integrated with user-defined composite types. Running changefeeds with user-defined composite types is in Preview. Certain changefeed types do not support user-defined composite types. Refer to the change data capture Known Limitations for more detail.

  • ALTER CHANGEFEED is not fully supported with changefeeds that use CDC queries. You can alter the options that a changefeed uses, but you cannot alter the changefeed target tables. #83033

  • Creating a changefeed with CDC queries on tables with more than one column family is not supported. #127761

  • When you create a changefeed on a table with more than one column family, the changefeed will emit messages per column family in separate streams. As a result, changefeed messages for different column families will arrive at the sink under separate topics. #127736

  • Changefeeds created in v24.3 of CockroachDB that emit to Kafka, or changefeeds created in earlier versions with the changefeed.new_kafka_sink.enabled cluster setting enabled, do not support negative compression level values for GZIP compression in the kafka_sink_config = {... "CompressionLevel" = ...} option field. #136492
