
Avoid change streams on the storage database #276


Draft - wants to merge 10 commits into main

Conversation

rkistner
Contributor

@rkistner rkistner commented Jun 12, 2025

Background

The MongoDB storage adapter relied on change streams to detect changes to:

  1. Read checkpoints.
  2. Write checkpoints (implemented in Optimize write checkpoints lookups #230).

The idea is that each API process is "notified" of changes, so that:

  1. There is low overhead when the instance is idle.
  2. Latency is short.

The issue

The issue is that change streams can have high overhead on a cluster. A change stream effectively reads every change in the oplog, then filters that down to the watched collection/pipeline.

This is fine when you only have a small number of open change streams, or low write volumes. However, we have cases where 100+ change streams are open at a time. And even though the watched collections are not modified often, combine that with a 20k/s write rate (which can happen when reprocessing sync rules) and you suddenly end up with 100k document scans/s, even though very few documents are returned.
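To illustrate the fan-out (a minimal sketch, not actual code from this repo - connection details and collection names are placeholders): each `watch()` call opens an independent oplog reader on the server, so scanning cost multiplies with the number of open streams, not with the number of matching events.

```ts
import { MongoClient } from 'mongodb';

// Hypothetical setup; the connection string and names are placeholders.
const client = new MongoClient('mongodb://localhost:27017');
const collection = client.db('powersync').collection('sync_rules');

// Each change stream is an independent oplog reader. With 100 of these
// open and 20k writes/s cluster-wide, the server scans oplog entries
// once per stream, even if the watched collection is rarely modified.
for (let i = 0; i < 100; i++) {
  const stream = collection.watch();
  stream.on('change', () => {
    // Rarely fires for a quiet collection, but the server-side
    // scanning cost is paid for every write in the cluster.
  });
}
```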

I cannot find any good documentation on this - the performance impact is not mentioned in the MongoDB docs. But this script demonstrates the issue quite clearly: https://gist.github.com/rkistner/0d898880b0a0a48d1557f64e01992795

I also suspect that MongoDB has an optimization for this issue in Flex clusters, but unfortunately that code is private.

The fix

The fix is to not use watch/change streams in the storage adapter. The actual implementation is different for read and write checkpoints.

Read checkpoints

Diff for this part

We implement this similarly to the NOTIFY functionality we use for Postgres storage:

  1. Each time a read checkpoint is committed, we write an empty document to a new checkpoint_events capped collection (see the sketch after this list).
  2. The API processes watch this collection for changes, by using a tailable cursor.
  3. When a change is seen, it fetches the latest state from the sync_rules collection.
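A minimal sketch of the producer side (step 1), assuming the checkpoint_events name from this PR; the helper functions and sizing are hypothetical, not the actual implementation:

```ts
import { Db } from 'mongodb';

// Capped collection: fixed-size, insertion-ordered, supports tailable
// cursors. createCollection throws if it already exists, so run once
// during setup.
async function setupCheckpointEvents(db: Db): Promise<void> {
  await db.createCollection('checkpoint_events', {
    capped: true,
    size: 16 * 1024, // bytes; old events roll off automatically
    max: 128
  });
}

// Step 1: called whenever a read checkpoint is committed.
// The document content does not matter; its arrival is the signal.
async function notifyCheckpoint(db: Db): Promise<void> {
  await db.collection('checkpoint_events').insertOne({});
}
```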

An alternative would be to just poll sync_rules. However, this method has lower latency, and reduced overhead when the instance is mostly idle.

Tailable cursors are an under-documented feature, but they do appear to work well for this case. They give functionality similar to change streams, with better efficiency, at the cost of requiring explicit writes to the collection.
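To make the consumer side (steps 2 and 3) concrete, here is a sketch of a tailable cursor loop; fetchLatestSyncRulesState is a hypothetical stand-in for the actual sync_rules lookup:

```ts
import { Db } from 'mongodb';

// Hypothetical stand-in for step 3: fetch the latest state from sync_rules.
async function fetchLatestSyncRulesState(db: Db) {
  return db.collection('sync_rules').findOne({}, { sort: { _id: -1 } });
}

// Steps 2-3: tail checkpoint_events and react to each new document.
async function watchCheckpoints(db: Db): Promise<void> {
  for (;;) {
    const cursor = db.collection('checkpoint_events').find(
      {},
      { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000 }
    );
    try {
      for await (const _event of cursor) {
        await fetchLatestSyncRulesState(db);
      }
      // The cursor dies if the capped collection is empty or rolls over
      // past our position; fall through and re-create it.
    } finally {
      await cursor.close();
    }
  }
}
```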

Write checkpoints

Diff for this part

For write checkpoints, we now use the same mechanism as for bucket_data and parameter_data: on each new read checkpoint, we read all the write checkpoints created after the previous read checkpoint (sketched below).
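As a sketch of that lookup, with assumed collection and field names (custom_write_checkpoints, op_id) based on the description above, not the PR's actual schema:

```ts
import { Db, Long } from 'mongodb';

// On each new read checkpoint, read the write checkpoints recorded between
// the previous read checkpoint and the current one - the same pattern used
// for bucket_data and parameter_data.
async function writeCheckpointsSince(db: Db, prevOpId: Long, currentOpId: Long) {
  return db
    .collection('custom_write_checkpoints')
    .find({ op_id: { $gt: prevOpId, $lte: currentOpId } })
    .toArray();
}
```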

What makes this a larger change is that:

  1. We did not previously record sufficient info to look up the write checkpoints between two read checkpoints.
  2. Managed write checkpoints are persisted in a completely different way from custom write checkpoints, so this requires separate implementations for each.
    a. Custom write checkpoints are now persisted in the same batch/transaction as other data, and get a matching op_id.
    b. Managed write checkpoints get a processed_at_lsn field, populated when a read checkpoint is committed (see the sketch after this list). We may change this to also use an op_id in the future, but that would complicate the current implementation a bit.
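A sketch of step 2b, with assumed names (write_checkpoints, processed_at_lsn) and a simplified LSN comparison; the real implementation may differ:

```ts
import { Db } from 'mongodb';

// When a read checkpoint commits at `lsn`, stamp all managed write
// checkpoints that it covers. Plain string comparison on LSNs is a
// simplification here; real LSN ordering may need normalization.
async function markProcessed(db: Db, lsn: string): Promise<void> {
  await db.collection('write_checkpoints').updateMany(
    { processed_at_lsn: null, lsn: { $lte: lsn } },
    { $set: { processed_at_lsn: lsn } }
  );
}
```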

This reverts big parts of #230, but does not go back to the old approach. This actually results in less code and a simpler architecture overall.

TODO

  • Some tests are unstable - investigate

@rkistner rkistner requested a review from stevensJourney June 12, 2025 15:02

changeset-bot commented Jun 12, 2025

⚠️ No Changeset found

Latest commit: c97cefd

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types
