-
Notifications
You must be signed in to change notification settings - Fork 1
Add a global default timeout for all wait() operations. #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,13 +59,15 @@ | |
) | ||
from neptune_scale.envs import ( | ||
API_TOKEN_ENV_NAME, | ||
OPERATIONS_TIMEOUT_SECONDS, | ||
PROJECT_ENV_NAME, | ||
) | ||
from neptune_scale.exceptions import ( | ||
NeptuneApiTokenNotProvided, | ||
NeptuneProjectNotProvided, | ||
) | ||
from neptune_scale.parameters import ( | ||
DEFAULT_OPERATIONS_TIMEOUT_SECONDS, | ||
MAX_EXPERIMENT_NAME_LENGTH, | ||
MAX_FAMILY_LENGTH, | ||
MAX_QUEUE_SIZE, | ||
|
@@ -273,10 +275,8 @@ def __init__( | |
self.wait_for_processing(verbose=False) | ||
|
||
def _handle_signal(self, signum: int, frame: Any) -> None: | ||
if not self._is_closing: | ||
signame = safe_signal_name(signum) | ||
logger.debug(f"Received signal {signame}. Terminating.") | ||
|
||
signame = safe_signal_name(signum) | ||
logger.debug(f"Received signal {signame}") | ||
self.terminate() | ||
|
||
@property | ||
|
@@ -294,21 +294,23 @@ def resources(self) -> tuple[Resource, ...]: | |
self._errors_monitor, | ||
) | ||
|
||
def _close(self, *, wait: bool = True) -> None: | ||
def _close(self, *, wait: bool = True, timeout: Optional[float] = None) -> None: | ||
logger.debug(f"Run is closing, wait={wait}, is_closing={self._is_closing}") | ||
with self._lock: | ||
if self._is_closing: | ||
return | ||
|
||
self._is_closing = True | ||
|
||
logger.debug(f"Run is closing, wait={wait}") | ||
|
||
if self._sync_process.is_alive(): | ||
if wait: | ||
self.wait_for_processing() | ||
|
||
self._sync_process.terminate() | ||
self._sync_process.join() | ||
self.wait_for_processing(timeout=timeout) | ||
self._sync_process.terminate() | ||
self._sync_process.join() | ||
else: | ||
# If we don't care about waiting, it means we're trying to abort the run on error, | ||
# so make it quick | ||
self._sync_process.kill() | ||
|
||
if self._lag_tracker is not None: | ||
self._lag_tracker.interrupt() | ||
|
@@ -351,14 +353,22 @@ def my_error_callback(exc): | |
if self._exit_func is not None: | ||
atexit.unregister(self._exit_func) | ||
self._exit_func = None | ||
|
||
# Force cleanup in _close() to make sure we're not waiting for any threads / processes | ||
self._is_closing = False | ||
self._close(wait=False) | ||
|
||
def close(self) -> None: | ||
def close(self, timeout: Optional[float] = None) -> None: | ||
""" | ||
Closes the connection to Neptune and waits for data synchronization to be completed. | ||
|
||
Use to finalize a regular run in your model-training script. | |
|
||
Args: | ||
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. | ||
If the timeout is reached, `TimeoutError` is raised. If not provided, the value of the | |
`NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` environment variable is used; if that is not set, the default of 300 seconds is used. | |
|
||
Example: | ||
``` | ||
from neptune_scale import Run | ||
|
@@ -373,7 +383,7 @@ def close(self) -> None: | |
if self._exit_func is not None: | ||
atexit.unregister(self._exit_func) | ||
self._exit_func = None | ||
self._close(wait=True) | ||
self._close(wait=True, timeout=timeout) | ||
|
||
def _create_run( | ||
self, | ||
|
@@ -583,17 +593,22 @@ def _wait( | |
timeout: Optional[float] = None, | ||
verbose: bool = True, | ||
) -> None: | ||
if timeout is None: | ||
timeout = float(os.getenv(OPERATIONS_TIMEOUT_SECONDS, DEFAULT_OPERATIONS_TIMEOUT_SECONDS)) | ||
|
||
if timeout <= 0: | ||
raise ValueError(f"Timeout must be greater than 0, but got {timeout}") | ||
|
||
if verbose: | ||
logger.info(f"Waiting for all operations to be {phrase}") | ||
|
||
if timeout is None and verbose: | ||
logger.warning("No timeout specified. Waiting indefinitely") | ||
|
||
begin_time = time.time() | ||
wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time | ||
timeout_left = timeout | ||
last_print_timestamp: Optional[float] = None | ||
|
||
while True: | ||
t0 = time.monotonic() | ||
wait_time = min(sleep_time, timeout_left) | ||
|
||
try: | ||
with self._lock: | ||
if not self._sync_process.is_alive(): | ||
|
@@ -611,6 +626,10 @@ def _wait( | |
wait_condition.wait(timeout=wait_time) | ||
value = wait_value.value | ||
|
||
timeout_left -= time.monotonic() - t0 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are you doing it this way instead of the usual approach (at least I've seen this pattern in this codebase)?
Your implementation may wait longer than the specified timeout, because you are moving `t0` on each loop iteration and thus ignoring the time spent executing BLOCK B.
|
||
if timeout_left <= 0: | ||
raise TimeoutError(f"Timeout of {timeout}s reached while waiting for all operations to be {phrase}") | ||
|
||
last_queued_sequence_id = self._operations_queue.last_sequence_id | ||
|
||
if value == -1: | ||
|
@@ -636,7 +655,7 @@ def _wait( | |
) | ||
else: | ||
# Reaching the last queued sequence ID means that all operations were submitted | ||
if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout): | ||
if value >= last_queued_sequence_id: | ||
break | ||
except KeyboardInterrupt: | ||
if verbose: | ||
|
@@ -654,7 +673,9 @@ def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = T | |
See wait_for_processing(). | ||
|
||
Args: | ||
timeout (float, optional): In seconds, the maximum time to wait for submission. | ||
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. | ||
If the timeout is reached, `TimeoutError` is raised. If not provided, the value of the | |
`NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` environment variable is used; if that is not set, the default of 300 seconds is used. | |
kgodlewski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
verbose (bool): If True (default), prints messages about the waiting process. | ||
""" | ||
self._wait( | ||
|
@@ -673,7 +694,9 @@ def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = T | |
Once the call is complete, the data is saved in Neptune. | ||
|
||
Args: | ||
timeout (float, optional): In seconds, the maximum time to wait for processing. | ||
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. | ||
If the timeout is reached, `TimeoutError` is raised. If not provided, the value of the | |
`NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` environment variable is used; if that is not set, the default of 300 seconds is used. | |
kgodlewski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
verbose (bool): If True (default), prints messages about the waiting process. | ||
""" | ||
self._wait( | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
PROJECT_ENV_NAME = "NEPTUNE_PROJECT" | ||
|
||
API_TOKEN_ENV_NAME = "NEPTUNE_API_TOKEN" | ||
ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE" | ||
|
||
DISABLE_COLORS = "NEPTUNE_DISABLE_COLORS" | ||
|
||
DEBUG_MODE = "NEPTUNE_DEBUG_MODE" | ||
|
||
SUBPROCESS_KILL_TIMEOUT = "NEPTUNE_SUBPROCESS_KILL_TIMEOUT" | ||
|
||
ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE" | ||
# Default timeout used to wait for operations to be processed by the Neptune backend. | ||
# Affects Run.wait*() and Run.close*() methods. | ||
OPERATIONS_TIMEOUT_SECONDS = "NEPTUNE_OPERATIONS_TIMEOUT_SECONDS" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.