feat: remove SharedVars, SequenceTracker, status_tracking_queue. Add run_operation_submission table #258
Conversation
Reviewer's Guide

Migrate async coordination from in-memory shared variables and queues to a persistent run_operation_submission table with a new OperationSubmission model, refactor SenderThread/StatusTrackingThread and the run API waiting logic to poll the repository, simplify lag tracking via database timestamps, bump the schema to v4, and update tests and CLI accordingly.

Sequence Diagram: Operation Logging and Submission

```mermaid
sequenceDiagram
participant R as Run
participant OR as OperationsRepository
participant ST as SenderThread
participant NA as NeptuneAPI
R->>OR: save_update_run_snapshots(operations)
activate OR
OR-->>R: (sequence_id)
deactivate OR
loop Periodically
ST->>OR: get_operations(from_exclusive=last_queued_seq)
activate OR
OR-->>ST: operations_batch
deactivate OR
opt operations_batch is not empty
ST->>NA: ingest_run_operations(operations_batch)
activate NA
NA-->>ST: request_ids_response
deactivate NA
ST->>OR: save_operation_submissions(OperationSubmission[seq_id, ts, req_id])
activate OR
OR-->>ST: (confirm / new_last_queued_seq)
deactivate OR
ST->>ST: Update internal last_queued_seq
end
end
```
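For orientation, a minimal Python sketch of the sender loop above, assuming a simplified repository interface. The method names mirror the diagram, but the batch shape, the one-request-id-per-batch assumption, and the polling interval are illustrative, not the actual neptune-scale implementation:

```python
import time
from typing import Callable, Sequence

# Simplified stand-in: a batch element is just (sequence_id, serialized_operation).
Batch = Sequence[tuple[int, bytes]]


def sender_loop(repo, ingest: Callable[[Batch], str], poll_interval: float = 1.0) -> None:
    last_queued_seq = -1  # highest sequence_id already handed to the backend
    while True:
        batch: Batch = repo.get_operations(from_exclusive=last_queued_seq)
        if not batch:
            time.sleep(poll_interval)
            continue
        # assumption: the backend returns one request id per ingested batch
        request_id = ingest(batch)
        now = int(time.time())
        repo.save_operation_submissions(
            [(seq_id, now, request_id) for seq_id, _ in batch]
        )
        last_queued_seq = batch[-1][0]
```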
Sequence Diagram: Status Tracking and Operation Cleanup

```mermaid
sequenceDiagram
participant STT as StatusTrackingThread
participant OR as OperationsRepository
participant NA as NeptuneAPI
loop Periodically
STT->>OR: get_operation_submissions(limit=BATCH_SIZE)
activate OR
OR-->>STT: batch_of_submissions
deactivate OR
opt batch_of_submissions is not empty
STT->>NA: check_batch(request_ids=[s.request_id for s in batch])
activate NA
NA-->>STT: bulk_request_status_response
deactivate NA
loop for each status in response related to a submission
alt status is processed successfully
STT->>OR: delete_operation_submissions(up_to_seq_id=processed_seq_id)
activate OR
OR-->>STT: (deleted_submission_count)
deactivate OR
STT->>OR: delete_operations(up_to_seq_id=processed_seq_id)
activate OR
OR-->>STT: (deleted_operation_count)
deactivate OR
else status indicates not ready or error
STT->>STT: Handle error or wait for next cycle
end
end
end
end
```
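A rough sketch of the cleanup loop above, again with a simplified repository; the boolean-per-request status response stands in for the real bulk status call:

```python
import time


def status_tracking_loop(repo, check_batch, poll_interval: float = 1.0, batch_size: int = 100) -> None:
    # `check_batch` stands in for the Neptune API call that reports, per request id,
    # whether the submitted operations have been processed.
    while True:
        submissions = repo.get_operation_submissions(limit=batch_size)
        if not submissions:
            time.sleep(poll_interval)
            continue
        statuses = check_batch([s.request_id for s in submissions])
        for submission, processed in zip(submissions, statuses):
            if not processed:
                break  # not ready yet (or errored): retry on the next cycle
            # drop the tracking row and the operation rows it covers
            # (the review thread below discusses which deletion order is safer)
            repo.delete_operation_submissions(up_to_seq_id=submission.sequence_id)
            repo.delete_operations(up_to_seq_id=submission.sequence_id)
```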
Entity Relationship Diagram for run_operation_submission Table

```mermaid
erDiagram
run_operations {
INTEGER sequence_id PK
TEXT operation_type
BLOB data
INTEGER timestamp
INTEGER operation_size_bytes
}
run_operation_submission {
INTEGER sequence_id PK "FK to run_operations.sequence_id"
INTEGER timestamp "Submission Timestamp"
TEXT request_id "Request ID from API"
}
run_operations ||--o| run_operation_submission : "submission_is_tracked_for"
```
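As a reading aid, the schema above might correspond to SQLite DDL roughly like this; the exact constraints and indexes of the real v4 migration are not shown in this PR view, so treat them as assumptions:

```python
import sqlite3

# Illustrative DDL only: the actual migration may use different constraints and affinities.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS run_operations (
        sequence_id          INTEGER PRIMARY KEY,
        operation_type       TEXT    NOT NULL,
        data                 BLOB    NOT NULL,
        timestamp            INTEGER NOT NULL,
        operation_size_bytes INTEGER NOT NULL
    );

    CREATE TABLE IF NOT EXISTS run_operation_submission (
        sequence_id INTEGER PRIMARY KEY,  -- same value as run_operations.sequence_id
        timestamp   INTEGER NOT NULL,     -- submission timestamp
        request_id  TEXT    NOT NULL      -- request id returned by the API
    );
    """
)
```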
Class Diagram for Async Coordination Refactor

```mermaid
classDiagram
class OperationSubmission {
+SequenceId sequence_id
+int timestamp
+RequestId request_id
+datetime ts
}
class OperationsRepository {
+save_operation_submissions(statuses: list~OperationSubmission~) SequenceId
+get_operation_submissions(limit: Optional~int~) list~OperationSubmission~
+delete_operation_submissions(up_to_seq_id: SequenceId) int
+get_operation_submission_count(limit: Optional~int~) int
+get_operation_submission_sequence_id_range() Optional~tuple~SequenceId, SequenceId~~
+get_operations_min_timestamp() Optional~datetime~
+get_operation_count(limit: Optional~int~) int
+get_operations_sequence_id_range() Optional~tuple~SequenceId, SequenceId~~
}
class Run {
- _operations_repo: OperationsRepository
+wait_for_submission(timeout: Optional~float~, verbose: bool)
+wait_for_processing(timeout: Optional~float~, verbose: bool)
#_wait_for_operation_submission(timeout: Optional~float~, verbose: bool)
#_wait_for_operation_processing(timeout: Optional~float~, verbose: bool)
}
class SenderThread {
- _operations_repository: OperationsRepository
- _last_queued_seq: SequenceId
+work()
}
class StatusTrackingThread {
- _operations_repository: OperationsRepository
+work()
}
class LagTracker {
- _operations_repository: OperationsRepository
+work()
}
class SharedInt {
<<Removed>>
+int value
+wait(timeout: Optional~float~)
+notify_all()
}
class SharedFloat {
<<Removed>>
+float value
+wait(timeout: Optional~float~)
+notify_all()
}
class SequenceTracker {
<<Removed>>
+update_sequence_id(sequence_id: SequenceId)
+SequenceId last_sequence_id
+Optional~float~ last_timestamp
}
class PeekableQueue {
<<Removed>>
+put(element: T)
+peek(max_size: int) Optional~list~T~~
+commit(n: int)
}
class StatusTrackingElement {
<<Removed>>
+SequenceId sequence_id
+datetime timestamp
+str request_id
}
Run *-- OperationsRepository
SenderThread *-- OperationsRepository
StatusTrackingThread *-- OperationsRepository
LagTracker *-- OperationsRepository
SenderThread ..> OperationSubmission : creates and saves via OperationsRepository
```
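A sketch of how the OperationSubmission model above might look as a dataclass; the type aliases, the millisecond timestamp resolution, and the derived ts property are assumptions based only on the fields listed in the diagram:

```python
from dataclasses import dataclass
from datetime import datetime, timezone

SequenceId = int  # assumed alias
RequestId = str   # assumed alias


@dataclass(frozen=True)
class OperationSubmission:
    sequence_id: SequenceId
    timestamp: int  # raw submission timestamp; millisecond resolution is an assumption
    request_id: RequestId

    @property
    def ts(self) -> datetime:
        # convenience datetime view of the raw timestamp, hinted at by the class diagram
        return datetime.fromtimestamp(self.timestamp / 1000, tz=timezone.utc)
```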
File-Level Changes
Hey @michalsosn - I've reviewed your changes and they look great!
Here's what I looked at during the review
- 🟡 General issues: 1 issue found
- 🟢 Security: all looks good
- 🟡 Testing: 1 issue found
- 🟢 Complexity: all looks good
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
```python
if log_seq_id_range is not None:
    acked_count = log_seq_id_range[0] - self._log_seq_id_range[0]
    time.sleep(wait_time)
```
suggestion (performance): Sleeping for wait_time even if all operations are already submitted may delay progress.
Currently, the code sleeps for wait_time whenever log_seq_id_range is not None, even if no operations are pending. Please add a check to ensure there is work to wait for before sleeping.
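One possible shape of that check, sketched as a small standalone helper; the helper name, the (first, last) meaning of the range, and the last_acked_seq_id parameter are hypothetical, not code from this PR:

```python
import time


def sleep_if_pending(log_seq_id_range, last_acked_seq_id, wait_time) -> None:
    # Hypothetical helper: `log_seq_id_range` is (first, last) logged sequence id,
    # `last_acked_seq_id` is the newest sequence id already confirmed as processed.
    if log_seq_id_range is None:
        return  # nothing logged, nothing to wait for
    if log_seq_id_range[1] <= last_acked_seq_id:
        return  # everything already processed: don't delay progress
    time.sleep(wait_time)
```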
tests/unit/test_lag_tracker.py (outdated)
```python
repo.close(cleanup_files=True)


def test__lag_tracker__callback_called(operations_repo, temp_db_path):
```
suggestion (testing): Consider making the lag condition more deterministic for `test__lag_tracker__callback_called`. To improve test reliability, mock `time.time()` in `LagTracker.work` or set the initial operation's timestamp in the past to ensure the lag threshold is consistently met.
Suggested implementation:

```python
from unittest.mock import patch


def test__lag_tracker__callback_called(operations_repo, temp_db_path):
    # given
    async_lag_threshold = 1.0

    # and
    snapshot = UpdateRunSnapshot(assign={"key": Value(string="value")})
    operations_repo.save_update_run_snapshots([snapshot])
    callback = Mock()

    # Mock time.time() to ensure lag is always above threshold
    with patch("time.time") as mock_time:
        # Set the operation timestamp to 1000, and current time to 1002
        # so lag = 2.0 > async_lag_threshold
        mock_time.return_value = 1002.0
        # You may need to set the snapshot's timestamp to 1000.0 if possible,
        # or ensure the repository returns a timestamp of 1000.0 for the latest operation.
        # If UpdateRunSnapshot or the repo allows setting timestamp, do so here.
        # Otherwise, ensure the repo's implementation uses a fixed timestamp for this test.
```
If `UpdateRunSnapshot` or `operations_repo.save_update_run_snapshots` allows setting the timestamp, set it explicitly to a value (e.g., 1000.0) so that the lag calculation is deterministic. If not, you may need to adjust the repository or snapshot creation logic to allow this for the test.
```python
else:
    logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
    return None
```
suggestion (code-quality): Remove unnecessary else after guard condition (`remove-unnecessary-else`)
```diff
-        else:
-            logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
-            return None
+        logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True)
+        return None
```
tests/unit/test_lag_tracker.py (outdated)
```python
db_path = pathlib.Path(temp_dir) / "test_operations.db"
yield db_path
```
suggestion (code-quality): Inline variable that is immediately yielded (`inline-immediately-yielded-variable`)
```diff
-        db_path = pathlib.Path(temp_dir) / "test_operations.db"
-        yield db_path
+        yield pathlib.Path(temp_dir) / "test_operations.db"
```
```python
self._last_ack_timestamp.value = processed_timestamp.timestamp()
self._last_ack_timestamp.notify_all()
# TODO: delete in a single transaction
self._operations_repository.delete_operation_submissions(up_to_seq_id=processed_sequence_id)
```
I think we should reverse the order here. If we fail after the first call, it's better to delete operations, since we'll delete the DB anyway if only submissions remain in it.
I think I need to delete old submissions when StatusTrackingThread starts, just like the old status tracking queue started from 0 when the process was restarted. Otherwise sync would try to resume from the point where the previous sync left off, including the in-flight status requests. The requests have a limited TTL, so the resync would likely get stuck if they expired in the meantime, and not even restarting the sync process would fix it.
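A small sketch of what that startup cleanup could do, using the repository methods from the class diagram; the function name and its call site are hypothetical, not part of this diff:

```python
def drop_stale_submissions(operations_repository) -> None:
    # Drop submissions left over from a previous process: their request ids may have
    # exceeded the backend TTL, so the operations they cover should simply be
    # re-submitted by the sender thread instead of being polled for status.
    stale_range = operations_repository.get_operation_submission_sequence_id_range()
    if stale_range is not None:
        operations_repository.delete_operation_submissions(up_to_seq_id=stale_range[1])
```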
```python
self._wait_for_file_upload(timeout=timer.remaining_time(), verbose=verbose)
if operations_count > 0:
    last_print_timestamp = print_message(
        "Waiting for remaining %d%s operation(s) to be processed",
```
Is there any reason this method accepts msg + *args? Since we're already here, could you make it accept just a string? That way we could use interpolation, which is more readable. If you feel it makes sense in this MR, you could also create a small utility class with some "debounce" semantics; it would hold the last_print_timestamp state and verbose. Just a thought, not pushing.
Probably the same reason log.info accepts *args: a minor performance optimization and ease of use compared to calling .format() in pre-f-string Python. This isn't anything I've changed, though.
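For reference, the debounce-style helper suggested above could be as small as this; the class name, the 10-second default, and the use of print instead of the package logger are placeholder choices, not anything in this PR:

```python
import time


class DebouncedPrinter:
    """Print at most once per `interval` seconds, and only when verbose."""

    def __init__(self, verbose: bool, interval: float = 10.0) -> None:
        self._verbose = verbose
        self._interval = interval
        self._last_print = 0.0

    def print(self, message: str) -> None:
        now = time.monotonic()
        if self._verbose and now - self._last_print >= self._interval:
            print(message)
            self._last_print = now
```

Call sites could then pass plain f-strings, e.g. `printer.print(f"Waiting for remaining {count} operation(s) to be processed")`, instead of threading last_print_timestamp and a %-format string through every call.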
src/neptune_scale/api/run.py (outdated)
```python
)

if timer.is_expired():
    if verbose:
```
IMO we should print one-time messages regardless of verbose (not just in this method).
It's weird to control logs with this parameter instead of logging levels... It seems to have disabled (almost) all logging in the past, both the one-off and the recurring messages. It probably won't surprise anyone if it changes, so w/e.
Maybe if we log exactly one message (at the end of every branch) when verbose is False, it'd be a sensible compromise.
```diff
             last_print_timestamp = print_message(
-                f"Waiting. No operations were {phrase} yet",
+                "Waiting. No operations were submitted yet. Operations to sync: %s",
```
This may get printed if we query the repo between the deletion of submissions and the deletion of operations. I'd like to reverse the order of those deletions anyway, so this won't happen, but I don't really see value in differentiating the log message between "waiting for X" and "waiting for X, no work has been done yet" -> please use the same log message here and in the first if branch.
"You haven't managed to submit even a single operation yet" (= check if you can connect to neptune at all from your vpn) and "You have submitted some operations, we're just slow to process them" (= you are probably right to complain to us) suggest very different classes of problems.
```python
assert wait_seq is not None
assert self._sequence_tracker is not None
# assumption: submissions are always behind or equal to the logged operations
```
This is true, but are you using this assumption in any way?
That I should read the ranges in this order:

```python
submitted_sequence_id_range = self._operations_repo.get_operation_submission_sequence_id_range()
logged_sequence_id_range = self._operations_repo.get_operations_sequence_id_range()
```

rather than this:

```python
logged_sequence_id_range = self._operations_repo.get_operations_sequence_id_range()
submitted_sequence_id_range = self._operations_repo.get_operation_submission_sequence_id_range()
```

In the first case the logged range will have greater-or-equal values than the submitted range, no matter how much work the sync process does between the two reads. In the second case the submitted range could be either greater or lower than the logged range, depending on how much work the sync process does in between.

It also means I should not have to consider the case of submitted_sequence_id_range is not None and logged_sequence_id_range is None.

Doing delete_operation_submissions before delete_operations, instead of delete_operations before delete_operation_submissions, was the one moment this assumption broke, because the submissions range could get ahead of the operations range.
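To make the ordering argument concrete, a hypothetical helper that relies on it; the function name and the way the pending count is derived are illustrative, not the code in this PR:

```python
def count_unsubmitted_operations(repo) -> int:
    # Read submissions first, then operations: the operations range can only grow
    # between the two reads, so the logged range stays >= the submitted range.
    submitted_range = repo.get_operation_submission_sequence_id_range()
    logged_range = repo.get_operations_sequence_id_range()
    if logged_range is None:
        return 0  # nothing left in the database at all
    first_logged, last_logged = logged_range
    last_submitted = submitted_range[1] if submitted_range is not None else first_logged - 1
    return max(0, last_logged - last_submitted)
```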
fixes PY-121
Summary by Sourcery
Migrate asynchronous synchronization coordination from in-memory shared variables and thread-local queues to a persistent run_operation_submission database table and refactor sync components to use operations_repository APIs.
New Features:
Enhancements:
Tests: