feat: remove SharedVars, SequenceTracker, status_tracking_queue. Add run_operation_submission table. Return bool status from wait #272

Closed
michalsosn wants to merge 5 commits

Conversation

michalsosn
Contributor

@michalsosn michalsosn commented Jun 16, 2025

fixes PY-121, PY-138

Summary by Sourcery

Use a new run_operation_submission table in the database as the single source of truth for tracking and acknowledging RunOperations, removing the previous in-memory SequenceTracker, SharedVar, and queue mechanisms. Migrate sync process and threads to rely on OperationsRepository APIs, introduce boolean-returning wait_for_submission and wait_for_processing on Run, simplify LagTracker, and update tests and DB version to v4.

New Features:

  • Introduce run_operation_submission table and OperationSubmission dataclass to persist submitted operations
  • Add Run.wait_for_submission and Run.wait_for_processing methods that return boolean status
  • Extend OperationsRepository with methods to query operation counts, sequence ID ranges, and minimum timestamps

Enhancements:

  • Replace in-memory SequenceTracker, SharedInt/SharedFloat, and status_tracking_queue with DB-backed submission tracking
  • Refactor SenderThread and StatusTrackingThread to use operations_repository for submission and acknowledgment of operations
  • Simplify LagTracker to derive async lag from operations_repository timestamps
  • Add ThrottledLogger to throttle repetitive log messages and remove the old print_message utility
  • Remove obsolete sync parameters (e.g., minimal sleep times) and deprecate sequence_tracker/shared_var modules

Tests:

  • Update unit tests for sync_process, sender/status threads, operations_repository, lag_tracking, and timer to reflect the new submission table and boolean wait methods
  • Add tests for get/save/delete operation submissions and ensure end-to-end sync honors pre-existing submissions
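As a rough illustration of the LagTracker simplification summarized above, async lag can be derived from the timestamp of the oldest operation that is still pending. This is only a sketch: `compute_lag_seconds` is a hypothetical helper, and the only detail taken from the PR is that the repository exposes a minimum timestamp that may be absent.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_lag_seconds(min_timestamp: Optional[datetime], now: datetime) -> float:
    """Return how long the oldest pending operation has been waiting, in seconds.

    Hypothetical helper: `min_timestamp` stands in for the value returned by
    OperationsRepository.get_operations_min_timestamp().
    """
    if min_timestamp is None:
        # No pending operations, so there is nothing to lag behind.
        return 0.0
    # Clamp at zero in case of small clock adjustments.
    return max(0.0, (now - min_timestamp).total_seconds())


now = datetime(2025, 6, 16, 12, 0, 0, tzinfo=timezone.utc)
print(compute_lag_seconds(now - timedelta(seconds=5), now))  # 5.0
print(compute_lag_seconds(None, now))  # 0.0
```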

sourcery-ai bot commented Jun 16, 2025

Reviewer's Guide

This PR replaces the in-memory synchronization constructs (SharedVars, SequenceTracker, queues) with persistent, database-backed submission tracking built on a new run_operation_submission table and the OperationSubmission API. It refactors the wait loops and the CLI/sync threads to poll operation counts from the DB and return a boolean status, introduces a ThrottledLogger for rate-limited logging, and updates the tests to reflect the new workflow.

Sequence diagram for wait_for_submission and wait_for_processing using DB-backed polling

sequenceDiagram
    actor User
    participant Run as Run
    participant DB as OperationsRepository
    User->>Run: wait_for_submission(timeout, verbose)
    Run->>DB: get_operation_submission_sequence_id_range()
    Run->>DB: get_operations_sequence_id_range()
    alt operations remain
        Run->>User: (logs via ThrottledLogger)
        Run-->>Run: sleep and retry
        Run->>DB: repeat polling
    else all submitted
        Run->>User: return True
    end
    User->>Run: wait_for_processing(timeout, verbose)
    Run->>DB: get_operation_count()
    alt operations remain
        Run->>User: (logs via ThrottledLogger)
        Run-->>Run: sleep and retry
        Run->>DB: repeat polling
    else all processed
        Run->>User: return True
    end
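
The polling pattern in the diagram above can be sketched roughly as follows. This is not the code from the PR: `wait_until_drained`, `count_pending`, and `poll_interval` are hypothetical names, and returning False on timeout is an assumption based on the "boolean status" wording.

```python
import time
from typing import Callable, Optional


def wait_until_drained(
    count_pending: Callable[[], int],
    timeout: Optional[float] = None,
    poll_interval: float = 0.1,
) -> bool:
    """Poll `count_pending` until it reports zero or the timeout expires.

    `count_pending` stands in for a DB query such as get_operation_count().
    Returns True when nothing is pending, False on timeout (assumed semantics).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if count_pending() == 0:
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Fake "database" whose pending count drops by one on every poll.
pending = [3]


def fake_count() -> int:
    pending[0] = max(0, pending[0] - 1)
    return pending[0]


print(wait_until_drained(fake_count, timeout=1.0, poll_interval=0.01))  # True
print(wait_until_drained(lambda: 1, timeout=0.05, poll_interval=0.01))  # False
```

Because the state lives in the database rather than in shared memory, the same loop works regardless of which process or thread makes progress on the operations.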

Class diagram for OperationSubmission and OperationsRepository changes

classDiagram
    class OperationSubmission {
        +SequenceId sequence_id
        +int timestamp
        +RequestId request_id
        +datetime ts
    }
    class OperationsRepository {
        +save_operation_submissions(submissions: list[OperationSubmission]) SequenceId
        +get_operation_submissions(limit: int) list[OperationSubmission]
        +delete_operation_submissions(up_to_seq_id: Optional[SequenceId]) int
        +get_operation_submission_count(limit: Optional[int]) int
        +get_operation_submission_sequence_id_range() Optional[tuple[SequenceId, SequenceId]]
        +get_operation_count(limit: Optional[int]) int
        +get_operations_sequence_id_range() Optional[tuple[SequenceId, SequenceId]]
        +get_operations_min_timestamp() Optional[datetime]
    }
    OperationsRepository --> OperationSubmission
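
To make the shape of this API more concrete, here is a toy SQLite-backed sketch of a few of the methods in the diagram above. The table schema, the column types, the `SubmissionStore` class name, and the inclusive `up_to_seq_id` semantics are all assumptions made for illustration; the actual implementation in operations_repository.py may differ.

```python
import sqlite3
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class OperationSubmission:
    sequence_id: int
    timestamp: int  # assumed unit: milliseconds since epoch
    request_id: str


class SubmissionStore:
    """Toy stand-in for the submission-related part of OperationsRepository."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        # Assumed schema; only the table name comes from the PR.
        conn.execute(
            """CREATE TABLE IF NOT EXISTS run_operation_submission (
                   sequence_id INTEGER PRIMARY KEY,
                   timestamp INTEGER NOT NULL,
                   request_id TEXT NOT NULL
               )"""
        )

    def save_operation_submissions(self, submissions: List[OperationSubmission]) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.executemany(
                "INSERT INTO run_operation_submission VALUES (?, ?, ?)",
                [(s.sequence_id, s.timestamp, s.request_id) for s in submissions],
            )

    def get_operation_submissions(self, limit: int) -> List[OperationSubmission]:
        rows = self._conn.execute(
            "SELECT sequence_id, timestamp, request_id "
            "FROM run_operation_submission ORDER BY sequence_id LIMIT ?",
            (limit,),
        ).fetchall()
        return [OperationSubmission(*row) for row in rows]

    def delete_operation_submissions(self, up_to_seq_id: Optional[int]) -> int:
        with self._conn:
            if up_to_seq_id is None:
                cur = self._conn.execute("DELETE FROM run_operation_submission")
            else:
                # Assumed to be inclusive of up_to_seq_id.
                cur = self._conn.execute(
                    "DELETE FROM run_operation_submission WHERE sequence_id <= ?",
                    (up_to_seq_id,),
                )
        return cur.rowcount

    def get_operation_submission_sequence_id_range(self) -> Optional[Tuple[int, int]]:
        row = self._conn.execute(
            "SELECT MIN(sequence_id), MAX(sequence_id) FROM run_operation_submission"
        ).fetchone()
        return None if row[0] is None else (row[0], row[1])


store = SubmissionStore(sqlite3.connect(":memory:"))
store.save_operation_submissions(
    [
        OperationSubmission(1, 1000, "req-a"),
        OperationSubmission(2, 1001, "req-a"),
        OperationSubmission(3, 1002, "req-b"),
    ]
)
print(store.get_operation_submission_sequence_id_range())  # (1, 3)
print(store.delete_operation_submissions(2))  # 2
print(store.get_operation_submissions(limit=10))
```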

Class diagram for removal of SharedVars, SequenceTracker, and related constructs

classDiagram
    class SharedInt {
        <<removed>>
    }
    class SharedFloat {
        <<removed>>
    }
    class SequenceTracker {
        <<removed>>
    }
    class PeekableQueue {
        <<removed>>
    }
    class StatusTrackingElement {
        <<removed>>
    }

Class diagram for ThrottledLogger addition

classDiagram
    class ThrottledLogger {
        -logging.Logger _logger
        -bool _enabled
        -Optional[float] _last_print_timestamp
        +__init__(logger: logging.Logger, enabled: bool)
        +info(msg: str, *args: Any)
    }
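
A minimal sketch of how such a throttled logger could work, using the fields from the diagram above. The `interval` and `clock` constructor parameters and the 5-second default are assumptions added to keep the example testable; the PR configures throttling through STOP_MESSAGE_FREQUENCY, whose value is not shown here.

```python
import logging
import time
from typing import Any, Callable, List, Optional


class ThrottledLogger:
    """Forward info() calls to a logger at most once per `interval` seconds."""

    def __init__(
        self,
        logger: logging.Logger,
        enabled: bool,
        interval: float = 5.0,  # assumed default, not taken from the PR
        clock: Callable[[], float] = time.monotonic,  # hypothetical test hook
    ) -> None:
        self._logger = logger
        self._enabled = enabled
        self._interval = interval
        self._clock = clock
        self._last_print_timestamp: Optional[float] = None

    def info(self, msg: str, *args: Any) -> None:
        if not self._enabled:
            return
        now = self._clock()
        last = self._last_print_timestamp
        if last is not None and now - last < self._interval:
            return  # too soon since the previous message, so drop this one
        self._last_print_timestamp = now
        self._logger.info(msg, *args)


class ListHandler(logging.Handler):
    """Collects formatted messages so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


demo_logger = logging.getLogger("throttle-demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
handler = ListHandler()
demo_logger.addHandler(handler)

fake_time = [0.0]
throttled = ThrottledLogger(demo_logger, enabled=True, interval=5.0, clock=lambda: fake_time[0])
throttled.info("waiting for %d operations", 3)  # emitted
fake_time[0] = 1.0
throttled.info("waiting for %d operations", 2)  # suppressed (only 1s elapsed)
fake_time[0] = 6.0
throttled.info("waiting for %d operations", 1)  # emitted
print(handler.messages)
```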

File-Level Changes

Change: Replace in-memory synchronization constructs with DB-backed tracking
Details:
  • Removed SharedInt/SharedFloat, SequenceTracker, PeekableQueue and related queue usage
  • Updated SenderThread and StatusTrackingThread to save/get/delete submissions via OperationsRepository
  • Removed status_tracking_queue and wait_seq usage in run, sync_process, CLI and lag tracker
  • Assertions in run now verify operations_repo is set instead of sequence_tracker
Files:
  • src/neptune_scale/api/run.py
  • src/neptune_scale/sync/sync_process.py
  • src/neptune_scale/cli/sync.py
  • src/neptune_scale/sync/lag_tracking.py
  • src/neptune_scale/sync/parameters.py

Change: Add persistent run_operation_submission table and OperationSubmission API
Details:
  • Created run_operation_submission table in DB initialization
  • Introduced OperationSubmission dataclass with sequence_id, timestamp, request_id
  • Implemented save/get/delete/count/sequence_id_range methods for operation submissions
Files:
  • src/neptune_scale/sync/operations_repository.py

Change: Refactor wait loops and run API to return boolean status
Details:
  • Replaced generic _wait with wait_for_submission and wait_for_processing in Run API
  • Integrated Timer for timeout management and combined submission/file upload waits
  • Polled DB for operation/file counts rather than waiting on shared variables
Files:
  • src/neptune_scale/api/run.py

Change: Introduce ThrottledLogger for rate-limited logging
Details:
  • Added ThrottledLogger class in util/logger.py
  • Replaced print_message logic in wait loops with ThrottledLogger.info
  • Configured log throttling based on STOP_MESSAGE_FREQUENCY
Files:
  • src/neptune_scale/util/logger.py

Change: Update test suite for DB-backed submission workflow
Details:
  • Replaced SharedVar, PeekableQueue and StatusTrackingElement in tests with OperationSubmission
  • Adjusted mocks to return get_operation_submissions, get_operation_submission_sequence_id_range
  • Added unit tests for new submission methods and e2e test for nonempty submissions
Files:
  • tests/unit/test_sync_process.py
  • tests/unit/sync/test_operations_repository.py
  • tests/unit/test_lag_tracker.py
  • tests/unit/test_run.py
  • tests/e2e/test_sync.py
  • tests/e2e/test_log_and_fetch.py

@sourcery-ai sourcery-ai bot left a comment

Hey @michalsosn - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/neptune_scale/api/run.py:491` </location>
<code_context>
         fork_step: Optional[Union[int, float]],
     ) -> None:
-        if self._operations_repo is None or self._sequence_tracker is None:
+        if self._operations_repo is None:
             logger.debug("Run is in mode that doesn't support creating runs.")
             return
</code_context>

<issue_to_address>
Early return if _operations_repo is None may mask issues.

If _operations_repo should never be None, raise an exception or log a warning instead to avoid hiding configuration errors.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        if self._operations_repo is None:
            logger.debug("Run is in mode that doesn't support creating runs.")
            return
=======
        if self._operations_repo is None:
            logger.error("Configuration error: _operations_repo is None. Cannot create runs without a valid operations repository.")
            raise RuntimeError("Cannot create runs: _operations_repo is not configured.")
>>>>>>> REPLACE

</suggested_fix>

Comment on lines +491 to 493
        if self._operations_repo is None:
            logger.debug("Run is in mode that doesn't support creating runs.")
            return

suggestion (bug_risk): Early return if _operations_repo is None may mask issues.

If _operations_repo should never be None, raise an exception or log a warning instead to avoid hiding configuration errors.

Suggested change
-        if self._operations_repo is None:
-            logger.debug("Run is in mode that doesn't support creating runs.")
-            return
+        if self._operations_repo is None:
+            logger.error("Configuration error: _operations_repo is None. Cannot create runs without a valid operations repository.")
+            raise RuntimeError("Cannot create runs: _operations_repo is not configured.")

Comment on lines +207 to +208
for key, value in data.items():
    assert fetched[key] == value, f"Value for {key} does not match"

issue (code-quality): Avoid loops in tests. (no-loop-in-tests)

Explanation: Avoid complex code, like loops, in test functions.

Google's software engineering guidelines say:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:

  • loops
  • conditionals

Some ways to fix this:

  • Use parametrized tests to get rid of the loop.
  • Move the complex logic into helpers.
  • Move the complex part into pytest fixtures.

Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / Don't Put Logic in Tests
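
For the loop flagged on lines +207 to +208, one possible way to follow the "move the complex logic into helpers" advice is to compare the relevant part of `fetched` against `data` in a single assertion. This is only a sketch: `subset` is a hypothetical helper and the sample values are made up.

```python
from typing import Any, Dict, Iterable


def subset(mapping: Dict[str, Any], keys: Iterable[str]) -> Dict[str, Any]:
    """Return only the entries of `mapping` named in `keys`.

    Raises KeyError if a key is missing, which also fails the test.
    """
    return {key: mapping[key] for key in keys}


# Made-up sample values standing in for the logged and fetched attributes.
data = {"metrics/loss": 0.25, "config/lr": 0.001}
fetched = {"metrics/loss": 0.25, "config/lr": 0.001, "sys/name": "example"}

# A single assertion; on failure the test runner can show the differing entries.
assert subset(fetched, data) == data
```

The trade-off is that the per-key failure message from the original loop is replaced by a whole-dictionary comparison.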

Comment on lines +301 to +303
for i in range(10):
    update = UpdateRunSnapshot(assign={"key": Value(string=f"a{i}")})
    updates.append(update)

issue (code-quality): Avoid loops in tests. (no-loop-in-tests)

Comment on lines +340 to +342
for i in range(10):
    update = UpdateRunSnapshot(assign={"key": Value(string=f"a{i}")})
    updates.append(update)

issue (code-quality): Avoid loops in tests. (no-loop-in-tests)

Comment on lines +273 to 275
else:
    logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
    return None

suggestion (code-quality): Remove unnecessary else after guard condition (remove-unnecessary-else)

Suggested change
-else:
-    logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
-    return None
+logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
+return None

Comment on lines +277 to +280
if queued_range is None:
    last_queued_seq = SequenceId(-1)
else:
    last_queued_seq = queued_range[1]

suggestion (code-quality): Replace if statement with if expression (assign-if-exp)

Suggested change
-if queued_range is None:
-    last_queued_seq = SequenceId(-1)
-else:
-    last_queued_seq = queued_range[1]
+last_queued_seq = SequenceId(-1) if queued_range is None else queued_range[1]

@michalsosn michalsosn closed this Jun 16, 2025