Advanced Changefeed Configuration

Warning:

The configurations and settings explained on this page will have a significant impact on a changefeed's behavior and could potentially affect a cluster's performance. Thoroughly test before deploying any changes to production.

The following sections describe performance, settings, configurations, and details to tune changefeeds:

Some options for the kafka_sink_config and webhook_sink_config parameters are discussed on this page. However, for more information on specific tuning for Kafka and Webhook sinks, refer to the following pages:

Changefeed performance

By default, changefeeds are integrated with elastic CPU, which helps to prevent changefeeds from affecting foreground traffic. For example, changefeed backfills and initial scans can be CPU-intensive. This integration will result in a cluster prioritizing SQL traffic over changefeeds. Since this may affect changefeed latency, you can monitor your cluster's admission control system on the Overload Dashboard and changefeed latency on the Changefeed Dashboard.

This integration is controlled by the following cluster settings, which are enabled by default:

changefeed.cpu.per_event_elastic_control.enabled
kvadmission.rangefeed_catchup_scan_elastic_control.enabled
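
As a minimal sketch, you can confirm from a SQL shell that both settings are still enabled. The statements below only read the current values and change nothing; the comments describe each setting as its name suggests.

```sql
-- Elastic CPU control for changefeed event processing.
SHOW CLUSTER SETTING changefeed.cpu.per_event_elastic_control.enabled;

-- Elastic CPU control for rangefeed catch-up scans.
SHOW CLUSTER SETTING kvadmission.rangefeed_catchup_scan_elastic_control.enabled;
```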

For a more technical explanation of elastic CPU, refer to the Rubbing control theory on the Go scheduler blog post.

Mux rangefeeds

MuxRangefeed is enabled by default.

MuxRangefeed is a subsystem that improves the performance of rangefeeds at scale by significantly reducing the overhead of running them. Without MuxRangefeed, the number of RPC streams is proportional to the number of ranges in a table, and a large table could have tens of thousands of ranges. With MuxRangefeed, the number of RPC streams is instead proportional to the number of nodes in the cluster.

Latency in changefeeds

When you are running large workloads, changefeeds can encounter or cause latency in a cluster in the following ways:

  • Changefeeds can have an impact on SQL latency in the cluster generally.
  • Changefeeds can encounter latency when emitting events. This latency is the total time CockroachDB takes to:

The following cluster settings reduce bursts of rangefeed work so that updates are paced steadily over time.

Warning:

We do not recommend adjusting these settings unless you are running a large workload, or are working with the Cockroach Labs support team. Thoroughly test different cluster setting configurations before deploying to production.

kv.closed_timestamp.target_duration

Default: 3s

Note:

Adjusting kv.closed_timestamp.target_duration could have a detrimental impact on follower reads. If you are using follower reads, refer to the kv.rangefeed.closed_timestamp_refresh_interval cluster setting instead to ease changefeed impact on foreground SQL latency.

kv.closed_timestamp.target_duration controls the target closed timestamp lag duration, which determines how far behind the current time CockroachDB will attempt to maintain the closed timestamp. For example, with the default value of 3s, if the current time is 12:30:00 then CockroachDB will attempt to keep the closed timestamp at 12:29:57 by possibly retrying or aborting ongoing writes that are below this time.

A changefeed aggregates checkpoints across all ranges, and once the timestamp on all the ranges advances, the changefeed can then checkpoint. In the context of changefeeds, kv.closed_timestamp.target_duration affects how old the checkpoints will be, which will determine the latency before changefeeds can consider the history of an event complete.

kv.rangefeed.closed_timestamp_refresh_interval

Default: 3s

This setting controls the interval at which closed timestamp updates are delivered to rangefeeds and in turn emitted as a changefeed checkpoint.

Increasing the interval lengthens the delay between checkpoints. This increases the latency of changefeed checkpoints, but reduces the impact on SQL latency caused by overload on the cluster. The reason is that every range with a rangefeed has to emit a checkpoint event once per interval (3s by default). For example, 1 million ranges checkpointing every 3s would result in roughly 330,000 events per second, which consumes significant CPU resources.

If you are running changefeeds at a large scale and notice foreground SQL latency, we recommend increasing this setting.

As a result, adjusting kv.rangefeed.closed_timestamp_refresh_interval can affect changefeeds encountering latency and changefeeds causing foreground SQL latency. In clusters running large-scale workloads, it may be helpful to:

  • Decrease the value for a lower changefeed emission latency — that is, how often a client can confirm that all relevant events up to a certain timestamp have been emitted.
  • Increase the value to reduce the potential impact of changefeeds on SQL latency. This will lower the resource cost of changefeeds, which can be especially important for workloads with tables in the TB range of data.
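
The trade-off above could be applied as in the following sketch. The 6s value is an illustrative assumption, not a recommendation; test any change before deploying it to production.

```sql
-- Inspect the current interval (3s by default).
SHOW CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval;

-- Assumed example value: doubling the interval roughly halves the checkpoint
-- event rate. For 1,000,000 ranges:
--   1,000,000 / 3s is about 333,333 events per second
--   1,000,000 / 6s is about 166,667 events per second
SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '6s';

-- Return to the default if changefeed checkpoint latency becomes too high.
RESET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval;
```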

It is important to note that a changefeed at default configuration does not checkpoint more often than once every 30 seconds. When you create a changefeed with CREATE CHANGEFEED, you can adjust this with the min_checkpoint_frequency option.

kv.closed_timestamp.side_transport_interval

Default: 200ms

The kv.closed_timestamp.side_transport_interval cluster setting controls how often the closed timestamp is updated. Although the closed timestamp is updated every 200ms, CockroachDB will only emit an event across the rangefeed containing the closed timestamp value every 3s as per the kv.rangefeed.closed_timestamp_refresh_interval value.

kv.closed_timestamp.side_transport_interval is helpful when ranges are inactive. The closed timestamp subsystem usually propagates closed timestamps via Raft commands. However, an idle range that does not see any writes does not receive any Raft commands, so it would stall. This setting is an efficient mechanism to broadcast closed timestamp updates for all idle ranges between nodes.

Adjusting kv.closed_timestamp.side_transport_interval will affect both follower reads and changefeeds. While you can use kv.closed_timestamp.side_transport_interval to tune the checkpointing interval, we recommend kv.rangefeed.closed_timestamp_refresh_interval if you are using follower reads.

kv.rangefeed.closed_timestamp_smear_interval

Default: 1ms

This setting provides a mechanism to pace the closed timestamp notifications to follower replicas. At the default, the closed timestamp smear interval makes rangefeed closed timestamp delivery less spiky, which can reduce its impact on foreground SQL query latency.

For example, suppose you have a large table and one of the nodes in the cluster is hosting 6000 ranges from this table. Normally, the rangefeed system wakes up every kv.rangefeed.closed_timestamp_refresh_interval (default 3s) and publishes checkpoints for all 6000 ranges at once. With kv.rangefeed.closed_timestamp_smear_interval, the 3s interval is divided into 1ms chunks, which gives 3000 chunks. Instead of publishing checkpoints for all 6000 ranges in one burst, the system publishes checkpoints for 2 ranges every 1ms (6000 ranges / 3000 chunks). This produces a more predictable and level load, rather than large, spiky bursts of work.

Lagging ranges

Use the changefeed.lagging_ranges metric to track the number of ranges that are behind in a changefeed. This is calculated based on the changefeed options:

  • lagging_ranges_threshold sets how far behind the present time a range must be before it is considered lagging and counted in the lagging_ranges metric. Note that ranges undergoing an initial scan for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of lagging ranges will decrease.
    • Default: 3m
  • lagging_ranges_polling_interval sets the interval rate for when lagging ranges are checked and the lagging_ranges metric is updated. Polling adds latency to the lagging_ranges metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.
    • Default: 1m
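
As a hedged sketch, both options can be set when the changefeed is created. The table name orders, the Kafka URI, and the 5m and 30s values are hypothetical and chosen only for illustration.

```sql
-- Hypothetical table and sink; values are assumptions, not recommendations.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH lagging_ranges_threshold = '5m',         -- count a range as lagging after 5 minutes
       lagging_ranges_polling_interval = '30s'; -- re-check and update the metric every 30 seconds
```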

Use the changefeed.total_ranges metric to monitor the number of ranges that are watched by aggregator processors participating in the changefeed job. If you're experiencing lagging ranges, changefeed.total_ranges may indicate that the number of ranges watched by aggregator processors in the job is unbalanced. You may want to try pausing the changefeed and then resuming it, so that the changefeed replans the work in the cluster. changefeed.total_ranges shares the same polling interval as the changefeed.lagging_ranges metric, which is controlled by the lagging_ranges_polling_interval option.
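
Pausing and resuming a changefeed could look like the following sketch. The job ID shown is a placeholder; substitute the ID reported for your changefeed.

```sql
-- Find the job ID of the changefeed.
SHOW CHANGEFEED JOBS;

-- Placeholder job ID. Pausing is asynchronous, so confirm that the job
-- status is "paused" (for example, with SHOW CHANGEFEED JOBS) before resuming.
PAUSE JOB 1234567890;
RESUME JOB 1234567890;
```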

Tip:

You can use the metrics_label option to track the lagging_ranges and total_ranges metrics per changefeed.

Tuning for high durability delivery

When designing a system that relies on high durability message delivery (that is, not missing any message acknowledgement at the downstream sink), consider the following settings and configuration in this section:

Before tuning these settings, we recommend reading details on our changefeed at-least-once-delivery guarantee.

Pausing changefeeds and garbage collection

By default, protected timestamps will protect changefeed data from garbage collection up to the time of the checkpoint. Protected timestamps will protect changefeed data from garbage collection if the downstream changefeed sink is unavailable until you either cancel the changefeed or the sink becomes available once again.

However, if the changefeed lags too far behind, the protected changes could lead to an accumulation of garbage. This could result in increased disk usage and degraded performance for some workloads.

For more detail on changefeeds and protected timestamps, refer to Garbage collection and changefeeds.

To balance protecting change data against preventing the over-accumulation of garbage, Cockroach Labs recommends creating a changefeed with options that define your protection duration, and monitoring your changefeed's protected timestamp records.

Protecting change data on pause

Create changefeeds with the following options so that your changefeed protects data when it is paused:

Monitoring protected timestamp records

You can monitor changefeed jobs for protected timestamp usage. We recommend setting up monitoring for the following metrics:

  • jobs.changefeed.protected_age_sec: Tracks the age of the oldest protected timestamp record protected by changefeed jobs. We recommend monitoring if protected_age_sec is greater than gc.ttlseconds. As protected_age_sec increases, garbage accumulation increases. Garbage collection will not progress on a table, database, or cluster if the protected timestamp record is present.
  • jobs.changefeed.currently_paused: Tracks the number of changefeed jobs currently considered paused. Since paused changefeed jobs can accumulate garbage, it is important to monitor the number of paused changefeeds.
  • jobs.changefeed.expired_pts_records: Tracks the number of expired protected timestamp records owned by changefeed jobs. You can monitor this metric in conjunction with the gc_protect_expires_after option.
  • jobs.changefeed.protected_record_count: Tracks the number of protected timestamp records held by changefeed jobs.
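
As an illustration of the gc_protect_expires_after option mentioned above, the following sketch bounds how long a changefeed's protected timestamp records can live. The table name, sink URI, and 24h duration are assumptions chosen for the example.

```sql
-- Hypothetical table and sink; 24h is an assumed protection window.
-- Protected timestamp records older than this duration are allowed to expire,
-- which you can track with jobs.changefeed.expired_pts_records.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH gc_protect_expires_after = '24h';
```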

Defining Kafka message acknowledgment

To define what counts as a successful write to Kafka, configure the 'RequiredAcks' field of the kafka_sink_config option. CockroachDB guarantees at-least-once delivery of messages, and the 'RequiredAcks' value defines when a message is considered delivered.

For high durability delivery, Cockroach Labs recommends setting:

kafka_sink_config='{"RequiredAcks": "ALL"}'

ALL provides the highest consistency level. A quorum of Kafka brokers that have committed the message must be reached before the leader can acknowledge the write.

Note:

You must also set acks to ALL in your server-side Kafka configuration for this to provide high durability delivery.
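
In a full statement, the option might be used as in this sketch. The table name and broker address are hypothetical. Note that the JSON value uses double quotes inside a single-quoted SQL string.

```sql
-- Hypothetical table and Kafka broker.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH kafka_sink_config = '{"RequiredAcks": "ALL"}';
```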

Choosing changefeed sinks

Use Kafka or cloud storage sinks when tuning for high durability delivery in changefeeds. Both Kafka and cloud storage sinks offer built-in advanced protocols, whereas the webhook sink, while flexible, requires an understanding of how messages are acknowledged and committed by the particular system used for the webhook in order to ensure the durability of message delivery.

Defining schema change behavior

Ensure that data is ingested downstream in its new format after a schema change by using the schema_change_events and schema_change_policy options. For example, setting schema_change_events=column_changes and schema_change_policy=stop will cause the changefeed to fail on a schema change and log an error to the cockroach.log file.
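
The combination described above could be set at creation time as in this sketch, where the table name and sink URI are hypothetical.

```sql
-- Hypothetical table and sink. With these options, a column change stops
-- the changefeed so that downstream consumers can be updated first.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH schema_change_events = 'column_changes',
       schema_change_policy = 'stop';
```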

Tuning for high throughput

When designing a system that needs to emit a lot of changefeed messages, whether it be steady traffic or a burst in traffic, consider the following settings and configuration in this section:

Setting the resolved option

When a changefeed emits a resolved message, it force flushes all outstanding messages that have buffered, which will diminish your changefeed's throughput while the flush completes. Therefore, if you are aiming for higher throughput, we suggest setting the duration higher (e.g., 10 minutes), or not using the resolved option.

If you are setting the resolved option when you are aiming for high throughput, you must also consider the min_checkpoint_frequency option, which defaults to 30s. This option controls how often nodes flush their progress to the coordinating changefeed node. As a result, resolved messages will not be emitted more frequently than the configured min_checkpoint_frequency. Set this option to at least as long as your resolved option duration.
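
A sketch of pairing the two options follows. The 10m durations mirror the example duration mentioned above; the table name and sink URI are hypothetical.

```sql
-- Hypothetical table and sink. min_checkpoint_frequency is set to match
-- the resolved duration, so it is at least as long.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH resolved = '10m',
       min_checkpoint_frequency = '10m';
```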

Batching and buffering messages

  • Batch messages to your sink:
  • Set the changefeed.memory.per_changefeed_limit cluster setting to a higher limit to give more memory for buffering changefeed data. This setting influences how often the changefeed will flush buffered messages. This is useful during heavy traffic.

Configuring file and message format

  • Use avro as the emitted message format option with Kafka sinks; JSON encoding can potentially create a slowdown.
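
As a hedged example, an Avro changefeed to Kafka also needs a schema registry. The table name, broker address, and registry URL below are hypothetical.

```sql
-- Hypothetical table, Kafka broker, and schema registry address.
CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.example.com:9092'
  WITH format = avro,
       confluent_schema_registry = 'https://registry.example.com:8081';
```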

Compression

  • Use the compression option when you create a changefeed emitting data files to a cloud storage sink. For larger files, set compression to the zstd format.
  • Use the snappy compression format to emit messages to a Kafka sink. If you're intending to do large batching for Kafka, use the lz4 compression format.

File size

To configure changefeeds emitting to cloud storage sinks for high throughput, you should consider:

  • Increasing the file_size parameter to control the size of the files that the changefeed sends to the sink. The default is 16MB. To configure for high throughput, we recommend 32MB–128MB. Note that this is not a hard limit, and a changefeed will flush the file when it reaches the specified size.
  • When you compress a file, it will contain many more events.
  • File size is also dependent on what kind of data the changefeed job is writing. For example, large JSON blobs will quickly fill up the file_size value compared to small rows.
  • When you change or increase file_size, ensure that you adjust the changefeed.memory.per_changefeed_limit cluster setting, which has a default of 512MiB. Buffering messages can quickly reach this limit if you have increased the file size.
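
The cloud storage recommendations above might be combined as in this sketch. The table name, bucket URI, 64MB file size, and 1GiB memory limit are assumptions for illustration; size the memory limit for your own workload and test before deploying.

```sql
-- Assumed value: raise the per-changefeed buffer above the 512MiB default
-- to leave headroom for larger files.
SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1GiB';

-- Hypothetical table and bucket. file_size falls within the 32MB to 128MB
-- range suggested for high throughput; zstd suits larger files.
CREATE CHANGEFEED FOR TABLE orders
  INTO 's3://example-bucket/changefeeds?AUTH=implicit'
  WITH file_size = '64MB',
       compression = 'zstd';
```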

Configuring for tables with many ranges

If you have a table with 10,000 or more ranges, you should consider increasing the kv.rangefeed.concurrent_catchup_iterators cluster setting. This changes the number of rangefeed catchup iterators a store will allow concurrently before queuing. The default is 16. We strongly recommend increasing this setting slowly. That is, increase the setting and then monitor its impact before adjusting further.
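
A gradual increase could be sketched as follows. The value 32 is an assumed first step up from the default of 16, not a recommended target.

```sql
-- Check the current value (16 by default).
SHOW CLUSTER SETTING kv.rangefeed.concurrent_catchup_iterators;

-- Assumed first step; monitor the cluster before increasing further.
SET CLUSTER SETTING kv.rangefeed.concurrent_catchup_iterators = 32;
```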

Adjusting concurrent changefeed work
