CREATE CHANGEFEED
`CREATE CHANGEFEED` is an Enterprise-only feature. For the core version, see `EXPERIMENTAL CHANGEFEED FOR`.
The `CREATE CHANGEFEED` statement creates a new Enterprise changefeed, which targets an allowlist of tables, called "watched rows". Every change to a watched row is emitted as a record in a configurable format (`JSON` or Avro) to a configurable sink (Kafka or a cloud storage sink). You can create, pause, resume, or cancel an Enterprise changefeed.
For more information, see Stream Data Out of CockroachDB Using Changefeeds.
Required privileges
To create a changefeed, the user must be a member of the `admin` role or have the `CREATECHANGEFEED` parameter set.
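As a minimal sketch, assuming the parameter can be applied as a role option with `ALTER USER` (the user name `maxroach` is hypothetical):
> ALTER USER maxroach WITH CREATECHANGEFEED;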
Synopsis
Parameters
Parameter | Description
---|---
`table_name` | The name of the table (or tables in a comma-separated list) to create a changefeed for. Note: Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a single changefeed with a comma-separated list of tables.
`sink` | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below.
`option` / `value` | For a list of available options and their values, see Options below.
Sink URI
The sink URI follows the basic format of:
'{scheme}://{host}:{port}?{query_parameters}'
URI Component | Description
---|---
`scheme` | The type of sink: `kafka` or any cloud storage sink.
`host` | The sink's hostname or IP address.
`port` | The sink's port.
`query_parameters` | The sink's query parameters.
Kafka
Example of a Kafka sink URI:
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
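This URI can be passed directly as the sink in a `CREATE CHANGEFEED` statement; the sketch below reuses it with the `office_dogs` table from the examples later on this page (the broker address and credentials are placeholders):
> CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
  WITH updated, resolved;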
Cloud storage sink
Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.
Currently, cloud storage sinks only work with `JSON` and emit newline-delimited `JSON` files.
Example of a cloud storage sink URI:
`experimental-s3://acme-co/employees?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456`
Cloud storage sink URIs must be prepended with `experimental-` when working with changefeeds. For more information on the sink URI structure, see Use Cloud Storage for Bulk Operations.
Query parameters
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
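For example, a hypothetical SASL password `p@ss/word` must be percent-encoded as `p%40ss%2Fword` before it is placed in the sink URI:
'kafka://broker.address.com:9092?sasl_enabled=true&sasl_user=petee&sasl_password=p%40ss%2Fword'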
Query parameters include:
Parameter | Sink Type | Type | Description
---|---|---|---
`topic_prefix` | Kafka, cloud | `STRING` | Adds a prefix to all topic names. For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`.
`tls_enabled` | Kafka | `BOOL` | If `true`, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a `ca_cert`. Default: `false`
`ca_cert` | Kafka | `STRING` | The base64-encoded `ca_cert` file. Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`.
`client_cert` | Kafka | `STRING` | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with `client_key`.
`client_key` | Kafka | `STRING` | The base64-encoded private key for the PEM certificate. This is used with `client_cert`.
`sasl_enabled` | Kafka | `BOOL` | If `true`, use SASL/PLAIN to authenticate. This requires a `sasl_user` and `sasl_password`. Default: `false`
`sasl_user` | Kafka | `STRING` | Your SASL username.
`sasl_password` | Kafka | `STRING` | Your SASL password.
`file_size` | cloud | `STRING` | The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the `WITH resolved` option, which flushes on a specified cadence. Default: `16MB`
`insecure_tls_skip_verify` | Kafka | `BOOL` | New in v20.2: If `true`, disable client-side validation of responses. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication. Default: `false`
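As a hedged example, the cloud-specific `file_size` parameter can be combined with the `resolved` option so that files are flushed both by size and on a cadence (the bucket and credentials below are the placeholders used earlier on this page):
> CREATE CHANGEFEED FOR TABLE employees
  INTO 'experimental-s3://acme-co/employees?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&file_size=32MB'
  WITH updated, resolved='10s';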
Options
Option | Value | Description
---|---|---
`updated` | N/A | Include updated timestamps with each row. If a `cursor` is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a `cursor` is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid.
`resolved` | Duration string | Emits resolved timestamp events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will only emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If unspecified, all resolved timestamps are emitted as the high-water mark advances.
`envelope` | `key_only` / `wrapped` | Use `key_only` to emit only the key and no value, which is faster if you only want to know when the key changes. Default: `envelope=wrapped`
`cursor` | Timestamp | Emit any changes after the given timestamp, but do not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. When starting a changefeed at a specific `cursor`, the `cursor` cannot be before the configured garbage collection window (see `gc.ttlseconds`) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past. `cursor` can be used to start a new changefeed where a previous changefeed ended. Example: `CURSOR='1536242855577149065.0000000000'`
`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for Avro is limited and experimental. For mappings of CockroachDB types to Avro types, see the table below. Default: `format=json`
`confluent_schema_registry` | Schema Registry address | The Schema Registry address is required to use `experimental_avro`.
`key_in_value` | N/A | Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). `key_in_value` is automatically used for these sinks (currently only cloud storage sinks).
`diff` | N/A | Publish a `before` field with each message, which includes the value of the row before the update was applied.
`compression` | `gzip` | Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression.
`protect_data_from_gc_on_pause` | N/A | When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected. Note: If you use this option, changefeeds left paused can prevent garbage collection for long periods of time.
`schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option. With `default`, only `ADD COLUMN` events for columns with a non-null default value or computed columns, and `DROP COLUMN` events, trigger the policy; with `column_changes`, any schema change that adds or removes a column does. Default: `schema_change_events=default`
`schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs: `backfill` outputs all watched rows using the new schema once the schema change's column backfill finishes; `nobackfill` performs no logical backfill for such schema changes; `stop` waits for all data preceding the schema change to be resolved and then exits with an error indicating the timestamp at which the schema change occurred. Default: `schema_change_policy=backfill`
`initial_scan` / `no_initial_scan` | N/A | Control whether or not an initial scan will occur at the start time of a changefeed. `initial_scan` and `no_initial_scan` cannot be used simultaneously. If neither `initial_scan` nor `no_initial_scan` is specified, an initial scan will occur if there is no `cursor`, and will not occur if there is one. This preserves the behavior from previous releases. If `initial_scan` is used in conjunction with `cursor`, the initial scan will be performed at the cursor timestamp; if no `cursor` is specified, the initial scan is performed at `now()`. Default: `initial_scan`
`full_table_name` | N/A | Use the fully-qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases. Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`.
`avro_schema_prefix` | Schema prefix name | Provide a namespace for the schema of a table in addition to the default, the table name. This allows multiple databases or clusters to share the same schema registry when the same table name is present in multiple databases. Example: `CREATE CHANGEFEED FOR foo WITH format=experimental_avro, confluent_schema_registry='registry_url', avro_schema_prefix='super'` will register subjects as `superfoo-key` and `superfoo-value` with the namespace `super`.
Using the `format=experimental_avro`, `envelope=key_only`, and `updated` options together is rejected. `envelope=key_only` prevents any rows with updated fields from being emitted, which makes the `updated` option meaningless.
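By contrast, `envelope=key_only` with the default `json` format is accepted; a minimal sketch (the sink address is a placeholder):
> CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://host:port'
  WITH envelope=key_only;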
Avro limitations
Currently, support for Avro is limited and experimental. Below is a list of unsupported SQL types and values for Avro changefeeds:
- Decimals must have precision specified (see the sketch after this list).
- `INTERVAL`, `ARRAY`, `BIT`, and collated `STRING` are not supported in Avro yet.
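For instance, in the hypothetical table below, the `amount` column declares an explicit precision and can therefore be emitted by an Avro changefeed, whereas an unqualified `DECIMAL` column could not:
> CREATE TABLE prices (
    id INT PRIMARY KEY,
    amount DECIMAL(18,2)  -- precision specified, so compatible with format=experimental_avro
  );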
Avro types
Below is a mapping of CockroachDB types to Avro types:
CockroachDB Type | Avro Type | Avro Logical Type
---|---|---
`INT` | `LONG` |
`BOOL` | `BOOLEAN` |
`FLOAT` | `DOUBLE` |
`STRING` | `STRING` |
`DATE` | `INT` | `DATE`
`TIME` | `LONG` | `TIME-MICROS`
`TIMESTAMP` | `LONG` | `TIME-MICROS`
`TIMESTAMPTZ` | `LONG` | `TIME-MICROS`
`DECIMAL` | `BYTES` | `DECIMAL`
`UUID` | `STRING` |
`INET` | `STRING` |
`JSONB` | `STRING` |
Topic Naming
By default, a Kafka topic has the same name as the table that a changefeed was created on. If a changefeed was created on multiple tables, the changefeed will write to multiple topics corresponding to those table names. You can specify a topic prefix or use the `full_table_name` option to modify this.
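For example, with the default naming (no `topic_prefix` or `full_table_name`), a changefeed over two tables writes to one topic per table; the sketch below would emit to the topics `office_dogs` and `employees`:
> CREATE CHANGEFEED FOR TABLE office_dogs, employees
  INTO 'kafka://host:port';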
You can either manually create a topic in your Kafka cluster before starting the changefeed, or the topic will be automatically created when the changefeed connects to your Kafka cluster.
You must have the Kafka cluster setting `auto.create.topics.enable` set to `true` for automatic topic creation. This will create the topic when the changefeed sends its first message. If you create the consumer before that, you will also need the Kafka consumer configuration `allow.auto.create.topics` to be set to `true`.
Kafka has the following topic limitations:
- Legal characters are numbers, letters, and `[._-]`.
- The maximum character length of a topic name is 249.
- Topics with a period (`.`) and underscore (`_`) can collide on internal Kafka data structures, so you should use either but not both.
- Characters not accepted by Kafka will be automatically encoded as unicode characters by CockroachDB.
Responses
Messages
The messages (i.e., keys and values) emitted to a sink are specific to the `envelope`. The default format is `wrapped`, and the output messages are composed of the following:
- Key: An array always composed of the row's `PRIMARY KEY` field(s) (e.g., `[1]` for `JSON` or `{"id":{"long":1}}` for Avro).
- Value:
  - One of three possible top-level fields:
    - `after`, which contains the state of the row after the update (or `null` for `DELETE`s).
    - `updated`, which contains the updated timestamp.
    - `resolved`, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
  - For `INSERT` and `UPDATE`, the current state of the row inserted or updated.
  - For `DELETE`, `null`.
For example:
Statement | Response
---|---
`INSERT INTO office_dogs VALUES (1, 'Petee');` | JSON: `[1] {"after": {"id": 1, "name": "Petee"}}` Avro: `{"id":{"long":1}}` `{"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}`
`DELETE FROM office_dogs WHERE name = 'Petee'` | JSON: `[1] {"after": null}` Avro: `{"id":{"long":1}}` `{"after":null}`
Files
The files emitted to a sink use the following naming conventions:
The timestamp format is `YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL`.
General file format
/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]
For example:
/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
Resolved file format
/[date]/[timestamp].RESOLVED
For example:
/2020-04-04/202004042351304139680000000000000.RESOLVED
Examples
Create a changefeed connected to Kafka
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to Kafka, see Stream Data Out of CockroachDB Using Changefeeds.
Create a changefeed connected to Kafka using Avro
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see Stream Data Out of CockroachDB Using Changefeeds.
Create a changefeed connected to a cloud storage sink
This is an experimental feature. The interface and output are subject to change.
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'experimental-scheme://host?parameters'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to a cloud storage sink, see Stream Data Out of CockroachDB Using Changefeeds.
Manage a changefeed
Use the following SQL statements to pause, resume, and cancel a changefeed.
Changefeed-specific SQL statements (e.g., `CANCEL CHANGEFEED`) will be added in the future.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see `PAUSE JOB`.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see `RESUME JOB`.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see `CANCEL JOB`.
Configuring all changefeeds
It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.
To pause all running changefeeds:
PAUSE JOBS (SELECT job_id FROM [SHOW JOBS] WHERE job_type='CHANGEFEED' AND status IN ('running'));
This will change the status for each of the running changefeeds to `paused`, which can be verified with `SHOW JOBS`.
To resume all paused changefeeds:
RESUME JOBS (SELECT job_id FROM [SHOW JOBS] WHERE job_type='CHANGEFEED' AND status IN ('paused'));
This will resume the changefeeds and update the status for each of the changefeeds to `running`.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
job_id | job_type | ... | high_water_timestamp | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 | | 1
(1 row)
Use the high_water_timestamp
to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH cursor = '<high_water_timestamp>';
Note that because the cursor is provided, the initial scan is not performed.