Skip to content

Add a global default timeout for all wait() operations. #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
)
from neptune_scale.envs import (
API_TOKEN_ENV_NAME,
OPERATIONS_TIMEOUT_SECONDS,
PROJECT_ENV_NAME,
)
from neptune_scale.exceptions import (
NeptuneApiTokenNotProvided,
NeptuneProjectNotProvided,
)
from neptune_scale.parameters import (
DEFAULT_OPERATIONS_TIMEOUT_SECONDS,
MAX_EXPERIMENT_NAME_LENGTH,
MAX_FAMILY_LENGTH,
MAX_QUEUE_SIZE,
Expand Down Expand Up @@ -273,10 +275,8 @@ def __init__(
self.wait_for_processing(verbose=False)

def _handle_signal(self, signum: int, frame: Any) -> None:
if not self._is_closing:
signame = safe_signal_name(signum)
logger.debug(f"Received signal {signame}. Terminating.")

signame = safe_signal_name(signum)
logger.debug(f"Received signal {signame}")
self.terminate()

@property
Expand All @@ -294,21 +294,23 @@ def resources(self) -> tuple[Resource, ...]:
self._errors_monitor,
)

def _close(self, *, wait: bool = True) -> None:
def _close(self, *, wait: bool = True, timeout: Optional[float] = None) -> None:
logger.debug(f"Run is closing, wait={wait}, is_closing={self._is_closing}")
with self._lock:
if self._is_closing:
return

self._is_closing = True

logger.debug(f"Run is closing, wait={wait}")

if self._sync_process.is_alive():
if wait:
self.wait_for_processing()

self._sync_process.terminate()
self._sync_process.join()
self.wait_for_processing(timeout=timeout)
self._sync_process.terminate()
self._sync_process.join()
else:
# If we don't care about waiting, it means we're trying to abort the run on error,
# so make it quick
self._sync_process.kill()

if self._lag_tracker is not None:
self._lag_tracker.interrupt()
Expand Down Expand Up @@ -351,14 +353,22 @@ def my_error_callback(exc):
if self._exit_func is not None:
atexit.unregister(self._exit_func)
self._exit_func = None

# Force cleanup in _close() to make sure we're not waiting for any threads / processes
self._is_closing = False
self._close(wait=False)

def close(self) -> None:
def close(self, timeout: Optional[float] = None) -> None:
"""
Closes the connection to Neptune and waits for data synchronization to be completed.

Use to finalize a regular run your model-training script.

Args:
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete.
If the timeout is reached `TimeoutError` is raised. If not provided, the value
of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS`. If not set, the default value of 300sec is used.

Example:
```
from neptune_scale import Run
Expand All @@ -373,7 +383,7 @@ def close(self) -> None:
if self._exit_func is not None:
atexit.unregister(self._exit_func)
self._exit_func = None
self._close(wait=True)
self._close(wait=True, timeout=timeout)

def _create_run(
self,
Expand Down Expand Up @@ -583,17 +593,22 @@ def _wait(
timeout: Optional[float] = None,
verbose: bool = True,
) -> None:
if timeout is None:
timeout = float(os.getenv(OPERATIONS_TIMEOUT_SECONDS, DEFAULT_OPERATIONS_TIMEOUT_SECONDS))

if timeout <= 0:
raise ValueError(f"Timeout must be greater than 0, but got {timeout}")

if verbose:
logger.info(f"Waiting for all operations to be {phrase}")

if timeout is None and verbose:
logger.warning("No timeout specified. Waiting indefinitely")

begin_time = time.time()
wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time
timeout_left = timeout
last_print_timestamp: Optional[float] = None

while True:
t0 = time.monotonic()
wait_time = min(sleep_time, timeout_left)

try:
with self._lock:
if not self._sync_process.is_alive():
Expand All @@ -611,6 +626,10 @@ def _wait(
wait_condition.wait(timeout=wait_time)
value = wait_value.value

timeout_left -= time.monotonic() - t0
Copy link
Contributor

@michalsosn michalsosn Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you doing it this way instead of the usual (at least I've seen this pattern in this codebase)

start_time = time.monotonic()
while True:
    ..
    time_elapsed = time.monotonic() - start_time
    if time_elapsed > timeout:
        boom

?

Your implementation may wait longer than the specified timeout, because you are moving the t0 on each loop and thus ignoring the time of executing BLOCK B

while True:
    t0 = time.monotonic()   ## stores e.g.   1000ms, 2000ms, 3000ms   on each iteration
    BLOCK A  # ~900ms passes
    timeout_left = time.monotonic() - t0   ## subtracts e.g. 1900ms - 1000ms, 2900 - 2000, 3900 - 3000...
    BLOCK B  # ~100ms passes, but it's not subtracted from timeout_left

if timeout_left <= 0:
raise TimeoutError(f"Timeout of {timeout}s reached while waiting for all operations to be {phrase}")

last_queued_sequence_id = self._operations_queue.last_sequence_id

if value == -1:
Expand All @@ -636,7 +655,7 @@ def _wait(
)
else:
# Reaching the last queued sequence ID means that all operations were submitted
if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout):
if value >= last_queued_sequence_id:
break
except KeyboardInterrupt:
if verbose:
Expand All @@ -654,7 +673,9 @@ def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = T
See wait_for_processing().

Args:
timeout (float, optional): In seconds, the maximum time to wait for submission.
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete.
If the timeout is reached `TimeoutError` is raised. If not provided, the value
of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS`. If not set, the default value of 300sec is used.
verbose (bool): If True (default), prints messages about the waiting process.
"""
self._wait(
Expand All @@ -673,7 +694,9 @@ def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = T
Once the call is complete, the data is saved in Neptune.

Args:
timeout (float, optional): In seconds, the maximum time to wait for processing.
timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete.
If the timeout is reached `TimeoutError` is raised. If not provided, the value
of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS`. If not set, the default value of 300sec is used.
verbose (bool): If True (default), prints messages about the waiting process.
"""
self._wait(
Expand Down
7 changes: 4 additions & 3 deletions src/neptune_scale/envs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
PROJECT_ENV_NAME = "NEPTUNE_PROJECT"

API_TOKEN_ENV_NAME = "NEPTUNE_API_TOKEN"
ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE"

DISABLE_COLORS = "NEPTUNE_DISABLE_COLORS"

DEBUG_MODE = "NEPTUNE_DEBUG_MODE"

SUBPROCESS_KILL_TIMEOUT = "NEPTUNE_SUBPROCESS_KILL_TIMEOUT"

ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE"
# Default timeout used to wait for operations to be processed by the Neptune backend.
# Affects Run.wait*() and Run.close*() methods.
OPERATIONS_TIMEOUT_SECONDS = "NEPTUNE_OPERATIONS_TIMEOUT_SECONDS"
1 change: 1 addition & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
MINIMAL_WAIT_FOR_ACK_SLEEP_TIME = 10
STOP_MESSAGE_FREQUENCY = 5
LAG_TRACKER_TIMEOUT = 1
DEFAULT_OPERATIONS_TIMEOUT_SECONDS = 30 * 60

# Status tracking
MAX_REQUESTS_STATUS_BATCH_SIZE = 1000
Loading