Skip to content

mongodb/cdc: make progress on low activity collections #3430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 27, 2025

Conversation

rockwotj
Copy link
Contributor

@rockwotj rockwotj commented May 22, 2025

When having a database that recieves no writes, we need to snapshot before streaming so that we don't resnapshot on restart.

I can't reproduce the timeouts when there are no changes yet.

Fixes: #3425

Commits:

  • chore: bump mongodb version
  • mongodb/cdc: checkpoint when completing snapshot

@rockwotj
Copy link
Contributor Author

rockwotj commented May 22, 2025

@Jeffail if you merge Jeffail/checkpoint#4 then I can make this PR trivial (and cut a release)

@rockwotj rockwotj requested review from mihaitodor, Jeffail, Copilot and mmatczuk and removed request for mihaitodor and Jeffail May 23, 2025 18:44
@rockwotj rockwotj changed the title mongodb/cdc: checkpoint between snapshot and streaming mongodb/cdc: make progress on low activity collections May 23, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a checkpoint mechanism to distinguish between a snapshot and streaming phase in Mongo CDC, ensuring that in the absence of writes a resnapshot is not performed upon restart. Key changes include:

  • Removal of an unnecessary logger from the ollama moderation processor test.
  • Improvements to connection string handling and error reporting in the MongoDB CDC integration tests.
  • Refactoring of checkpoint flushing and snapshot reading logic in the CDC input and checkpoint cache modules.

Reviewed Changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated no comments.

File Description
internal/impl/ollama/moderation_processor_test.go Removed a logger parameter from container creation in integration tests
internal/impl/mongodb/cdc/integration_test.go Updated async handling and connection string construction in integration tests
internal/impl/mongodb/cdc/input.go Enhanced checkpoint flushing and refined snapshot reading logic
internal/impl/mongodb/cdc/checkpoint_cache.go Revised Load function error handling and unmarshalling of checkpoint data
Files not reviewed (1)
  • go.mod: Language not supported
Comments suppressed due to low confidence (2)

internal/impl/ollama/moderation_processor_test.go:50

  • [nitpick] The removal of the test logger parameter may reduce debugging detail available during test failures. Confirm that this removal is intended and that logging is adequately handled elsewhere.
-	testcontainers.WithLogger(testcontainers.TestLogger(t)),

internal/impl/mongodb/cdc/checkpoint_cache.go:52

  • The updated Load function no longer handles the case when the key is not found (service.ErrKeyNotFound) as in the previous version. Consider restoring that check to return nil, nil for missing keys to maintain the original behavior.
if err != nil {

rockwotj and others added 11 commits May 23, 2025 13:48
This is useful if there are no changes to the DB before the connector
restarts.

High level we do this by plumbing a wait group to all ack functions and
waiting for all snapshot documents to be ack'd before moving on to the
streaming phase (and saving a new kind of checkpoint that signifies the
checkpoint is completed). This does add a "pause" in the pipeline before
moving into the streaming phase, but I'm hoping that won't be an issue in
practice.
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
We don't need a stop the world pause after the snapshot phase, we can
just commit the resume token if there are no new batches AND there is
nothing in flight at the moment. This is needed for low frequency
collections are being watched without changes but we still need to
ratchet forward our oplog position (encoded in the resumeToken).

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What a doozy! Nice work 🐑 🚀

@rockwotj rockwotj merged commit 69ddc4c into redpanda-data:main May 27, 2025
4 checks passed
@rockwotj rockwotj deleted the mongodb-cdc branch May 27, 2025 02:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

mongodb_cdc connector snapshot restarting if database has no changes
2 participants