diff --git a/src/neptune_scale/__init__.py b/src/neptune_scale/__init__.py index 2bc91f16..cc545bfe 100644 --- a/src/neptune_scale/__init__.py +++ b/src/neptune_scale/__init__.py @@ -59,6 +59,7 @@ ) from neptune_scale.envs import ( API_TOKEN_ENV_NAME, + OPERATIONS_TIMEOUT_SECONDS, PROJECT_ENV_NAME, ) from neptune_scale.exceptions import ( @@ -66,6 +67,7 @@ NeptuneProjectNotProvided, ) from neptune_scale.parameters import ( + DEFAULT_OPERATIONS_TIMEOUT_SECONDS, MAX_EXPERIMENT_NAME_LENGTH, MAX_FAMILY_LENGTH, MAX_QUEUE_SIZE, @@ -273,10 +275,8 @@ def __init__( self.wait_for_processing(verbose=False) def _handle_signal(self, signum: int, frame: Any) -> None: - if not self._is_closing: - signame = safe_signal_name(signum) - logger.debug(f"Received signal {signame}. Terminating.") - + signame = safe_signal_name(signum) + logger.debug(f"Received signal {signame}") self.terminate() @property @@ -294,21 +294,23 @@ def resources(self) -> tuple[Resource, ...]: self._errors_monitor, ) - def _close(self, *, wait: bool = True) -> None: + def _close(self, *, wait: bool = True, timeout: Optional[float] = None) -> None: + logger.debug(f"Run is closing, wait={wait}, is_closing={self._is_closing}") with self._lock: if self._is_closing: return self._is_closing = True - logger.debug(f"Run is closing, wait={wait}") - if self._sync_process.is_alive(): if wait: - self.wait_for_processing() - - self._sync_process.terminate() - self._sync_process.join() + self.wait_for_processing(timeout=timeout) + self._sync_process.terminate() + self._sync_process.join() + else: + # If we don't care about waiting, it means we're trying to abort the run on error, + # so make it quick + self._sync_process.kill() if self._lag_tracker is not None: self._lag_tracker.interrupt() @@ -351,14 +353,22 @@ def my_error_callback(exc): if self._exit_func is not None: atexit.unregister(self._exit_func) self._exit_func = None + + # Force cleanup in _close() to make sure we're not waiting for any threads / processes + 
self._is_closing = False self._close(wait=False) - def close(self) -> None: + def close(self, timeout: Optional[float] = None) -> None: """ Closes the connection to Neptune and waits for data synchronization to be completed. Use to finalize a regular run your model-training script. + Args: + timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. + If the timeout is reached `TimeoutError` is raised. If not provided, the value + of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` is used. If not set, the default of 1800 seconds is used. + Example: ``` from neptune_scale import Run @@ -373,7 +383,7 @@ def close(self) -> None: if self._exit_func is not None: atexit.unregister(self._exit_func) self._exit_func = None - self._close(wait=True) + self._close(wait=True, timeout=timeout) def _create_run( self, @@ -583,17 +593,22 @@ def _wait( timeout: Optional[float] = None, verbose: bool = True, ) -> None: + if timeout is None: + timeout = float(os.getenv(OPERATIONS_TIMEOUT_SECONDS, DEFAULT_OPERATIONS_TIMEOUT_SECONDS)) + + if timeout <= 0: + raise ValueError(f"Timeout must be greater than 0, but got {timeout}") + if verbose: logger.info(f"Waiting for all operations to be {phrase}") - if timeout is None and verbose: - logger.warning("No timeout specified.
Waiting indefinitely") - - begin_time = time.time() - wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time + timeout_left = timeout last_print_timestamp: Optional[float] = None while True: + t0 = time.monotonic() + wait_time = min(sleep_time, timeout_left) + try: with self._lock: if not self._sync_process.is_alive(): @@ -611,6 +626,10 @@ def _wait( wait_condition.wait(timeout=wait_time) value = wait_value.value + timeout_left -= time.monotonic() - t0 + if timeout_left <= 0: + raise TimeoutError(f"Timeout of {timeout}s reached while waiting for all operations to be {phrase}") + last_queued_sequence_id = self._operations_queue.last_sequence_id if value == -1: @@ -636,7 +655,7 @@ def _wait( ) else: # Reaching the last queued sequence ID means that all operations were submitted - if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout): + if value >= last_queued_sequence_id: break except KeyboardInterrupt: if verbose: @@ -654,7 +673,9 @@ def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = T See wait_for_processing(). Args: - timeout (float, optional): In seconds, the maximum time to wait for submission. + timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. + If the timeout is reached `TimeoutError` is raised. If not provided, the value + of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` is used. If not set, the default of 1800 seconds is used. verbose (bool): If True (default), prints messages about the waiting process. """ self._wait( @@ -673,7 +694,9 @@ def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = T Once the call is complete, the data is saved in Neptune. Args: - timeout (float, optional): In seconds, the maximum time to wait for processing. + timeout (float, optional): Maximum time in seconds to wait for the remaining operations to complete. + If the timeout is reached `TimeoutError` is raised.
If not provided, the value + of `NEPTUNE_OPERATIONS_TIMEOUT_SECONDS` is used. If not set, the default of 1800 seconds is used. verbose (bool): If True (default), prints messages about the waiting process. """ self._wait( diff --git a/src/neptune_scale/envs.py b/src/neptune_scale/envs.py index 02e0f9ff..f529e273 100644 --- a/src/neptune_scale/envs.py +++ b/src/neptune_scale/envs.py @@ -1,11 +1,12 @@ PROJECT_ENV_NAME = "NEPTUNE_PROJECT" - API_TOKEN_ENV_NAME = "NEPTUNE_API_TOKEN" +ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE" DISABLE_COLORS = "NEPTUNE_DISABLE_COLORS" - DEBUG_MODE = "NEPTUNE_DEBUG_MODE" SUBPROCESS_KILL_TIMEOUT = "NEPTUNE_SUBPROCESS_KILL_TIMEOUT" -ALLOW_SELF_SIGNED_CERTIFICATE = "NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE" +# Default timeout used to wait for operations to be processed by the Neptune backend. +# Affects Run.wait*() and Run.close*() methods. +OPERATIONS_TIMEOUT_SECONDS = "NEPTUNE_OPERATIONS_TIMEOUT_SECONDS" diff --git a/src/neptune_scale/parameters.py b/src/neptune_scale/parameters.py index b2e8bb62..c40e6af9 100644 --- a/src/neptune_scale/parameters.py +++ b/src/neptune_scale/parameters.py @@ -29,6 +29,7 @@ MINIMAL_WAIT_FOR_ACK_SLEEP_TIME = 10 STOP_MESSAGE_FREQUENCY = 5 LAG_TRACKER_TIMEOUT = 1 +DEFAULT_OPERATIONS_TIMEOUT_SECONDS = 30 * 60 # Status tracking MAX_REQUESTS_STATUS_BATCH_SIZE = 1000