-
Notifications
You must be signed in to change notification settings - Fork 1
feat: remove SharedVars, SequenceTracker, status_tracking_queue. Add run_operation_submission table. Return bool status from wait #272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…run_operation_submission table
Reviewer's GuideThis PR replaces in-memory synchronization constructs (SharedVars, SequenceTracker, queues) with a persistent database-backed submission tracking via a new run_operation_submission table and OperationSubmission API; refactors wait loops and CLI/sync threads to poll operation counts from the DB and return boolean status; introduces a ThrottledLogger for rate-limited logging; and updates tests to reflect the new workflow. Sequence diagram for wait_for_submission and wait_for_processing using DB-backed pollingsequenceDiagram
actor User
participant Run as Run
participant OperationsRepository as DB
User->>Run: wait_for_submission(timeout, verbose)
Run->>DB: get_operation_submission_sequence_id_range()
Run->>DB: get_operations_sequence_id_range()
alt operations remain
Run->>User: (logs via ThrottledLogger)
Run-->>Run: sleep and retry
Run->>DB: repeat polling
else all submitted
Run->>User: return True
end
User->>Run: wait_for_processing(timeout, verbose)
Run->>DB: get_operation_count()
alt operations remain
Run->>User: (logs via ThrottledLogger)
Run-->>Run: sleep and retry
Run->>DB: repeat polling
else all processed
Run->>User: return True
end
Class diagram for OperationSubmission and OperationsRepository changesclassDiagram
class OperationSubmission {
+SequenceId sequence_id
+int timestamp
+RequestId request_id
+datetime ts
}
class OperationsRepository {
+save_operation_submissions(submissions: list[OperationSubmission]) SequenceId
+get_operation_submissions(limit: int) list[OperationSubmission]
+delete_operation_submissions(up_to_seq_id: Optional[SequenceId]) int
+get_operation_submission_count(limit: Optional[int]) int
+get_operation_submission_sequence_id_range() Optional[tuple[SequenceId, SequenceId]]
+get_operation_count(limit: Optional[int]) int
+get_operations_sequence_id_range() Optional[tuple[SequenceId, SequenceId]]
+get_operations_min_timestamp() Optional[datetime]
}
OperationsRepository --> OperationSubmission
Class diagram for removal of SharedVars, SequenceTracker, and related constructsclassDiagram
class SharedInt {
<<removed>>
}
class SharedFloat {
<<removed>>
}
class SequenceTracker {
<<removed>>
}
class PeekableQueue {
<<removed>>
}
class StatusTrackingElement {
<<removed>>
}
Class diagram for ThrottledLogger additionclassDiagram
class ThrottledLogger {
-logging.Logger _logger
-bool _enabled
-Optional[float] _last_print_timestamp
+__init__(logger: logging.Logger, enabled: bool)
+info(msg: str, *args: Any)
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @michalsosn - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/neptune_scale/api/run.py:491` </location>
<code_context>
fork_step: Optional[Union[int, float]],
) -> None:
- if self._operations_repo is None or self._sequence_tracker is None:
+ if self._operations_repo is None:
logger.debug("Run is in mode that doesn't support creating runs.")
return
</code_context>
<issue_to_address>
Early return if _operations_repo is None may mask issues.
If _operations_repo should never be None, raise an exception or log a warning instead to avoid hiding configuration errors.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
if self._operations_repo is None:
logger.debug("Run is in mode that doesn't support creating runs.")
return
=======
if self._operations_repo is None:
logger.error("Configuration error: _operations_repo is None. Cannot create runs without a valid operations repository.")
raise RuntimeError("Cannot create runs: _operations_repo is not configured.")
>>>>>>> REPLACE
</suggested_fix>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
if self._operations_repo is None: | ||
logger.debug("Run is in mode that doesn't support creating runs.") | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Early return if _operations_repo is None may mask issues.
If _operations_repo should never be None, raise an exception or log a warning instead to avoid hiding configuration errors.
if self._operations_repo is None: | |
logger.debug("Run is in mode that doesn't support creating runs.") | |
return | |
if self._operations_repo is None: | |
logger.error("Configuration error: _operations_repo is None. Cannot create runs without a valid operations repository.") | |
raise RuntimeError("Cannot create runs: _operations_repo is not configured.") |
for key, value in data.items(): | ||
assert fetched[key] == value, f"Value for {key} does not match" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (code-quality): Avoid loops in tests. (no-loop-in-tests
)
Explanation
Avoid complex code, like loops, in test functions.Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
- loops
- conditionals
Some ways to fix this:
- Use parametrized tests to get rid of the loop.
- Move the complex logic into helpers.
- Move the complex part into pytest fixtures.
Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / Don't Put Logic in Tests
for i in range(10): | ||
update = UpdateRunSnapshot(assign={"key": Value(string=f"a{i}")}) | ||
updates.append(update) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (code-quality): Avoid loops in tests. (no-loop-in-tests
)
Explanation
Avoid complex code, like loops, in test functions.Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
- loops
- conditionals
Some ways to fix this:
- Use parametrized tests to get rid of the loop.
- Move the complex logic into helpers.
- Move the complex part into pytest fixtures.
Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / Don't Put Logic in Tests
for i in range(10): | ||
update = UpdateRunSnapshot(assign={"key": Value(string=f"a{i}")}) | ||
updates.append(update) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (code-quality): Avoid loops in tests. (no-loop-in-tests
)
Explanation
Avoid complex code, like loops, in test functions.Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
- loops
- conditionals
Some ways to fix this:
- Use parametrized tests to get rid of the loop.
- Move the complex logic into helpers.
- Move the complex part into pytest fixtures.
Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / Don't Put Logic in Tests
else: | ||
logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True) | ||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (code-quality): Remove unnecessary else after guard condition (remove-unnecessary-else
)
else: | |
logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True) | |
return None | |
logger.error(f"Dropping {len(ops)} operations due to error", exc_info=True) | |
return None |
if queued_range is None: | ||
last_queued_seq = SequenceId(-1) | ||
else: | ||
last_queued_seq = queued_range[1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (code-quality): Replace if statement with if expression (assign-if-exp
)
if queued_range is None: | |
last_queued_seq = SequenceId(-1) | |
else: | |
last_queued_seq = queued_range[1] | |
last_queued_seq = SequenceId(-1) if queued_range is None else queued_range[1] |
fixes PY-121, PY-138
Summary by Sourcery
Use a new run_operation_submission table in the database as the single source of truth for tracking and acknowledging RunOperations, removing the previous in-memory SequenceTracker, SharedVar, and queue mechanisms. Migrate sync process and threads to rely on OperationsRepository APIs, introduce boolean-returning wait_for_submission and wait_for_processing on Run, simplify LagTracker, and update tests and DB version to v4.
New Features:
Enhancements:
Tests: