Enterprise changefeeds emit messages to configurable downstream sinks. This page details the URIs, parameters, and configurations available for each changefeed sink.
CockroachDB supports the following sinks:
- Amazon MSK
- Apache Pulsar (in Preview)
- Azure Event Hubs
- Cloud Storage / HTTP
- Confluent Cloud
- Google Cloud Pub/Sub
- Kafka
- Webhook
For a guide to using changefeeds with Amazon Simple Notification Service (Amazon SNS), refer to this blog that uses the webhook sink with Amazon API Gateway to publish messages to Amazon SNS.
The CREATE CHANGEFEED
page provides detail on using the SQL statement and a complete list of the query parameters and options available when setting up a changefeed.
For a step-by-step example connecting a changefeed to a sink, see the Changefeed Examples page.
Sink URI
The sink URI follows the basic format of:
'{scheme}://{host}:{port}?{query_parameters}'
URI Component | Description
---|---
`scheme` | The type of sink: `kafka`, `gcpubsub`, any cloud storage sink, or webhook sink.
`host` | The sink's hostname or IP address.
`port` | The sink's port.
`query_parameters` | The sink's query parameters.
You can create an external connection to represent a changefeed sink URI. This allows you to specify the external connection's name in statements rather than the provider-specific URI. For detail on using external connections, see the CREATE EXTERNAL CONNECTION
page.
To set a different sink URI for an existing changefeed, use the `sink` option with ALTER CHANGEFEED.
Cockroach Labs recommends enabling Egress Perimeter Controls on CockroachDB Advanced clusters to mitigate the risk of data exfiltration when accessing external resources, such as cloud storage for change data capture or backup and restore operations. See Egress Perimeter Controls for detail and setup instructions.
Kafka
Kafka sink connection
Example of a Kafka sink URI using SCRAM-SHA-256
authentication:
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256'
Example of a Kafka sink URI using OAUTHBEARER
authentication:
'kafka://{kafka cluster address}:9093?topic_name={vehicles}&sasl_client_id={your client ID}&sasl_client_secret={your base64-encoded client secret}&sasl_enabled=true&sasl_mechanism=OAUTHBEARER&sasl_token_url={your token URL}'
OAuth 2.0 authentication uses credentials managed by a third-party provider (IdP) to authenticate with Kafka instead of requiring you to provide your Kafka cluster credentials directly in a CREATE CHANGEFEED
statement. Your provider's authentication server will issue a temporary token, giving you flexibility to apply access rules on the credentials that your IdP provides.
To authenticate to Kafka with OAuth using Okta, see the Connect to a Changefeed Kafka sink with OAuth Using Okta tutorial.
VPC Peering and AWS PrivateLink in CockroachDB Advanced clusters do not support connecting to a Kafka sink's internal IP addresses for changefeeds. To connect to a Kafka sink from CockroachDB Advanced, it is necessary to expose the Kafka cluster's external IP address and open ports with firewall rules to allow access from a CockroachDB Advanced cluster.
The following table lists the available parameters for Kafka URIs:
URI Parameter | Description
---|---
`topic_name` | The topic name to which messages will be sent. See the following section on Topic Naming for detail on how topics are created.
`topic_prefix` | Adds a prefix to all topic names. For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`.
`tls_enabled` | If `true`, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a `ca_cert` (see below). Default: `false`
`ca_cert` | The base64-encoded `ca_cert` file. Specify `ca_cert` for a Kafka sink. Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`.
`client_cert` | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with `client_key`.
`client_key` | The base64-encoded private key for the PEM certificate. This is used with `client_cert`. Note: Client keys are often encrypted. You will receive an error if you pass an encrypted client key in your changefeed statement. To decrypt the client key, run: `openssl rsa -in key.pem -out key.decrypt.pem -passin pass:{PASSWORD}`. Once decrypted, be sure to update your changefeed statement to use the new `key.decrypt.pem` file instead.
`sasl_client_id` | Client ID for OAuth authentication from a third-party provider. This parameter is only applicable with `sasl_mechanism=OAUTHBEARER`.
`sasl_client_secret` | Client secret for OAuth authentication from a third-party provider. This parameter is only applicable with `sasl_mechanism=OAUTHBEARER`. Note: You must base64 encode this value when passing it in as part of a sink URI.
`sasl_enabled` | If `true`, the authentication protocol can be set to SCRAM or PLAIN using the `sasl_mechanism` parameter. You must have `tls_enabled` set to `true` to use SASL. Default: `false`
`sasl_grant_type` | Override the default OAuth client credentials grant type for other implementations. This parameter is only applicable with `sasl_mechanism=OAUTHBEARER`.
`sasl_mechanism` | Can be set to `OAUTHBEARER`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, or `PLAIN`. A `sasl_user` and `sasl_password` are required. Default: `PLAIN`
`sasl_scopes` | A list of scopes that the OAuth token should have access for. This parameter is only applicable with `sasl_mechanism=OAUTHBEARER`.
`sasl_token_url` | Client token URL for OAuth authentication from a third-party provider. This parameter is only applicable with `sasl_mechanism=OAUTHBEARER`. Note: You must URL encode this value before passing it in a URI.
`sasl_user` | Your SASL username.
`sasl_password` | Your SASL password.
`insecure_tls_skip_verify` | If `true`, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication. Default: `false`
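The encoding steps called out in the table (base64 for `ca_cert`, URL encoding for `sasl_password`) are easy to get wrong by hand. The following Python sketch assembles a SCRAM sink URI; the broker address, certificate bytes, and credentials are placeholder values, not real ones:

```python
import base64
from urllib.parse import urlencode

# Placeholder CA certificate; in practice, read the bytes of your ca.cert file.
ca_cert_pem = b"-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n"

params = {
    "topic_prefix": "bar_",
    "tls_enabled": "true",
    # Equivalent to `base64 -w 0 ca.cert`: base64 with no line wrapping.
    "ca_cert": base64.b64encode(ca_cert_pem).decode(),
    "sasl_enabled": "true",
    "sasl_user": "scram_user",
    "sasl_password": "p@ss/word",  # urlencode percent-encodes reserved characters
    "sasl_mechanism": "SCRAM-SHA-256",
}
sink_uri = f"kafka://broker.address.com:9092?{urlencode(params)}"
print(sink_uri)
```

`urlencode` percent-encodes the password (`p@ss/word` becomes `p%40ss%2Fword`), which is the form the `sasl_password` parameter expects in the URI.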
This table shows the parameters for changefeeds to a specific sink. The CREATE CHANGEFEED
page provides a list of all the available options.
Topic naming
By default, a Kafka topic has the same name as the table on which a changefeed was created. If you create a changefeed on multiple tables, the changefeed will write to multiple topics corresponding to those table names. When you run CREATE CHANGEFEED
to a Kafka sink, the output will display the job ID as well as the topic name(s) that the changefeed will emit to.
To modify the default topic naming, you can specify a topic prefix, an arbitrary topic name, or use the full_table_name
option. Using the topic_name
parameter, you can specify an arbitrary topic name and feed all tables into that topic.
You can either manually create a topic in your Kafka cluster before starting the changefeed, or the topic will be automatically created when the changefeed connects to your Kafka cluster.
You must have the Kafka cluster setting auto.create.topics.enable
set to true
for automatic topic creation. This will create the topic when the changefeed sends its first message. If you create the consumer before that, you will also need the Kafka consumer configuration allow.auto.create.topics
to be set to true
.
Kafka has the following topic limitations:
- Legal characters are numbers, letters, and `[._-]`.
- The maximum character length of a topic name is 249.
- Topics with a period (`.`) and underscore (`_`) can collide on internal Kafka data structures, so you should use either but not both.
- Characters not accepted by Kafka will be automatically encoded as unicode characters by CockroachDB.
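These limitations can be checked programmatically before creating a changefeed. A minimal Python sketch (the `check_topic` helper is illustrative, not part of CockroachDB):

```python
import re

MAX_TOPIC_LEN = 249
LEGAL = re.compile(r"^[A-Za-z0-9._-]+$")

def check_topic(name: str) -> list[str]:
    """Return a list of problems with a proposed Kafka topic name."""
    problems = []
    if len(name) > MAX_TOPIC_LEN:
        problems.append("longer than 249 characters")
    if not LEGAL.match(name):
        problems.append("contains characters outside [A-Za-z0-9._-]")
    if "." in name and "_" in name:
        # Periods and underscores can collide on internal Kafka data
        # structures, so use one or the other, not both.
        problems.append("mixes '.' and '_'")
    return problems
```

For example, `check_topic("bar_foo")` returns an empty list, while a name that mixes periods and underscores is flagged.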
Kafka sink configuration
You can configure flushing, acknowledgments, compression, and concurrency behavior of changefeeds running to a Kafka sink with the following:
- Set the `changefeed.sink_io_workers` cluster setting to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Kafka sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; pause the changefeed, set `changefeed.sink_io_workers`, and then resume the changefeed. `changefeed.sink_io_workers` will also affect changefeeds running to Google Cloud Pub/Sub sinks and webhook sinks. Note: `changefeed.sink_io_workers` only applies to Kafka sinks created in v24.2.1+, or if the `changefeed.new_kafka_sink.enabled` cluster setting has been enabled in CockroachDB clusters running v23.2.10+ and v24.1.4+.
- The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters.
Each of the following settings has a significant impact on a changefeed's behavior, such as latency. For example, setting the batching parameters very high would negatively impact changefeed latency: it would take a long time for messages to come through to the sink. Also, the Kafka server may reject large batches unless it is separately configured to accept a high `max.message.bytes`.
kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}'
Using the default values or not setting fields in `kafka_sink_config` means that changefeed messages emit immediately.
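Rather than hand-writing the JSON string, you can serialize a Python dictionary, which avoids quoting mistakes; the values here simply reproduce the example above:

```python
import json

# Build the kafka_sink_config JSON from a plain dict rather than hand-writing
# the string; field names must match the configurable fields exactly.
config = {
    "Flush": {"MaxMessages": 1, "Frequency": "1s"},
    "ClientID": "kafka_client_ID",
    "Version": "0.8.2.0",
    "RequiredAcks": "ONE",
    "Compression": "GZIP",
    "CompressionLevel": 3,
}
option = f"kafka_sink_config='{json.dumps(config)}'"
print(option)
```

The resulting string can be passed as an option in a CREATE CHANGEFEED statement.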
The configurable fields are as follows:
Field | Type | Description | Default
---|---|---|---
`"ClientID"` | `STRING` | Applies a Kafka client ID per changefeed. Configure quotas within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to ClientID. | `""`
`"Compression"` | `STRING` | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
`"CompressionLevel"` | `INT` | Sets the level of compression. This determines the trade-off between compression ratio and compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. For the compression protocol ranges, refer to CompressionLevel. Note: If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | Refer to CompressionLevel
`"Flush"."Bytes"` | `INT` | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0`
`"Flush"."Frequency"` | Duration string | When this amount of time has passed since the first received message in the batch without it flushing, it should be flushed. For more details, refer to Flush. | `"0s"`
`"Flush"."MaxMessages"` | `INT` | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. For more details, refer to Flush. | `1000`
`"Flush"."Messages"` | `INT` | Configures the number of messages the changefeed should batch before flushing. | `0`
`"RequiredAcks"` | `STRING` | Specifies what a successful write to Kafka is. CockroachDB guarantees at least once delivery of messages; this value defines the delivery. The possible values are: `ONE`, `NONE`, `ALL`. For details on each value, refer to RequiredAcks. | `"ONE"`
`"Version"` | `STRING` | Sets the appropriate Kafka cluster version, which can be used to connect to Kafka versions < v1.0 (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
ClientID
Implement a Kafka resource usage limit per changefeed by setting a client ID and Kafka quota. You can set the quota for the client ID in your Kafka server's configuration:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name client-changefeed-1
When you create a changefeed, include the `"ClientID"` field with the unique client ID (e.g., `kafka_client_ID_1`) you have configured in your Kafka server configuration. This will subject the changefeed to the Kafka quota applied to that client ID. We recommend tracking the `changefeed.kafka_throttling_hist_nanos` metric to monitor the time spent throttling due to changefeed messages exceeding Kafka quotas.
For details on setting quotas to client IDs, refer to the Kafka documentation.
CompressionLevel
The `CompressionLevel` field sets the level of compression for your chosen `Compression` protocol. `CompressionLevel` determines the trade-off between compression ratio and compression speed: higher levels reduce the data size more (better compression), while lower levels complete more quickly (faster compression). The compression protocols support the following values:
- `GZIP`:
  - `0`: No compression
  - `1` to `9`: From fastest compression to best compression

  The default compression level for `GZIP` is `-1`; however, the `CompressionLevel` field does not support manually set negative values. For more details, refer to Known Limitations.

- `ZSTD`:
  - `1`: Fastest compression
  - `2`: Default compression
  - `3`: Better compression
  - `4`: Best compression

- `LZ4`: The supported values from fastest compression to best compression:
  - `0`: Fastest compression (Default)
  - `512`
  - `1024`
  - `2048`
  - `4096`
  - `8192`
  - `16384`
  - `32768`
  - `65536`
  - `131072`: Best compression

If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support the `CompressionLevel` field.
Flush
`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters, depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, the changefeed will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. Consider that if there are not many messages, a `"1s"` frequency adds 1 second of latency; with a larger influx of messages, batches will flush more quickly.
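The batching rule described above can be sketched as a simple predicate. This is illustrative Python, not CockroachDB internals: a batch flushes when it reaches `MaxMessages`, or when `Frequency` has elapsed since its first message, whichever comes first.

```python
# Toy sketch of the Flush batching decision: flush when either MaxMessages
# is reached or Frequency has elapsed since the batch's first message.
def should_flush(batched: int, elapsed_s: float,
                 max_messages: int = 1000, frequency_s: float = 1.0) -> bool:
    return batched >= max_messages or (batched > 0 and elapsed_s >= frequency_s)

# 1000 messages arrive within the 1s window: flush on count.
assert should_flush(1000, 0.2)
# Only a few messages, but 1s has passed: flush on frequency.
assert should_flush(3, 1.0)
# Few messages and the window has not elapsed: keep batching.
assert not should_flush(3, 0.4)
```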
RequiredAcks
The `RequiredAcks` field defines what a successful write to Kafka is. The possible values are:

- `"ONE"`: A write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this carries a risk of dropped messages if the leader node acknowledges before replicating to a quorum of other Kafka nodes and then fails.
- `"NONE"`: No Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.
- `"ALL"`: A quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. You must also set `acks` to `ALL` in your server-side Kafka configuration for this to provide high durability delivery.
Kafka sink messages
The following shows the Avro messages for a changefeed emitting to Kafka:
{
"after":{
"users":{
"name":{
"string":"Michael Clark"
},
"address":{
"string":"85957 Ashley Junctions"
},
"credit_card":{
"string":"4144089313"
},
"id":{
"string":"d84cf3b6-7029-4d4d-aa81-e5caa9cce09e"
},
"city":{
"string":"seattle"
}
}
},
"updated":{
"string":"1659643584586630201.0000000000"
}
}
{
"after":{
"users":{
"address":{
"string":"17068 Christopher Isle"
},
"credit_card":{
"string":"6664835435"
},
"id":{
"string":"11b99275-92ce-4244-be61-4dae21973f87"
},
"city":{
"string":"amsterdam"
},
"name":{
"string":"John Soto"
}
}
},
"updated":{
"string":"1659643585384406152.0000000000"
}
}
See the Changefeed Examples page and the Stream a Changefeed to a Confluent Cloud Kafka Cluster tutorial for examples to set up a Kafka sink.
For an overview of the messages emitted from changefeeds, see the Changefeed Messages page.
Amazon MSK
On CockroachDB self-hosted clusters, you must create instances in the same VPC as the MSK or MSK Serverless cluster in order for the changefeed to authenticate successfully.
If you would like to connect a CockroachDB Dedicated cluster to an Amazon MSK cluster, contact your Cockroach Labs account team.
Changefeeds can deliver messages to Amazon MSK clusters (Amazon Managed Streaming for Apache Kafka). Amazon MSK cluster types include: MSK and MSK Serverless. Changefeeds support the following authentication methods for these MSK cluster types:
- MSK: `SCRAM` or `IAM`
- MSK Serverless: `IAM`
Changefeeds can deliver messages to MSK and MSK Serverless clusters using AWS IAM roles.
For initial setup guides, refer to the AWS documentation:
Changefeeds connecting to Amazon MSK clusters use the kafka://
scheme. The example URIs show the necessary parameters for MSK and MSK Serverless clusters depending on the authentication type:
- To connect to an MSK cluster using `SCRAM` authentication, you must include the following parameters in the URI:

  `kafka://{cluster_endpoint}/?tls_enabled=true&sasl_enabled=true&sasl_mechanism=SCRAM-SHA-512&sasl_user={user}&sasl_password={password}`

  For SCRAM authentication, add your SASL username and password to the URI.

- To connect to an MSK or MSK Serverless cluster using AWS IAM roles, you must include the following parameters in the URI:

  `kafka://{cluster_endpoint}/?tls_enabled=true&sasl_enabled=true&sasl_mechanism=AWS_MSK_IAM&sasl_aws_region={region}&sasl_aws_iam_role_arn={arn}&sasl_aws_iam_session_name={your_session_name}`

  For IAM authentication, add the MSK cluster region, IAM role ARN, and session name to the URI.
This table outlines the available parameters for Amazon MSK URIs:
URI Parameter | Description
---|---
`cluster_endpoint` | The endpoint listed for your Amazon MSK cluster in the AWS Console. For example, `boot-a1test.c3.kafka-serverless.us-east-2.amazonaws.com:9098`.
`sasl_aws_iam_role_arn` | The ARN for the IAM role that has the permissions to create a topic and send data to the topic.
`sasl_aws_iam_session_name` | The user-specified string that identifies the session in AWS.
`sasl_aws_region` | The region of the Amazon MSK cluster.
`sasl_enabled` | Enable SASL authentication. Set this to `true`.
`sasl_mechanism` | Set to `AWS_MSK_IAM`, `SCRAM-SHA-512`, or `SCRAM-SHA-256`.
`sasl_password` | Your SASL password.
`sasl_user` | Your SASL username.
`tls_enabled` | Enable Transport Layer Security (TLS) on the connection to Amazon MSK clusters. Set this to `true`.
For more detail on each of these parameters, refer to Query Parameters.
Confluent Cloud
Changefeeds can deliver messages to Kafka clusters hosted on Confluent Cloud.
A Confluent Cloud sink connection URI must include the following:
'confluent-cloud://{bootstrap server}:9092?api_key={key}&api_secret={secret}'
The `api_key` and `api_secret` are the required parameters for the Confluent Cloud sink connection URI.
URI Parameter | Description
---|---
`bootstrap server` | The bootstrap server listed for your Kafka cluster in the Confluent Cloud console.
`api_key` | The API key created for the cluster in Confluent Cloud.
`api_secret` | The API key's secret generated in Confluent Cloud. Note: This must be URL-encoded before passing it into the connection string.
Changefeeds emitting to a Confluent Cloud Kafka cluster support the standard Kafka parameters, such as topic_name
and topic_prefix
. Confluent Cloud sinks also support the standard Kafka changefeed options and the Kafka sink configuration option.
For a Confluent Cloud setup example, refer to the Changefeed Examples page.
The following parameters are also needed, but are set by default in CockroachDB:

- `tls_enabled=true`
- `sasl_enabled=true`
- `sasl_handshake=true`
- `sasl_mechanism=PLAIN`
Google Cloud Pub/Sub
Changefeeds can deliver messages to a Google Cloud Pub/Sub sink, which is integrated with Google Cloud Platform.
Since CockroachDB v23.2, the changefeed.new_pubsub_sink_enabled
cluster setting is enabled by default, which provides improved throughput. Without this cluster setting enabled, changefeeds emit JSON-encoded events with the top-level message fields all lowercase. With changefeed.new_pubsub_sink_enabled
, the top-level fields are capitalized. For more details, refer to the Pub/Sub sink messages section.
A Pub/Sub sink URI follows this example:
'gcpubsub://{project name}?region={region}&topic_name={topic name}&AUTH=specified&CREDENTIALS={base64-encoded credentials}'
URI Parameter | Description
---|---
`project name` | The Google Cloud Project name.
`region` | (Optional) The single region to which all output will be sent. If you do not include `region`, then you must create your changefeed with the `unordered` option.
`topic_name` | (Optional) The topic name to which messages will be sent. See the following section on Topic Naming for detail on how topics are created.
`AUTH` | The authentication parameter can define either `specified` (default) or `implicit` authentication. To use `specified` authentication, pass your Service Account credentials with the URI. To use `implicit` authentication, configure these credentials via an environment variable. Refer to the Cloud Storage Authentication page for examples of each of these.
`CREDENTIALS` | (Required with `AUTH=specified`) The base64-encoded credentials of your Google Service Account.
`ASSUME_ROLE` | The service account of the role to assume. Use in combination with `AUTH=implicit` or `specified`. Refer to the Cloud Storage Authentication page for an example on setting up assume role authentication.
This table shows the parameters for changefeeds to a specific sink. The CREATE CHANGEFEED
page provides a list of all the available options.
When using Pub/Sub as your downstream sink, consider the following:
- Pub/Sub sinks support the `JSON` message format. You can use the `format=csv` option in combination with `initial_scan='only'` for CSV-formatted messages.
- Use the `unordered` option for multi-region Pub/Sub. Google Cloud's multi-region Pub/Sub will have lower latency when emitting from multiple regions, but Google Cloud Pub/Sub does not support message ordering for multi-region topics.
- Changefeeds connecting to a Pub/Sub sink do not support the `topic_prefix` option.
Ensure one of the following Pub/Sub roles is set in your Google Service Account at the project level:
- To create topics on changefeed creation, you must use the Pub/Sub Editor role, which contains the permissions to create a topic.
- If the topic the changefeed is writing to already exists, then you can use the more limited Pub/Sub Publisher role, which can only write to existing topics.
For more information, read about compatible changefeed options and the Create a changefeed connected to a Google Cloud Pub/Sub sink example.
You can use Google's Pub/Sub emulator, which allows you to run Pub/Sub locally for testing. CockroachDB uses the Google Cloud SDK, which means that you can follow Google's instructions for Setting environment variables to run the Pub/Sub emulator.
Pub/Sub topic naming
When running a CREATE CHANGEFEED
statement to a Pub/Sub sink, consider the following regarding topic names:
- Changefeeds will try to create a topic automatically. When you do not specify the topic in the URI with the `topic_name` parameter, the changefeed will use the table name to create the topic name.
- If the topic already exists in your Pub/Sub sink, the changefeed will write to it.
- Changefeeds watching multiple tables will write to multiple topics corresponding to those table names.
- The `full_table_name` option will create a topic using the fully qualified table name for each table the changefeed is watching.
- The output from CREATE CHANGEFEED will display the job ID as well as the topic name(s) to which the changefeed will emit.
You can manually create a topic in your Pub/Sub sink before starting the changefeed. Refer to the Creating a changefeed to Google Cloud Pub/Sub example for more detail. To understand restrictions on user-specified topic names, refer to Google's documentation on Guidelines to name a topic or subscription.
For a list of compatible parameters and options, refer to Parameters on the CREATE CHANGEFEED
page.
Pub/Sub sink configuration
You can configure flushing, retry, and concurrency behavior of changefeeds running to a Pub/Sub sink with the following:
- Set the `changefeed.sink_io_workers` cluster setting to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; pause the changefeed, set `changefeed.sink_io_workers`, and then resume the changefeed. Note that this cluster setting will also affect changefeeds running to webhook sinks and Kafka.
- Set the `pubsub_sink_config` option to configure the changefeed flushing and retry behavior to your Pub/Sub sink. For details on the `pubsub_sink_config` option's configurable fields, refer to the following table and examples.
Field | Type | Description | Default
---|---|---|---
`Flush.Messages` | `INT` | The batch is flushed and its messages are sent when it contains this many messages. | `0`
`Flush.Bytes` | `INT` | The batch is flushed when the total byte size of all its messages reaches this threshold. | `0`
`Flush.Frequency` | `INTERVAL` | When this amount of time has passed since the first received message in the batch without it flushing, it should be flushed. | `"0s"`
`Retry.Max` | `INT` | The maximum number of attempted batch emit retries after sending a message batch in a request fails. Specify either an integer greater than zero or the string `inf` to retry indefinitely. This only affects batch emit retries, not other causes of duplicate messages. Note that setting this field will not prevent the whole changefeed job from retrying indefinitely. | `3`
`Retry.Backoff` | `INTERVAL` | How long the sink waits before retrying after the first failure. The backoff will double until it reaches the maximum retry time of 30 seconds. For example, if `Retry.Max = 4` and `Retry.Backoff = 10s`, then the sink will try at most 4 retries, with `10s`, `20s`, `30s`, and `30s` backoff times. | `"500ms"`
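The `Retry.Backoff` doubling described in the table can be illustrated with a short Python sketch (not CockroachDB internals), reproducing the `Retry.Max = 4`, `Retry.Backoff = 10s` example:

```python
# Sketch of the documented backoff: start at Retry.Backoff and double per
# attempt, capped at the 30-second maximum retry time.
def backoff_schedule(max_retries: int, backoff_s: float, cap_s: float = 30.0):
    waits, wait = [], backoff_s
    for _ in range(max_retries):
        waits.append(min(wait, cap_s))
        wait *= 2
    return waits

print(backoff_schedule(4, 10.0))  # [10.0, 20.0, 30.0, 30.0]
```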
For example:
pubsub_sink_config = '{ "Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": { "Max": 4, "Backoff": "10s"} }'
If either `Messages` or `Bytes` has a non-zero value, then you must also provide a non-zero value for `Frequency`. Setting `Messages` or `Bytes` without setting `Frequency` causes the sink to assume `Frequency` is infinity; this configuration is invalid and will cause an error, since the messages could sit in a batch indefinitely if the other conditions do not trigger a flush.
Some complexities to consider when setting `Flush` fields for batching:

- When all batching parameters are zero (`"Messages"`, `"Bytes"`, and `"Frequency"`), the sink will interpret this configuration as "send a batch every time a message is available." This is the same as not providing any configuration at all:

  `{ "Flush": { "Messages": 0, "Bytes": 0, "Frequency": "0s" } }`

- If one or more fields are set to non-zero values, the sink will interpret any zero-valued fields as infinity. For example, in the following configuration, the sink will send a batch whenever its size reaches 100 messages, or when 5 seconds have passed since the batch was populated with its first message. `Bytes` is unset, so the batch size is unlimited and no flush will be triggered due to batch size:

  `{ "Flush": { "Messages": 100, "Frequency": "5s" } }`
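The validity rule above (a non-zero `Messages` or `Bytes` requires a non-zero `Frequency`) can be expressed as a small pre-flight check. This Python helper is illustrative only, not a CockroachDB API:

```python
def flush_config_valid(flush: dict) -> bool:
    """Mirror the documented rule: non-zero Messages/Bytes need a Frequency."""
    messages = flush.get("Messages", 0)
    nbytes = flush.get("Bytes", 0)
    frequency = flush.get("Frequency", "0s")
    zero_frequency = frequency in ("", "0s")
    if (messages or nbytes) and zero_frequency:
        return False  # batch could sit indefinitely; CockroachDB rejects this
    return True

print(flush_config_valid({"Messages": 100, "Frequency": "5s"}))  # True
```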
Pub/Sub sink messages
The `changefeed.new_pubsub_sink_enabled` cluster setting is enabled by default, which provides improved changefeed throughput performance. With `changefeed.new_pubsub_sink_enabled` enabled, changefeed JSON-encoded messages have capitalized top-level fields:

{Key: ..., Value: ..., Topic: ...}

The default capitalization of top-level fields in the message changed in v23.2. Before upgrading to CockroachDB v23.2 and later, you may need to reconfigure downstream systems to parse the new message format.
With changefeed.new_pubsub_sink_enabled
set to false
, changefeeds emit JSON messages with the top-level fields all lowercase:
{key: ..., value: ..., topic: ...}
If changefeed.new_pubsub_sink_enabled
is set to false
, changefeeds will not benefit from the improved throughput performance that this setting enables.
The following shows the default JSON messages for a changefeed emitting to Pub/Sub. These changefeed messages were emitted as part of the Create a changefeed connected to a Google Cloud Pub/Sub sink example:
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬───────────────────┬──────────────┬────────────┬──────────────────┬────────────┐
│ DATA │ MESSAGE_ID │ ORDERING_KEY │ ATTRIBUTES │ DELIVERY_ATTEMPT │ ACK_STATUS │
├─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼───────────────────┼──────────────┼────────────┼──────────────────┼────────────┤
│ {"Key":["amsterdam", "09ee2856-5856-40c4-85d3-7d65bed978f0"],"Value":{"after": {"address": "84579 Peter Divide Apt. 47", "city": "amsterdam", "credit_card": "0100007510", "id": "09ee2856-5856-40c4-85d3-7d65bed978f0", "name": "Timothy Jackson"}},"Topic":"users"} │ 11249015757941393 │ │ │ │ SUCCESS │
│ {"Key":["new york", "8803ab9e-5001-4994-a2e6-68d587f95f1d"],"Value":{"after": {"address": "37546 Andrew Roads Apt. 68", "city": "new york", "credit_card": "4731676650", "id": "8803ab9e-5001-4994-a2e6-68d587f95f1d", "name": "Susan Harrington"}},"Topic":"users"} │ 11249015757941394 │ │ │ │ SUCCESS │
│ {"Key":["seattle", "32e27201-ca0d-4a0c-ada2-fbf47f6a4711"],"Value":{"after": {"address": "86725 Stephen Gardens", "city": "seattle", "credit_card": "3639690115", "id": "32e27201-ca0d-4a0c-ada2-fbf47f6a4711", "name": "Brad Hill"}},"Topic":"users"} │ 11249015757941395 │ │ │ │ SUCCESS │
│ {"Key":["san francisco", "27b03637-ef9f-49a0-9b58-b16d7a9e34f4"],"Value":{"after": {"address": "85467 Tiffany Field", "city": "san francisco", "credit_card": "0016125921", "id": "27b03637-ef9f-49a0-9b58-b16d7a9e34f4", "name": "Mark Garcia"}},"Topic":"users"} │ 11249015757941396 │ │ │ │ SUCCESS │
│ {"Key":["rome", "982e1863-88d4-49cb-adee-0a35baae7e0b"],"Value":{"after": {"address": "54918 Sutton Isle Suite 74", "city": "rome", "credit_card": "6015706174", "id": "982e1863-88d4-49cb-adee-0a35baae7e0b", "name": "Kimberly Nichols"}},"Topic":"users"} │ 11249015757941397 │ │ │ │ SUCCESS │
│ {"Key":["washington dc", "7b298994-7b12-414c-90ef-353c7105f012"],"Value":{"after": {"address": "45205 Romero Ford Apt. 86", "city": "washington dc", "credit_card": "3519400314", "id": "7b298994-7b12-414c-90ef-353c7105f012", "name": "Taylor Bullock"}},"Topic":"users"} │ 11249015757941398 │ │ │ │ SUCCESS │
│ {"Key":["boston", "4f012f57-577b-4853-b5ab-0d79d0df1369"],"Value":{"after": {"address": "15765 Vang Ramp", "city": "boston", "credit_card": "6747715133", "id": "4f012f57-577b-4853-b5ab-0d79d0df1369", "name": "Ryan Garcia"}},"Topic":"users"} │ 11249015757941399 │ │ │ │ SUCCESS │
│ {"Key":["seattle", "9ba85917-5545-4674-8ab2-497fa47ac00f"],"Value":{"after": {"address": "24354 Whitney Lodge", "city": "seattle", "credit_card": "8642661685", "id": "9ba85917-5545-4674-8ab2-497fa47ac00f", "name": "Donald Walsh"}},"Topic":"users"} │ 11249015757941400 │ │ │ │ SUCCESS │
│ {"Key":["seattle", "98312fb3-230e-412d-9b22-074ec97329ff"],"Value":{"after": {"address": "72777 Carol Shoal", "city": "seattle", "credit_card": "7789799678", "id": "98312fb3-230e-412d-9b22-074ec97329ff", "name": "Christopher Davis"}},"Topic":"users"} │ 11249015757941401 │ │ │ │ SUCCESS │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴───────────────────┴──────────────┴────────────┴──────────────────┴────────────┘
For an overview of the messages emitted from changefeeds, see the Changefeed Messages page.
Cloud storage sink
Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.
Some considerations when using cloud storage sinks:
- Cloud storage sinks work with `JSON` and emit newline-delimited `JSON` files. You can use the `format=csv` option in combination with `initial_scan='only'` for CSV-formatted messages.
- Cloud storage sinks can be configured to store emitted changefeed messages in one or more subdirectories organized by date. See file partitioning and the General file format examples.
- The supported cloud schemes are: `s3`, `gs`, `azure`, `http`, and `https`.
- Both `http://` and `https://` are cloud storage sinks, not webhook sinks. You must prefix the scheme with `webhook-` for webhook sinks.
You can authenticate to cloud storage sinks using `specified` or `implicit` authentication. CockroachDB also supports assume role authentication for Amazon S3 and Google Cloud Storage, which allows you to limit the control specific users have over your storage buckets. For detail and instructions on authenticating to cloud storage sinks, see Cloud Storage Authentication.
Examples of supported cloud storage sink URIs:
Amazon S3
's3://{BUCKET NAME}/{PATH}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}'
Azure Blob Storage
'azure://{CONTAINER NAME}/{PATH}?AZURE_ACCOUNT_NAME={ACCOUNT NAME}&AZURE_ACCOUNT_KEY={URL-ENCODED KEY}'
Google Cloud Storage
'gs://{BUCKET NAME}/{PATH}?AUTH=specified&CREDENTIALS={ENCODED KEY}'
HTTP
'http://localhost:8080/{PATH}'
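As a minimal sketch of putting one of these URIs to use, the following statement creates a changefeed into an S3 bucket. The `users` table and the bracketed bucket and credential placeholders are assumptions to replace with your own values:

```sql
-- Sketch: emit changefeed messages for the users table to an S3 bucket,
-- with resolved timestamps emitted every 10 seconds.
-- {BUCKET NAME}, {PATH}, {KEY ID}, and {SECRET ACCESS KEY} are placeholders.
CREATE CHANGEFEED FOR TABLE users
  INTO 's3://{BUCKET NAME}/{PATH}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}'
  WITH resolved = '10s';
```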
Cloud storage parameters
The following table lists the available parameters for cloud storage sink URIs:
URI Parameter | Storage | Description |
---|---|---|
`AWS_ACCESS_KEY_ID` | AWS | The access key ID to your AWS account. |
`AWS_SECRET_ACCESS_KEY` | AWS | The secret access key to your AWS account. |
`ASSUME_ROLE` | AWS S3, GCS | The ARN (AWS) or service account (GCS) of the role to assume. Use in combination with `AUTH=implicit` or `specified`. AWS S3 only: Use `external_id` with `ASSUME_ROLE` to specify a third-party-assigned external ID as part of the role. Refer to Amazon S3 assume role for setup details. |
`AUTH` | AWS S3, Azure Blob Storage, GCS | The authentication parameter can define either `specified` (default) or `implicit` authentication. To use `specified` authentication, pass your account credentials with the URI. To use `implicit` authentication, configure these credentials via an environment variable. See Cloud Storage Authentication for examples of each. |
`AZURE_ACCOUNT_NAME` | Azure Blob Storage | The name of your Azure account. |
`AZURE_ACCOUNT_KEY` | Azure Blob Storage | The URL-encoded account key for your Azure account. |
`AZURE_CLIENT_ID` | Azure Blob Storage | Application (client) ID for your App Registration. |
`AZURE_CLIENT_SECRET` | Azure Blob Storage | Client credentials secret generated for your App Registration. |
`AZURE_ENVIRONMENT` | Azure Blob Storage | The Azure environment that the storage account belongs to. The accepted values are: `AZURECHINACLOUD`, `AZUREGERMANCLOUD`, `AZUREPUBLICCLOUD`, and `AZUREUSGOVERNMENTCLOUD`. These are cloud environments that meet security, compliance, and data privacy requirements for the respective instance of Azure cloud. Default: `AZUREPUBLICCLOUD` |
`AZURE_TENANT_ID` | Azure Blob Storage | Directory (tenant) ID for your App Registration. |
`CREDENTIALS` | GCS | (Required with `AUTH=specified`) The base64-encoded credentials of your Google Service Account. |
`file_size` | All | The file is flushed (i.e., written to the sink) when it exceeds the specified size. This can be used with the `WITH resolved` option, which flushes on a specified cadence. Default: `16MB` |
`partition_format` | All | Specify how changefeed file paths are partitioned in cloud storage sinks. Use `partition_format` with one of the following values: `daily`, `hourly`, or `flat`. For example: `CREATE CHANGEFEED FOR TABLE users INTO 'gs://...?AUTH...&partition_format=hourly'` Default: `daily` |
`S3_STORAGE_CLASS` | AWS S3 | Specify the S3 storage class for files created by the changefeed. See Create a changefeed with an S3 storage class for the available classes and an example. Default: `STANDARD` |
`topic_prefix` | All | Adds a prefix to all topic names. For example, `CREATE CHANGEFEED FOR TABLE foo INTO 's3://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`. |
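As a hedged illustration of combining these parameters, the following sketch adjusts the flush size and switches to hourly partitioning on a Google Cloud Storage sink. The `users` table, bucket path, and encoded key are placeholders:

```sql
-- Sketch: flush files at roughly 32MB instead of the 16MB default,
-- and partition file paths by hour rather than by day.
-- {BUCKET NAME}, {PATH}, and {ENCODED KEY} are placeholders.
CREATE CHANGEFEED FOR TABLE users
  INTO 'gs://{BUCKET NAME}/{PATH}?AUTH=specified&CREDENTIALS={ENCODED KEY}&file_size=32MB&partition_format=hourly';
```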
This table shows the parameters for changefeeds to a specific sink. The `CREATE CHANGEFEED` page provides a list of all the available options.
Use Cloud Storage for Bulk Operations provides more detail on authentication to cloud storage sinks.
Cloud storage sink messages
The following shows the default JSON messages for a changefeed emitting to a cloud storage sink:
{
"after":{
"address":"51438 Janet Valleys",
"city":"boston",
"credit_card":"0904722368",
"id":"33333333-3333-4400-8000-00000000000a",
"name":"Daniel Hernandez MD"
},
"key":[
"boston",
"33333333-3333-4400-8000-00000000000a"
]
}
{
"after":{
"address":"15074 Richard Falls",
"city":"boston",
"credit_card":"0866384459",
"id":"370117cf-d77d-4778-b0b9-01ac17c15a06",
"name":"Cheyenne Morales"
},
"key":[
"boston",
"370117cf-d77d-4778-b0b9-01ac17c15a06"
]
}
{
"after":{
"address":"69687 Jessica Islands Apt. 68",
"city":"boston",
"credit_card":"6837062320",
"id":"3851eb85-1eb8-4200-8000-00000000000b",
"name":"Sarah Wang DDS"
},
"key":[
"boston",
"3851eb85-1eb8-4200-8000-00000000000b"
]
}
. . .
For an overview of the messages emitted from changefeeds, see the Changefeed Messages page.
Webhook sink
Use a webhook sink to deliver changefeed messages to an arbitrary HTTP endpoint.
Example of a webhook sink URL:
'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
The following table lists the parameters you can use in your webhook URI:
URI Parameter | Description |
---|---|
`ca_cert` | The base64-encoded `ca_cert` file. Specify `ca_cert` for a webhook sink. Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`. |
`client_cert` | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with `client_key`. |
`client_key` | The base64-encoded private key for the PEM certificate. This is used with `client_cert`. Note: Client keys are often encrypted. You will receive an error if you pass an encrypted client key in your changefeed statement. To decrypt the client key, run: `openssl rsa -in key.pem -out key.decrypt.pem -passin pass:{PASSWORD}`. Once decrypted, be sure to update your changefeed statement to use the new `key.decrypt.pem` file instead. |
`insecure_tls_skip_verify` | If `true`, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication. Default: `false` |
This table shows the parameters for changefeeds to a specific sink. The `CREATE CHANGEFEED` page provides a list of all the available options.
The following are considerations when using the webhook sink:
- Only supports HTTPS. Use the `insecure_tls_skip_verify` parameter when testing to disable certificate verification; however, this still requires HTTPS and certificates.
- Supports JSON output format. You can use the `format=csv` option in combination with `initial_scan='only'` for CSV-formatted messages.
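The considerations above can be combined into a sketch of a webhook changefeed that supplies a base64-encoded CA certificate rather than skipping verification. The endpoint and encoded certificate are placeholders:

```sql
-- Sketch: webhook sink that verifies the server against a supplied
-- CA certificate. {your-webhook-endpoint} and {BASE64 CA CERT} are
-- placeholders for your endpoint and the output of `base64 -w 0 ca.cert`.
CREATE CHANGEFEED FOR TABLE users
  INTO 'webhook-https://{your-webhook-endpoint}?ca_cert={BASE64 CA CERT}'
  WITH updated;
```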
Webhook sink configuration
You can configure flushing, retry, and concurrency behavior of changefeeds running to a webhook sink with the following:
- Set the `changefeed.sink_io_workers` cluster setting to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; pause the changefeed, set `changefeed.sink_io_workers`, and then resume the changefeed. Note that this cluster setting will also affect changefeeds running to Google Cloud Pub/Sub sinks and Kafka.
- Set the `webhook_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. For details on the `webhook_sink_config` option's configurable fields, refer to the following table and examples.
Field | Type | Description | Default |
---|---|---|---|
`Flush.Messages` | `INT` | The batch is flushed and its messages are sent when it contains this many messages. | `0` |
`Flush.Bytes` | `INT` | The batch is flushed when the total byte size of all its messages reaches this threshold. | `0` |
`Flush.Frequency` | `INTERVAL` | The batch is flushed when this amount of time has passed since its first message was received without another condition triggering a flush. | `"0s"` |
`Retry.Max` | `INT` | The maximum number of attempted HTTP retries after sending a message batch in an HTTP request fails. Specify either an integer greater than zero or the string `inf` to retry indefinitely. This only affects HTTP retries, not other causes of duplicate messages. Note that setting this field will not prevent the changefeed from retrying indefinitely. | `3` |
`Retry.Backoff` | `INTERVAL` | How long the sink waits before retrying after the first failure. The backoff doubles until it reaches the maximum retry time of 30 seconds. For example, if `Retry.Max = 4` and `Retry.Backoff = 10s`, then the sink will try at most 4 retries, with backoff times of `10s`, `20s`, `30s`, and `30s`. | `"500ms"` |
For example:
webhook_sink_config = '{ "Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": { "Max": 4, "Backoff": "10s"} }'
Setting either `Messages` or `Bytes` to a non-zero value without also setting `Frequency` causes the sink to treat `Frequency` as infinite, meaning messages could sit in a batch indefinitely if the other conditions do not trigger. This configuration is therefore invalid and will cause an error: if either `Messages` or `Bytes` has a non-zero value, you must also provide a non-zero value for `Frequency`.
Some complexities to consider when setting `Flush` fields for batching:

- When all batching parameters (`Messages`, `Bytes`, and `Frequency`) are zero, the sink interprets this configuration as "send a batch every time a message is available." This is the same as not providing any configuration at all:

  `{ "Flush": { "Messages": 0, "Bytes": 0, "Frequency": "0s" } }`

- If one or more fields are set to non-zero values, the sink interprets any zero-valued fields as infinity. For example, in the following configuration, the sink sends a batch whenever its size reaches 100 messages, or when 5 seconds have passed since the batch was populated with its first message. `Bytes` is unset, so the batch size is unlimited and no flush will be triggered by batch size:

  `{ "Flush": { "Messages": 100, "Frequency": "5s" } }`
Webhook sink messages
The following shows the default JSON messages for a changefeed emitting to a webhook sink. These changefeed messages were emitted as part of the Create a changefeed connected to a Webhook sink example:
2021/08/24 14:00:21
{
"payload":[
{
"after":{
"city":"rome",
"creation_time":"2019-01-02T03:04:05",
"current_location":"39141 Travis Curve Suite 87",
"ext":{
"brand":"Schwinn",
"color":"red"
},
"id":"d7b18299-c0c4-4304-9ef7-05ae46fd5ee1",
"dog_owner_id":"5d0c85b5-8866-47cf-a6bc-d032f198e48f",
"status":"in_use",
"type":"bike"
},
"key":[
"rome",
"d7b18299-c0c4-4304-9ef7-05ae46fd5ee1"
],
"topic":"vehicles",
"updated":"1629813621680097993.0000000000"
}
],
"length":1
}
2021/08/24 14:00:22
{
"payload":[
{
"after":{
"city":"san francisco",
"creation_time":"2019-01-02T03:04:05",
"current_location":"84888 Wallace Wall",
"ext":{
"color":"black"
},
"id":"020cf7f4-6324-48a0-9f74-6c9010fb1ab4",
"dog_owner_id":"b74ea421-fcaf-4d80-9dcc-d222d49bdc17",
"status":"available",
"type":"scooter"
},
"key":[
"san francisco",
"020cf7f4-6324-48a0-9f74-6c9010fb1ab4"
],
"topic":"vehicles",
"updated":"1629813621680097993.0000000000"
}
],
"length":1
}
For an overview of the messages emitted from changefeeds, see the Changefeed Messages page.
Azure Event Hubs
Changefeeds can deliver messages to an Azure Event Hub, which is compatible with Apache Kafka.
An Azure Event Hubs sink URI:
'azure-event-hub://{event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}'
You can also use a `kafka://` scheme in the URI:
'kafka://{event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}'
The `shared_access_key_name` and `shared_access_key` are the required parameters for an Azure Event Hubs connection URI.
URI Parameter | Description |
---|---|
`{event_hubs_namespace}` | The Event Hubs namespace. |
`shared_access_key_name` | The name of the shared access policy created for the namespace. |
`shared_access_key` | The key for the shared access policy. Note: You must URL-encode the shared access key before passing it in the connection string. |
Changefeeds emitting to an Azure Event Hub support `topic_name` and `topic_prefix`. Azure Event Hubs also supports the standard Kafka changefeed options and the Kafka sink configuration option.
For an Azure Event Hub setup example, refer to the Changefeed Examples page.
The following parameters are also needed, but are set by default in CockroachDB:
- `tls_enabled=true`
- `sasl_enabled=true`
- `sasl_handshake=true`
- `sasl_mechanism=PLAIN`
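The pieces above can be combined into a sketch of a full statement. The namespace, policy name, key, and `users` table are placeholders:

```sql
-- Sketch: changefeed into an Azure Event Hub, with resolved timestamps
-- emitted every 30 seconds. {event-hubs-namespace}, {policy-name}, and
-- {url-encoded key} are placeholders; the key must be URL-encoded.
CREATE CHANGEFEED FOR TABLE users
  INTO 'azure-event-hub://{event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}'
  WITH resolved = '30s';
```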
Apache Pulsar
This feature is in preview and subject to change. To share feedback and/or issues, contact Support.
Changefeeds can deliver messages to Apache Pulsar.
A Pulsar sink URI:
pulsar://{host IP}:6650
By default, Apache Pulsar listens for client connections on port 6650. For more detail on configuration, refer to the Apache Pulsar documentation.
Changefeeds emitting to an Apache Pulsar sink support the `json` and `csv` format options.
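As a minimal sketch under these constraints, the following statement emits JSON messages to a Pulsar broker. The host IP placeholder and the `rides` table are assumptions:

```sql
-- Sketch: changefeed into Apache Pulsar emitting JSON messages.
-- {host IP} is a placeholder for your Pulsar broker's address.
CREATE CHANGEFEED FOR TABLE rides
  INTO 'pulsar://{host IP}:6650'
  WITH format = json;
```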
Changefeeds emitting to an Apache Pulsar sink do not support:
- `format=avro`
- `confluent_schema_registry`
- `topic_prefix`
- Any batching configuration
- Authentication query parameters
- External connections
For an Apache Pulsar setup example, refer to the Changefeed Examples page.
Apache Pulsar sink messages
----- got message -----
key:[null], properties:[], content:{"Key":["seattle", "09265ab7-5f3a-40cb-a543-d37c8c893793"],"Value":{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "09265ab7-5f3a-40cb-a543-d37c8c893793", "revenue": 53.00, "rider_id": "44576296-d4a7-4e79-add9-f880dd951064", "start_address": "25795 Alyssa Extensions", "start_time": "2024-05-09T12:18:42.022952", "vehicle_city": "seattle", "vehicle_id": "a0c935f6-8872-408e-bc12-4d0b5a85fa71"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "b3548485-9475-44cf-9769-66617b9cb151"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "b3548485-9475-44cf-9769-66617b9cb151", "revenue": 25.00, "rider_id": "adf4656f-6a0d-4315-b035-eaf7fa6b85eb", "start_address": "49614 Victoria Cliff Apt. 25", "start_time": "2024-05-09T12:18:42.763718", "vehicle_city": "amsterdam", "vehicle_id": "eb1d1d2c-865e-4a40-a7d7-8f396c1c063f"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "d119f344-318f-41c0-bfc0-b778e6e38f9a"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "d119f344-318f-41c0-bfc0-b778e6e38f9a", "revenue": 24.00, "rider_id": "1a242414-f704-4e1f-9f5e-2b468af0c2d1", "start_address": "54909 Douglas Street Suite 51", "start_time": "2024-05-09T12:18:42.369755", "vehicle_city": "amsterdam", "vehicle_id": "99d98e05-3114-460e-bb02-828bcd745d44"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "end_address": null, "end_time": null, "id": "3c7d6676-f713-4985-ba52-4c19fe6c3692", "revenue": 27.00, "rider_id": "c15a4926-fbb2-4931-a9a0-6dfabc6c506b", "start_address": "39415 Brandon Avenue Apt. 29", "start_time": "2024-05-09T12:18:42.055498", "vehicle_city": "rome", "vehicle_id": "627dad1a-3531-4214-a173-16bcc6b93036"}},"Topic":"rides"}