mongodb/cdc: make progress on low activity collections #3430
@Jeffail if you merge Jeffail/checkpoint#4 then I can make this PR trivial (and cut a release)
Pull Request Overview
This PR implements a checkpoint mechanism to distinguish between a snapshot and streaming phase in Mongo CDC, ensuring that in the absence of writes a resnapshot is not performed upon restart. Key changes include:
- Removal of an unnecessary logger from the ollama moderation processor test.
- Improvements to connection string handling and error reporting in the MongoDB CDC integration tests.
- Refactoring of checkpoint flushing and snapshot reading logic in the CDC input and checkpoint cache modules.
Reviewed Changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| internal/impl/ollama/moderation_processor_test.go | Removed a logger parameter from container creation in integration tests |
| internal/impl/mongodb/cdc/integration_test.go | Updated async handling and connection string construction in integration tests |
| internal/impl/mongodb/cdc/input.go | Enhanced checkpoint flushing and refined snapshot reading logic |
| internal/impl/mongodb/cdc/checkpoint_cache.go | Revised Load function error handling and unmarshalling of checkpoint data |
Files not reviewed (1)
- go.mod: Language not supported
Comments suppressed due to low confidence (2)
internal/impl/ollama/moderation_processor_test.go:50
- [nitpick] The removal of the test logger parameter may reduce debugging detail available during test failures. Confirm that this removal is intended and that logging is adequately handled elsewhere.
- testcontainers.WithLogger(testcontainers.TestLogger(t)),
internal/impl/mongodb/cdc/checkpoint_cache.go:52
- The updated Load function no longer handles the case when the key is not found (service.ErrKeyNotFound) as in the previous version. Consider restoring that check to return nil, nil for missing keys to maintain the original behavior.
if err != nil {
This is useful if there are no changes to the DB before the connector restarts. At a high level, we do this by plumbing a wait group into all ack functions and waiting for every snapshot document to be ack'd before moving on to the streaming phase (and saving a new kind of checkpoint that signifies the snapshot is complete). This does add a "pause" in the pipeline before moving into the streaming phase, but I'm hoping that won't be an issue in practice.
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
We don't need a stop-the-world pause after the snapshot phase; we can just commit the resume token if there are no new batches AND nothing is in flight at the moment. This is needed when low-frequency collections are being watched without changes but we still need to ratchet forward our oplog position (encoded in the resumeToken).
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
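The rule in this commit message (commit the resume token only when there are no new batches and nothing is in flight) can be sketched as a small counter-guarded helper. This is a simplified illustration under assumptions: `idleCheckpointer` and its methods are hypothetical names, and the token is a plain string rather than a real change stream resume token.

```go
package main

import (
	"fmt"
	"sync"
)

// idleCheckpointer is a hypothetical helper that tracks in-flight batches
// and the last committed resume token.
type idleCheckpointer struct {
	mu        sync.Mutex
	inFlight  int
	committed string
}

// begin records that a batch has been handed to the pipeline.
func (c *idleCheckpointer) begin() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inFlight++
}

// ack records that a previously handed-out batch has been acknowledged.
func (c *idleCheckpointer) ack() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inFlight--
}

// onIdle is meant to be called when a poll returned no new batches. It
// commits the token only if nothing is in flight, so the stored position
// never moves past unacknowledged data.
func (c *idleCheckpointer) onIdle(token string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.inFlight > 0 {
		return false
	}
	c.committed = token
	return true
}

func main() {
	c := &idleCheckpointer{}
	c.begin()
	fmt.Println(c.onIdle("token-1")) // a batch is still in flight
	c.ack()
	fmt.Println(c.onIdle("token-2")) // idle and fully drained
}
```

Compared with waiting on a wait group, this check never blocks the read loop; it simply skips the commit on any idle poll where acks are still outstanding.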
What a doozy! Nice work 🐑 🚀
When a database receives no writes, we need to snapshot before streaming so that we don't resnapshot on restart.
I can't reproduce the timeouts when there are no changes yet.
Fixes: #3425
Commits: