feat: remove SharedVars, SequenceTracker, status_tracking_queue. Add run_operation_submission table #258

Open · wants to merge 9 commits into main from ms/remove-sharedvarss

Conversation

@michalsosn (Contributor) commented May 29, 2025

fixes PY-121

Summary by Sourcery

Migrate asynchronous synchronization coordination from in-memory shared variables and queues to a persistent database table and simplify related waiting and lag-tracking logic.

New Features:

  • Add a new run_operation_submission table and OperationSubmission model to persist operation submissions.

Enhancements:

  • Remove SharedInt, SharedFloat, SequenceTracker, PeekableQueue, and StatusTrackingElement in favor of the new submissions table.
  • Implement operations_repository methods to save, retrieve, count, range-query, and delete OperationSubmission records.
  • Refactor SenderThread and StatusTrackingThread to operate on the database table instead of shared memory or thread queues.
  • Revamp wait_for_submission and wait_for_processing to poll operations_repository rather than rely on shared-variable notifications.
  • Simplify LagTracker to compute async lag directly from operations_repository timestamps.
  • Bump the database version to v4 and drop backward compatibility for older schema versions.

Tests:

  • Update unit tests to use operations_repository.get_operation_submissions and delete_operation_submissions instead of SharedVar and status queue mocks.
  • Add tests for the new save/get/delete OperationSubmission methods and verify sync logic integration.
  • Remove tests and fixtures for util/shared_var and sync/sequence_tracker modules.

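
The polling-based wait described in the summary can be sketched as follows. This is a minimal sketch, not the PR's actual implementation: `get_operations_sequence_id_range` is a repository method named in this PR, but the loop shape, `poll_interval`, return values, and the `_FakeRepo` stand-in are assumptions for illustration.

```python
import time
from typing import Optional

def wait_for_processing(repo, timeout: Optional[float] = None,
                        poll_interval: float = 0.5) -> bool:
    """Poll the repository until all logged operations are gone (the
    status-tracking thread deletes them once processed), or time out."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if repo.get_operations_sequence_id_range() is None:
            return True  # every operation was processed and deleted
        if deadline is not None and time.monotonic() >= deadline:
            return False  # timed out with operations still pending
        time.sleep(poll_interval)

# Tiny in-memory stand-in for OperationsRepository, for illustration only.
class _FakeRepo:
    def __init__(self, ranges):
        self._ranges = list(ranges)
    def get_operations_sequence_id_range(self):
        return self._ranges.pop(0) if self._ranges else None

done = wait_for_processing(_FakeRepo([(1, 3), (2, 3), None]), poll_interval=0.01)
timed_out = wait_for_processing(_FakeRepo([(1, 3)] * 1000),
                                timeout=0.05, poll_interval=0.01)
```

Compared to the old SharedVar notifications, the poll trades a little latency for a much simpler invariant: progress is whatever the database says it is.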

sourcery-ai bot commented May 29, 2025

Reviewer's Guide

Migrate async coordination from in-memory shared variables and queues to a persistent run_operation_submission table with a new OperationSubmission model, refactor SenderThread/StatusTrackingThread and run API waiting logic to poll the repository, simplify lag tracking via database timestamps, bump schema to v4, and update tests and CLI accordingly.

Sequence Diagram: Operation Logging and Submission

sequenceDiagram
    participant R as Run
    participant OR as OperationsRepository
    participant ST as SenderThread
    participant NA as NeptuneAPI

    R->>OR: save_update_run_snapshots(operations)
    activate OR
    OR-->>R: (sequence_id)
    deactivate OR

    loop Periodically
        ST->>OR: get_operations(from_exclusive=last_queued_seq)
        activate OR
        OR-->>ST: operations_batch
        deactivate OR

        opt operations_batch is not empty
            ST->>NA: ingest_run_operations(operations_batch)
            activate NA
            NA-->>ST: request_ids_response
            deactivate NA

            ST->>OR: save_operation_submissions(OperationSubmission[seq_id, ts, req_id])
            activate OR
            OR-->>ST: (confirm / new_last_queued_seq)
            deactivate OR
            ST->>ST: Update internal last_queued_seq
        end
    end

Sequence Diagram: Status Tracking and Operation Cleanup

sequenceDiagram
    participant STT as StatusTrackingThread
    participant OR as OperationsRepository
    participant NA as NeptuneAPI

    loop Periodically
        STT->>OR: get_operation_submissions(limit=BATCH_SIZE)
        activate OR
        OR-->>STT: batch_of_submissions
        deactivate OR

        opt batch_of_submissions is not empty
            STT->>NA: check_batch(request_ids=[s.request_id for s in batch])
            activate NA
            NA-->>STT: bulk_request_status_response
            deactivate NA

            loop for each status in response related to a submission
                alt status is processed successfully
                    STT->>OR: delete_operation_submissions(up_to_seq_id=processed_seq_id)
                    activate OR
                    OR-->>STT: (deleted_submission_count)
                    deactivate OR

                    STT->>OR: delete_operations(up_to_seq_id=processed_seq_id)
                    activate OR
                    OR-->>STT: (deleted_operation_count)
                    deactivate OR
                else status indicates not ready or error
                    STT->>STT: Handle error or wait for next cycle
                end
            end
        end
    end

Entity Relationship Diagram for run_operation_submission Table

erDiagram
    run_operations {
        INTEGER sequence_id PK
        TEXT operation_type
        BLOB data
        INTEGER timestamp
        INTEGER operation_size_bytes
    }

    run_operation_submission {
        INTEGER sequence_id PK "FK to run_operations.sequence_id"
        INTEGER timestamp "Submission Timestamp"
        TEXT request_id "Request ID from API"
    }

    run_operations ||--o| run_operation_submission : "submission_is_tracked_for"
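
The schema in the ER diagram can be reconstructed as DDL roughly like this. The table and column names come from the diagram; the exact constraints and defaults in `operations_repository.py` may differ, so treat this as an inferred sketch.

```python
import sqlite3

# Schema sketch inferred from the ER diagram above.
DDL = """
CREATE TABLE IF NOT EXISTS run_operations (
    sequence_id          INTEGER PRIMARY KEY AUTOINCREMENT,
    operation_type       TEXT NOT NULL,
    data                 BLOB NOT NULL,
    timestamp            INTEGER NOT NULL,
    operation_size_bytes INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS run_operation_submission (
    sequence_id INTEGER PRIMARY KEY,  -- matches run_operations.sequence_id
    timestamp   INTEGER NOT NULL,     -- submission timestamp
    request_id  TEXT NOT NULL         -- request id returned by the API
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
tables = {row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table'")}
```

The one-to-zero-or-one relationship falls out of sharing the primary key: a submission row can only ever exist for a sequence id that was logged first.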

Class Diagram for Async Coordination Refactor

classDiagram
    class OperationSubmission {
        +SequenceId sequence_id
        +int timestamp
        +RequestId request_id
        +datetime ts
    }

    class OperationsRepository {
        +save_operation_submissions(statuses: list~OperationSubmission~) SequenceId
        +get_operation_submissions(limit: Optional~int~) list~OperationSubmission~
        +delete_operation_submissions(up_to_seq_id: SequenceId) int
        +get_operation_submission_count(limit: Optional~int~) int
        +get_operation_submission_sequence_id_range() Optional~tuple~SequenceId, SequenceId~~
        +get_operations_min_timestamp() Optional~datetime~
        +get_operation_count(limit: Optional~int~) int
        +get_operations_sequence_id_range() Optional~tuple~SequenceId, SequenceId~~
    }

    class Run {
        - _operations_repo: OperationsRepository
        +wait_for_submission(timeout: Optional~float~, verbose: bool)
        +wait_for_processing(timeout: Optional~float~, verbose: bool)
        #_wait_for_operation_submission(timeout: Optional~float~, verbose: bool)
        #_wait_for_operation_processing(timeout: Optional~float~, verbose: bool)
    }

    class SenderThread {
        - _operations_repository: OperationsRepository
        - _last_queued_seq: SequenceId
        +work()
    }

    class StatusTrackingThread {
        - _operations_repository: OperationsRepository
        +work()
    }

    class LagTracker {
        - _operations_repository: OperationsRepository
        +work()
    }

    class SharedInt {
        <<Removed>>
        +int value
        +wait(timeout: Optional~float~)
        +notify_all()
    }
    class SharedFloat {
        <<Removed>>
        +float value
        +wait(timeout: Optional~float~)
        +notify_all()
    }
    class SequenceTracker {
        <<Removed>>
        +update_sequence_id(sequence_id: SequenceId)
        +SequenceId last_sequence_id
        +Optional~float~ last_timestamp
    }
    class PeekableQueue {
        <<Removed>>
        +put(element: T)
        +peek(max_size: int) Optional~list~T~~
        +commit(n: int)
    }
    class StatusTrackingElement {
        <<Removed>>
        +SequenceId sequence_id
        +datetime timestamp
        +str request_id
    }

    Run *-- OperationsRepository
    SenderThread *-- OperationsRepository
    StatusTrackingThread *-- OperationsRepository
    LagTracker *-- OperationsRepository
    SenderThread ..> OperationSubmission : creates and saves via OperationsRepository

File-Level Changes

Introduce persistent operation submission storage
  • Define OperationSubmission dataclass
  • Create run_operation_submission table in DB init
  • Add save/get/delete/count methods for submissions
  Files: src/neptune_scale/sync/operations_repository.py

Remove in-memory synchronization primitives and modules
  • Delete SharedInt/SharedFloat usage and util/shared_var module
  • Remove SequenceTracker and sync/sequence_tracker module
  • Drop PeekableQueue and StatusTrackingElement from sync_process
  Files: src/neptune_scale/api/run.py, src/neptune_scale/util/shared_var.py, src/neptune_scale/sync/sequence_tracker.py, src/neptune_scale/sync/sync_process.py, src/neptune_scale/cli/sync.py

Refactor SenderThread and StatusTrackingThread to use the database
  • Replace queue puts/peeks with repository.save/get submissions
  • Use sequence_id field instead of SharedInt
  • Commit and delete submissions via repository methods
  Files: src/neptune_scale/sync/sync_process.py

Implement polling-based wait_for_submission and wait_for_processing
  • Add high-level wait_for_submission/processing methods with Timer
  • Remove old wait function and SharedVar notifications
  • Poll operations_repository for sequence ranges
  Files: src/neptune_scale/api/run.py

Simplify LagTracker to compute lag from DB timestamps
  • Use get_operations_min_timestamp instead of SharedFloat/SequenceTracker
  • Trigger async lag callback based on timestamp diff
  Files: src/neptune_scale/sync/lag_tracking.py

Bump database schema to v4 and drop older compatibility
  • Update DB_VERSION to v4 and BACKWARD_COMPATIBLE_DB_VERSIONS
  • Remove legacy schema branches
  Files: src/neptune_scale/sync/operations_repository.py

Update tests to use operations_repository for submissions
  • Switch test_sync_process to get_operation_submissions and delete_operation_submissions
  • Adjust repository tests for sequence range and counts
  • Remove shared_var and sequence_tracker test fixtures
  Files: tests/unit/test_sync_process.py, tests/unit/sync/test_operations_repository.py, tests/unit/test_run.py, tests/unit/test_lag_tracker.py
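
The simplified lag computation described above (lag as the age of the oldest unprocessed operation) can be sketched as follows. `get_operations_min_timestamp` is the repository method named in this PR; the function signature, the `now` parameter, and the `_Repo` stand-in are assumptions.

```python
import datetime

def compute_async_lag(repo, now=None):
    """Sketch of the simplified LagTracker logic: lag is the age of the
    oldest operation still present in the repository."""
    oldest = repo.get_operations_min_timestamp()
    if oldest is None:
        return 0.0  # nothing pending, no lag
    now = now or datetime.datetime.now(datetime.timezone.utc)
    return (now - oldest).total_seconds()

# Minimal stand-in repository for demonstration.
class _Repo:
    def __init__(self, ts):
        self.ts = ts
    def get_operations_min_timestamp(self):
        return self.ts

t0 = datetime.datetime(2025, 5, 29, 12, 0, 0, tzinfo=datetime.timezone.utc)
lag = compute_async_lag(_Repo(t0), now=t0 + datetime.timedelta(seconds=5))
empty = compute_async_lag(_Repo(None), now=t0)
```

In the real LagTracker, a value above the configured threshold would presumably trigger the async-lag callback; here we only show the timestamp arithmetic.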


linear bot commented May 29, 2025

@michalsosn michalsosn force-pushed the ms/remove-sharedvarss branch from 2027ab0 to c1bb15d Compare May 29, 2025 09:42
@michalsosn michalsosn marked this pull request as ready for review May 29, 2025 10:38

@sourcery-ai sourcery-ai bot left a comment


Hey @michalsosn - I've reviewed your changes and they look great!

Here's what I looked at during the review
  • 🟡 General issues: 1 issue found
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good


Comment on lines +182 to +184
if log_seq_id_range is not None:
acked_count = log_seq_id_range[0] - self._log_seq_id_range[0]
time.sleep(wait_time)

suggestion (performance): Sleeping for wait_time even if all operations are already submitted may delay progress.

Currently, the code sleeps for wait_time whenever log_seq_id_range is not None, even if no operations are pending. Please add a check to ensure there is work to wait for before sleeping.
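
One way to apply this suggestion is to return early when nothing is pending and only sleep otherwise. This is a sketch with placeholder names (`wait_step`, `acked_up_to`); the real loop lives in run.py and is shaped differently.

```python
import time

def wait_step(log_seq_id_range, acked_up_to, wait_time):
    """Only sleep when operations are still pending; return True when
    there is nothing left to wait for (sketch of the suggested fix)."""
    if log_seq_id_range is None:
        return True  # repository is empty, everything was processed
    first_seq, last_seq = log_seq_id_range
    if acked_up_to is not None and acked_up_to >= last_seq:
        return True  # all logged operations already acknowledged
    time.sleep(wait_time)  # real work remains, so waiting is justified
    return False
```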

repo.close(cleanup_files=True)


def test__lag_tracker__callback_called(operations_repo, temp_db_path):

suggestion (testing): Consider making lag condition more deterministic for test__lag_tracker__callback_called.

To improve test reliability, mock time.time() in LagTracker.work or set the initial operation's timestamp in the past to ensure the lag threshold is consistently met.

Suggested implementation:

from unittest.mock import patch

def test__lag_tracker__callback_called(operations_repo, temp_db_path):
    # given
    async_lag_threshold = 1.0

    # and
    snapshot = UpdateRunSnapshot(assign={"key": Value(string="value")})
    operations_repo.save_update_run_snapshots([snapshot])
    callback = Mock()

    # Mock time.time() to ensure lag is always above threshold
    with patch("time.time") as mock_time:
        # Set the operation timestamp to 1000, and current time to 1002
        # so lag = 2.0 > async_lag_threshold
        mock_time.return_value = 1002.0
        # You may need to set the snapshot's timestamp to 1000.0 if possible,
        # or ensure the repository returns a timestamp of 1000.0 for the latest operation.
        # If UpdateRunSnapshot or the repo allows setting timestamp, do so here.
        # Otherwise, ensure the repo's implementation uses a fixed timestamp for this test.

If UpdateRunSnapshot or operations_repo.save_update_run_snapshots allows setting the timestamp, set it explicitly to a value (e.g., 1000.0) so that the lag calculation is deterministic. If not, you may need to adjust the repository or snapshot creation logic to allow this for the test.

Comment on lines +276 to 275
else:
logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
return None

suggestion (code-quality): Remove unnecessary else after guard condition (remove-unnecessary-else)

Suggested change:
- else:
-     logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
-     return None
+ logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
+ return None

Comment on lines 23 to 24
db_path = pathlib.Path(temp_dir) / "test_operations.db"
yield db_path

suggestion (code-quality): Inline variable that is immediately yielded (inline-immediately-yielded-variable)

Suggested change:
- db_path = pathlib.Path(temp_dir) / "test_operations.db"
- yield db_path
+ yield pathlib.Path(temp_dir) / "test_operations.db"

@michalsosn michalsosn force-pushed the ms/remove-sharedvarss branch from c1bb15d to ea2f560 Compare May 29, 2025 11:00
self._last_ack_timestamp.value = processed_timestamp.timestamp()
self._last_ack_timestamp.notify_all()
# TODO: delete in a single transaction
self._operations_repository.delete_operation_submissions(up_to_seq_id=processed_sequence_id)
Member

I think we should reverse the order here. If we fail after the first call, it's better to have already deleted the operations, since we'll delete the DB anyway if only submissions remain in it.

Contributor Author

@michalsosn michalsosn Jun 4, 2025

I think I need to delete old submissions when StatusTrackingThread starts, just like the old status tracking queue started from 0 when the process was restarted.
Otherwise sync would try to resume from the point the previous sync left off, including the in-flight status requests.
The requests have a limited TTL, so the resync would likely get stuck if they expired in the meantime, and not even restarting the sync process would fix it.
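
The startup cleanup described in this reply could look roughly like the sketch below. The two repository method names follow this PR; the function name and the `_Repo` stand-in are illustrative assumptions.

```python
def reset_stale_submissions(repo):
    """On StatusTrackingThread start, drop submissions left over from a
    previous process: their request ids may have exceeded the backend TTL,
    so resuming from them could stall the sync forever (sketch)."""
    id_range = repo.get_operation_submission_sequence_id_range()
    if id_range is None:
        return 0  # no leftover submissions
    _, last_seq = id_range
    return repo.delete_operation_submissions(up_to_seq_id=last_seq)

# Minimal stand-in repository for demonstration.
class _Repo:
    def __init__(self, subs):
        self.subs = subs  # list of submitted sequence ids
    def get_operation_submission_sequence_id_range(self):
        return (min(self.subs), max(self.subs)) if self.subs else None
    def delete_operation_submissions(self, up_to_seq_id):
        before = len(self.subs)
        self.subs = [s for s in self.subs if s > up_to_seq_id]
        return before - len(self.subs)

repo = _Repo([3, 4, 5])
deleted = reset_stale_submissions(repo)
```

The corresponding operations stay in the table, so the sender simply re-submits them with fresh request ids.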

self._wait_for_file_upload(timeout=timer.remaining_time(), verbose=verbose)
if operations_count > 0:
last_print_timestamp = print_message(
"Waiting for remaining %d%s operation(s) to be processed",
Member

Is there any reason this method accepts msg + *args? Since we're already here, could you make it accept just a string? That way we could use interpolation - more readable. If you feel it makes sense in this MR, you could also create a small utility class with some "debounce" semantic - it would hold the last_print_timestamp state and verbose. Just a thought, not pushing.
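
The "debounce" utility suggested here could be sketched like this. All names (`DebouncedPrinter`, the `clock` parameter) are assumptions; the real code uses a bare `print_message` helper with `last_print_timestamp` threaded through.

```python
import time

class DebouncedPrinter:
    """Holds last_print_timestamp and verbose, printing an already
    interpolated string at most once per interval (sketch)."""
    def __init__(self, verbose: bool, interval: float = 10.0,
                 clock=time.monotonic):
        self._verbose = verbose
        self._interval = interval
        self._clock = clock  # injectable for deterministic tests
        self._last = None
    def print(self, message: str) -> bool:
        now = self._clock()
        if not self._verbose:
            return False
        if self._last is not None and now - self._last < self._interval:
            return False  # debounced: printed too recently
        self._last = now
        print(message)
        return True

# Deterministic demo: drive the clock manually.
ticks = iter([0.0, 1.0, 11.0])
p = DebouncedPrinter(verbose=True, interval=10.0, clock=lambda: next(ticks))
first = p.print("waiting")   # prints
second = p.print("waiting")  # suppressed, only 1s elapsed
third = p.print("waiting")   # prints again, 11s elapsed
```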

Contributor Author

Probably for the same reason log.info accepts *args: a minor performance optimization and ease of use compared to calling .format() in pre-f-string Python. This isn't anything I've changed, though.

)

if timer.is_expired():
if verbose:
Member

IMO we should print 1-time messages regardless of verbose (not just in this method).

Contributor Author

It's weird to control logs with this parameter instead of logging levels...
It seems to have disabled (almost) all logging in the past, both the one-off and the recurring messages.
It probably won't surprise anyone if it changes, so w/e

Contributor Author

@michalsosn michalsosn Jun 9, 2025

Maybe logging exactly one message, at the end of every branch, when verbose is False would be a sensible compromise.

last_print_timestamp = print_message(
f"Waiting. No operations were {phrase} yet",
"Waiting. No operations were submitted yet. Operations to sync: %s",
Member

This may get printed if we query the repo between the deletion of submissions and the deletion of operations. I'd like to reverse the order of those deletions anyway, so this won't happen. But I don't really see value in differing the log message between "waiting for X" and "waiting for X, no work has been done yet" -> please use the same log msg here and in the 1st if branch.

Contributor Author

@michalsosn michalsosn Jun 9, 2025

"You haven't managed to submit even a single operation yet" (= check if you can connect to neptune at all from your vpn) and "You have submitted some operations, we're just slow to process them" (= you are probably right to complain to us) suggest very different classes of problems.


assert wait_seq is not None
assert self._sequence_tracker is not None
# assumption: submissions are always behind or equal to the logged operations
Member

This is true, but are you using this assumption in any way?

Contributor Author

@michalsosn michalsosn Jun 9, 2025

That I should read the ranges in this order:

submitted_sequence_id_range = self._operations_repo.get_operation_submission_sequence_id_range()
logged_sequence_id_range = self._operations_repo.get_operations_sequence_id_range()

rather than this:

logged_sequence_id_range = self._operations_repo.get_operations_sequence_id_range()
submitted_sequence_id_range = self._operations_repo.get_operation_submission_sequence_id_range()

Because in the first case the logged range will have values greater than or equal to the submitted range, no matter how much work the sync process does between the two reads.
In the second case the submitted range could end up either greater or lower than the logged range, depending on how much work the sync process does.

It also means I should not have to consider the case where submitted_sequence_id_range is not None and logged_sequence_id_range is None.

Doing delete_operation_submissions > delete_operations instead of delete_operations > delete_operation_submissions briefly broke this assumption, because the submissions range could get ahead of the operations range.
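
The safe read order described above can be made explicit in a small helper. The two repository method names come from this PR; the wrapper function and the `_Repo` stand-in are illustrative.

```python
def read_ranges(repo):
    """Read submissions BEFORE operations so the invariant
    'submitted <= logged' holds even if the sync process advances
    between the two queries (sketch)."""
    submitted = repo.get_operation_submission_sequence_id_range()
    logged = repo.get_operations_sequence_id_range()
    # With this order, `submitted is not None and logged is None`
    # cannot happen: anything submitted was logged first.
    return submitted, logged

# Stand-in repository that records the call order for demonstration.
class _Repo:
    def __init__(self):
        self.calls = []
    def get_operation_submission_sequence_id_range(self):
        self.calls.append("submissions")
        return (1, 2)
    def get_operations_sequence_id_range(self):
        self.calls.append("operations")
        return (1, 5)

repo = _Repo()
submitted, logged = read_ranges(repo)
```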

@michalsosn michalsosn force-pushed the ms/remove-sharedvarss branch 4 times, most recently from 02cdc61 to 71903e2 Compare June 9, 2025 17:18
@michalsosn michalsosn force-pushed the ms/remove-sharedvarss branch from 71903e2 to 0f32ae4 Compare June 13, 2025 14:38
2 participants