From f4c7b480c0876c27dc91f46ac11da78a19d2dacb Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Thu, 21 Nov 2024 14:54:21 +0100 Subject: [PATCH 1/2] Refactor internal package structure --- src/neptune_scale/__init__.py | 691 +----------------- src/neptune_scale/api/__init__.py | 1 + src/neptune_scale/{ => api}/exceptions.py | 2 +- src/neptune_scale/api/run.py | 687 +++++++++++++++++ src/neptune_scale/{core => api}/validation.py | 0 src/neptune_scale/core/components/__init__.py | 0 src/neptune_scale/net/__init__.py | 1 + src/neptune_scale/{api => net}/api_client.py | 8 +- .../{core => net}/serialization.py | 0 src/neptune_scale/{core => sync}/__init__.py | 0 .../components => sync}/aggregating_queue.py | 14 +- .../components => sync}/errors_tracking.py | 12 +- .../{core/components => sync}/lag_tracking.py | 14 +- .../{core => sync}/metadata_splitter.py | 2 +- .../components => sync}/operations_queue.py | 10 +- src/neptune_scale/{ => sync}/parameters.py | 0 .../components => sync}/queue_element.py | 0 .../{core/components => sync}/sync_process.py | 48 +- src/neptune_scale/{core => sync}/util.py | 0 src/neptune_scale/util/__init__.py | 17 + .../{core/components => util}/abstract.py | 0 .../{core/components => util}/daemon.py | 2 +- src/neptune_scale/{ => util}/envs.py | 0 src/neptune_scale/{core => util}/logger.py | 4 +- .../{core => util}/process_killer.py | 2 +- .../{core => util}/process_link.py | 6 +- .../{core => util}/shared_var.py | 0 src/neptune_scale/{core => util}/styles.py | 2 +- tests/unit/test_aggregating_queue.py | 4 +- tests/unit/test_errors_monitor.py | 2 +- tests/unit/test_lag_tracker.py | 4 +- tests/unit/test_metadata_splitter.py | 2 +- tests/unit/test_operations_queue.py | 2 +- tests/unit/test_process_link.py | 2 +- tests/unit/test_run.py | 7 +- tests/unit/test_shared_var.py | 2 +- 36 files changed, 788 insertions(+), 760 deletions(-) rename src/neptune_scale/{ => api}/exceptions.py (99%) create mode 100644 src/neptune_scale/api/run.py rename src/neptune_scale/{core => api}/validation.py (100%) delete mode 100644 src/neptune_scale/core/components/__init__.py create mode 100644 src/neptune_scale/net/__init__.py rename src/neptune_scale/{api => net}/api_client.py (96%) rename src/neptune_scale/{core => net}/serialization.py (100%) rename src/neptune_scale/{core => sync}/__init__.py (100%) rename src/neptune_scale/{core/components => sync}/aggregating_queue.py (97%) rename src/neptune_scale/{core/components => sync}/errors_tracking.py (93%) rename src/neptune_scale/{core/components => sync}/lag_tracking.py (85%) rename src/neptune_scale/{core => sync}/metadata_splitter.py (99%) rename src/neptune_scale/{core/components => sync}/operations_queue.py (90%) rename src/neptune_scale/{ => sync}/parameters.py (100%) rename src/neptune_scale/{core/components => sync}/queue_element.py (100%) rename src/neptune_scale/{core/components => sync}/sync_process.py (97%) rename src/neptune_scale/{core => sync}/util.py (100%) create mode 100644 src/neptune_scale/util/__init__.py rename src/neptune_scale/{core/components => util}/abstract.py (100%) rename src/neptune_scale/{core/components => util}/daemon.py (98%) rename src/neptune_scale/{ => util}/envs.py (100%) rename src/neptune_scale/{core => util}/logger.py (94%) rename src/neptune_scale/{core => util}/process_killer.py (94%) rename src/neptune_scale/{core => util}/process_link.py (98%) rename src/neptune_scale/{core => util}/shared_var.py (100%) rename src/neptune_scale/{core => util}/styles.py (96%) diff --git 
a/src/neptune_scale/__init__.py b/src/neptune_scale/__init__.py index 7cbd13c1..ce2a30ff 100644 --- a/src/neptune_scale/__init__.py +++ b/src/neptune_scale/__init__.py @@ -1,687 +1,4 @@ -""" -Python package -""" - -from __future__ import annotations - -__all__ = ["Run"] - -import atexit -import os -import threading -import time -from contextlib import AbstractContextManager -from datetime import datetime -from typing import ( - Any, - Callable, - Dict, - List, - Literal, - Optional, - Set, - Tuple, - Union, -) - -from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import ForkPoint -from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun -from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation - -from neptune_scale.core.components.abstract import ( - Resource, - WithResources, -) -from neptune_scale.core.components.errors_tracking import ( - ErrorsMonitor, - ErrorsQueue, -) -from neptune_scale.core.components.lag_tracking import LagTracker -from neptune_scale.core.components.operations_queue import OperationsQueue -from neptune_scale.core.components.sync_process import SyncProcess -from neptune_scale.core.logger import get_logger -from neptune_scale.core.metadata_splitter import MetadataSplitter -from neptune_scale.core.process_link import ProcessLink -from neptune_scale.core.serialization import ( - datetime_to_proto, - make_step, -) -from neptune_scale.core.shared_var import ( - SharedFloat, - SharedInt, -) -from neptune_scale.core.validation import ( - verify_collection_type, - verify_max_length, - verify_non_empty, - verify_project_qualified_name, - verify_type, -) -from neptune_scale.envs import ( - API_TOKEN_ENV_NAME, - PROJECT_ENV_NAME, -) -from neptune_scale.exceptions import ( - NeptuneApiTokenNotProvided, - NeptuneProjectNotProvided, -) -from neptune_scale.parameters import ( - MAX_EXPERIMENT_NAME_LENGTH, - MAX_QUEUE_SIZE, - MAX_RUN_ID_LENGTH, - MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, - MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, - STOP_MESSAGE_FREQUENCY, -) - -logger = get_logger() - - -class Run(WithResources, AbstractContextManager): - """ - Representation of tracked metadata. - """ - - def __init__( - self, - *, - run_id: str, - project: Optional[str] = None, - api_token: Optional[str] = None, - resume: bool = False, - mode: Literal["async", "disabled"] = "async", - experiment_name: Optional[str] = None, - creation_time: Optional[datetime] = None, - fork_run_id: Optional[str] = None, - fork_step: Optional[Union[int, float]] = None, - max_queue_size: int = MAX_QUEUE_SIZE, - async_lag_threshold: Optional[float] = None, - on_async_lag_callback: Optional[Callable[[], None]] = None, - on_queue_full_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, - on_network_error_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, - on_error_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, - on_warning_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, - ) -> None: - """ - Initializes a run that logs the model-building metadata to Neptune. - - Args: - run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 bytes. - project: Name of the project where the metadata is logged, in the form `workspace-name/project-name`. - If not provided, the value of the `NEPTUNE_PROJECT` environment variable is used. - api_token: Your Neptune API token. If not provided, the value of the `NEPTUNE_API_TOKEN` environment - variable is used. 
- resume: Whether to resume an existing run. - mode: Mode of operation. If set to "disabled", the run doesn't log any metadata. - experiment_name: If creating a run as an experiment, name (ID) of the experiment to be associated with the run. - creation_time: Custom creation time of the run. - fork_run_id: If forking from an existing run, ID of the run to fork from. - fork_step: If forking from an existing run, step number to fork from. - max_queue_size: Maximum number of operations in a queue. - async_lag_threshold: Threshold for the duration between the queueing and synchronization of an operation - (in seconds). If the duration exceeds the threshold, the callback function is triggered. - on_async_lag_callback: Callback function triggered when the duration between the queueing and synchronization - on_queue_full_callback: Callback function triggered when the queue is full. The function should take the exception - that made the queue full as its argument and an optional timestamp of the last time the exception was raised. - on_network_error_callback: Callback function triggered when a network error occurs. - on_error_callback: The default callback function triggered when error occurs. It applies if an error - wasn't caught by other callbacks. - on_warning_callback: Callback function triggered when a warning occurs. - """ - - verify_type("run_id", run_id, str) - verify_type("resume", resume, bool) - verify_type("project", project, (str, type(None))) - verify_type("api_token", api_token, (str, type(None))) - verify_type("experiment_name", experiment_name, (str, type(None))) - verify_type("creation_time", creation_time, (datetime, type(None))) - verify_type("fork_run_id", fork_run_id, (str, type(None))) - verify_type("fork_step", fork_step, (int, float, type(None))) - verify_type("max_queue_size", max_queue_size, int) - verify_type("async_lag_threshold", async_lag_threshold, (int, float, type(None))) - verify_type("on_async_lag_callback", on_async_lag_callback, (Callable, type(None))) - verify_type("on_queue_full_callback", on_queue_full_callback, (Callable, type(None))) - verify_type("on_network_error_callback", on_network_error_callback, (Callable, type(None))) - verify_type("on_error_callback", on_error_callback, (Callable, type(None))) - verify_type("on_warning_callback", on_warning_callback, (Callable, type(None))) - - if resume and creation_time is not None: - raise ValueError("`resume` and `creation_time` cannot be used together.") - if resume and experiment_name is not None: - raise ValueError("`resume` and `experiment_name` cannot be used together.") - if (fork_run_id is not None and fork_step is None) or (fork_run_id is None and fork_step is not None): - raise ValueError("`fork_run_id` and `fork_step` must be used together.") - if resume and fork_run_id is not None: - raise ValueError("`resume` and `fork_run_id` cannot be used together.") - if resume and fork_step is not None: - raise ValueError("`resume` and `fork_step` cannot be used together.") - - if ( - on_async_lag_callback is not None - and async_lag_threshold is None - or on_async_lag_callback is None - and async_lag_threshold is not None - ): - raise ValueError("`on_async_lag_callback` must be used with `async_lag_threshold`.") - - if max_queue_size < 1: - raise ValueError("`max_queue_size` must be greater than 0.") - - project = project or os.environ.get(PROJECT_ENV_NAME) - if project: - project = project.strip('"').strip("'") - else: - raise NeptuneProjectNotProvided() - assert project is not None # mypy - input_project: 
str = project - - api_token = api_token or os.environ.get(API_TOKEN_ENV_NAME) - if api_token is None: - raise NeptuneApiTokenNotProvided() - assert api_token is not None # mypy - input_api_token: str = api_token - - verify_non_empty("run_id", run_id) - if experiment_name is not None: - verify_non_empty("experiment_name", experiment_name) - verify_max_length("experiment_name", experiment_name, MAX_EXPERIMENT_NAME_LENGTH) - if fork_run_id is not None: - verify_non_empty("fork_run_id", fork_run_id) - verify_max_length("fork_run_id", fork_run_id, MAX_RUN_ID_LENGTH) - - verify_project_qualified_name("project", project) - - verify_max_length("run_id", run_id, MAX_RUN_ID_LENGTH) - - # This flag is used to signal that we're closed or being closed (and most likely waiting for sync), and no - # new data should be logged. - self._is_closing = False - - self._project: str = input_project - self._run_id: str = run_id - - self._lock = threading.RLock() - self._operations_queue: OperationsQueue = OperationsQueue( - lock=self._lock, - max_size=max_queue_size, - ) - self._errors_queue: ErrorsQueue = ErrorsQueue() - self._errors_monitor = ErrorsMonitor( - errors_queue=self._errors_queue, - on_queue_full_callback=on_queue_full_callback, - on_network_error_callback=on_network_error_callback, - on_error_callback=on_error_callback, - on_warning_callback=on_warning_callback, - ) - - self._last_queued_seq = SharedInt(-1) - self._last_ack_seq = SharedInt(-1) - self._last_ack_timestamp = SharedFloat(-1) - - self._process_link = ProcessLink() - self._sync_process = SyncProcess( - project=self._project, - family=self._run_id, - operations_queue=self._operations_queue.queue, - errors_queue=self._errors_queue, - process_link=self._process_link, - api_token=input_api_token, - last_queued_seq=self._last_queued_seq, - last_ack_seq=self._last_ack_seq, - last_ack_timestamp=self._last_ack_timestamp, - max_queue_size=max_queue_size, - mode=mode, - ) - self._lag_tracker: Optional[LagTracker] = None - if async_lag_threshold is not None and on_async_lag_callback is not None: - self._lag_tracker = LagTracker( - errors_queue=self._errors_queue, - operations_queue=self._operations_queue, - last_ack_timestamp=self._last_ack_timestamp, - async_lag_threshold=async_lag_threshold, - on_async_lag_callback=on_async_lag_callback, - ) - self._lag_tracker.start() - - self._errors_monitor.start() - with self._lock: - self._sync_process.start() - self._process_link.start(on_link_closed=self._on_child_link_closed) - - self._exit_func: Optional[Callable[[], None]] = atexit.register(self._close) - - if not resume: - self._create_run( - creation_time=datetime.now() if creation_time is None else creation_time, - experiment_name=experiment_name, - fork_run_id=fork_run_id, - fork_step=fork_step, - ) - self.wait_for_processing(verbose=False) - - def _on_child_link_closed(self, _: ProcessLink) -> None: - with self._lock: - if not self._is_closing: - logger.error("Child process closed unexpectedly. 
Terminating.") - self._is_closing = True - self.terminate() - - @property - def resources(self) -> tuple[Resource, ...]: - if self._lag_tracker is not None: - return ( - self._errors_queue, - self._operations_queue, - self._lag_tracker, - self._errors_monitor, - ) - return ( - self._errors_queue, - self._operations_queue, - self._errors_monitor, - ) - - def _close(self, *, wait: bool = True) -> None: - with self._lock: - if self._is_closing: - return - - self._is_closing = True - - logger.debug(f"Run is closing, wait={wait}") - - if self._sync_process.is_alive(): - if wait: - self.wait_for_processing() - - self._sync_process.terminate() - self._sync_process.join() - self._process_link.stop() - - if self._lag_tracker is not None: - self._lag_tracker.interrupt() - self._lag_tracker.wake_up() - self._lag_tracker.join() - - self._errors_monitor.interrupt() - - # Don't call join() if being called from the error thread, as this will - # result in a "cannot join current thread" exception. - if threading.current_thread() != self._errors_monitor: - self._errors_monitor.join() - - super().close() - - def terminate(self) -> None: - """ - Closes the connection and aborts all synchronization mechanisms. - - Use in error callbacks to stop the run from interfering with the training process - in case of an unrecoverable error. - - Example: - ``` - from neptune_scale import Run - - def my_error_callback(exc): - run.terminate() - - run = Run( - on_error_callback=my_error_callback - ..., - ) - ``` - """ - - if not self._is_closing: - logger.info("Terminating Run.") - - if self._exit_func is not None: - atexit.unregister(self._exit_func) - self._exit_func = None - self._close(wait=False) - - def close(self) -> None: - """ - Closes the connection to Neptune and waits for data synchronization to be completed. - - Use to finalize a regular run your model-training script. - - Example: - ``` - from neptune_scale import Run - - with Run(...) as run: - # logging and training code - - run.close() - ``` - """ - - if self._exit_func is not None: - atexit.unregister(self._exit_func) - self._exit_func = None - self._close(wait=True) - - def _create_run( - self, - creation_time: datetime, - experiment_name: Optional[str], - fork_run_id: Optional[str], - fork_step: Optional[Union[int, float]], - ) -> None: - fork_point: Optional[ForkPoint] = None - if fork_run_id is not None and fork_step is not None: - fork_point = ForkPoint( - parent_project=self._project, parent_run_id=fork_run_id, step=make_step(number=fork_step) - ) - - operation = RunOperation( - project=self._project, - run_id=self._run_id, - create=CreateRun( - family=self._run_id, - fork_point=fork_point, - experiment_id=experiment_name, - creation_time=None if creation_time is None else datetime_to_proto(creation_time), - ), - ) - self._operations_queue.enqueue(operation=operation) - - def log_metrics( - self, - data: Dict[str, Union[float, int]], - step: Union[float, int], - *, - timestamp: Optional[datetime] = None, - ) -> None: - """ - Logs the specified metrics to a Neptune run. - - You can log metrics representing a series of numeric values. Pass the metadata as a dictionary {key: value} with - - - key: path to where the metadata should be stored in the run. - - value: a float or int value to append to the series. - - For example, {"metrics/accuracy": 0.89}. - In the attribute path, each forward slash "/" nests the attribute under a namespace. - Use namespaces to structure the metadata into meaningful categories. 
- - Args: - data: Dictionary of metrics to log. - Each metric value is associated with a step. - To log multiple metrics at once, pass multiple key-value pairs. - step: Index of the log entry. Must be increasing. - Tip: Using float rather than int values can be useful, for example, when logging substeps in a batch. - timestamp (optional): Time of logging the metadata. If not provided, the current time is used. If provided, - and `timestamp.tzinfo` is not set, the time is assumed to be in the local timezone. - - - Examples: - ``` - from neptune_scale import Run - - with Run(...) as run: - run.log_metrics( - data={"loss": 0.14, "acc": 0.78}, - step=1.2, - ) - ``` - """ - self.log(step=step, timestamp=timestamp, metrics=data) - - def log_configs( - self, data: Optional[Dict[str, Union[float, bool, int, str, datetime, list, set, tuple]]] = None - ) -> None: - """ - Logs the specified metadata to a Neptune run. - - You can log configurations or other single values. Pass the metadata as a dictionary {key: value} with - - - key: path to where the metadata should be stored in the run. - - value: configuration or other single value to log. - - For example, {"parameters/learning_rate": 0.001}. - In the attribute path, each forward slash "/" nests the attribute under a namespace. - Use namespaces to structure the metadata into meaningful categories. - - Args: - data: Dictionary of configs or other values to log. - Available types: float, integer, Boolean, string, and datetime. - - Any `datetime` values that don't have the `tzinfo` attribute set are assumed to be in the local timezone. - - Example: - ``` - from neptune_scale import Run - - with Run(...) as run: - run.log_configs( - data={ - "parameters/learning_rate": 0.001, - "parameters/batch_size": 64, - }, - ) - ``` - """ - self.log(configs=data) - - def add_tags(self, tags: Union[List[str], Set[str], Tuple[str]], group_tags: bool = False) -> None: - """ - Adds the list of tags to the run. - - Args: - tags: Tags to add to the run, as a list or set of strings. - group_tags: To add group tags instead of regular tags, set to `True`. - - Example: - ``` - from neptune_scale import Run - - with Run(...) as run: - run.add_tags(tags=["tag1", "tag2", "tag3"]) - ``` - """ - name = "sys/tags" if not group_tags else "sys/group_tags" - self.log(tags_add={name: tags}) - - def remove_tags(self, tags: Union[List[str], Set[str], Tuple[str]], group_tags: bool = False) -> None: - """ - Removes the specified tags from the run. - - Args: - tags: Tags to remove to the run, as a list or set of strings. - group_tags: To remove group tags instead of regular tags, set to `True`. - - Example: - ``` - from neptune_scale import Run - - with Run(...) 
as run: - run.remove_tags(tags=["tag2", "tag3"]) - ``` - """ - name = "sys/tags" if not group_tags else "sys/group_tags" - self.log(tags_remove={name: tags}) - - def log( - self, - step: Optional[Union[float, int]] = None, - timestamp: Optional[datetime] = None, - configs: Optional[Dict[str, Union[float, bool, int, str, datetime, list, set, tuple]]] = None, - metrics: Optional[Dict[str, Union[float, int]]] = None, - tags_add: Optional[Dict[str, Union[List[str], Set[str], Tuple[str]]]] = None, - tags_remove: Optional[Dict[str, Union[List[str], Set[str], Tuple[str]]]] = None, - ) -> None: - """ - See one of the following instead: - - - log_configs() - - log_metrics() - - add_tags() - - remove_tags() - """ - - verify_type("step", step, (float, int, type(None))) - verify_type("timestamp", timestamp, (datetime, type(None))) - verify_type("configs", configs, (dict, type(None))) - verify_type("metrics", metrics, (dict, type(None))) - verify_type("tags_add", tags_add, (dict, type(None))) - verify_type("tags_remove", tags_remove, (dict, type(None))) - - timestamp = datetime.now() if timestamp is None else timestamp - configs = {} if configs is None else configs - metrics = {} if metrics is None else metrics - tags_add = {} if tags_add is None else tags_add - tags_remove = {} if tags_remove is None else tags_remove - - verify_collection_type("`configs` keys", list(configs.keys()), str) - verify_collection_type("`metrics` keys", list(metrics.keys()), str) - verify_collection_type("`tags_add` keys", list(tags_add.keys()), str) - verify_collection_type("`tags_remove` keys", list(tags_remove.keys()), str) - - verify_collection_type( - "`configs` values", list(configs.values()), (float, bool, int, str, datetime, list, set, tuple) - ) - verify_collection_type("`metrics` values", list(metrics.values()), (float, int)) - verify_collection_type("`tags_add` values", list(tags_add.values()), (list, set, tuple)) - verify_collection_type("`tags_remove` values", list(tags_remove.values()), (list, set, tuple)) - - # Don't log anything after we've been stopped. This allows continuing the training script - # after a non-recoverable error happened. Note we don't to use self._lock in this check, - # to keep the common path faster, because the benefit of locking here is minimal. - if self._is_closing: - return - - # TODO: move this to a separate process or thread, to make the .log call as lightweight as possible - splitter: MetadataSplitter = MetadataSplitter( - project=self._project, - run_id=self._run_id, - step=step, - timestamp=timestamp, - configs=configs, - metrics=metrics, - add_tags=tags_add, - remove_tags=tags_remove, - ) - - for operation, metadata_size in splitter: - self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step) - - def _wait( - self, - phrase: str, - sleep_time: float, - wait_seq: SharedInt, - timeout: Optional[float] = None, - verbose: bool = True, - ) -> None: - if verbose: - logger.info(f"Waiting for all operations to be {phrase}") - - if timeout is None and verbose: - logger.warning("No timeout specified. Waiting indefinitely") - - begin_time = time.time() - wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time - last_print_timestamp: Optional[float] = None - - while True: - try: - with self._lock: - if not self._sync_process.is_alive(): - if verbose and not self._is_closing: - # TODO: error out here? 
- logger.warning("Sync process is not running") - return # No need to wait if the sync process is not running - - # Handle the case where we get notified on `wait_seq` before we actually wait. - # Otherwise, we would unnecessarily block, waiting on a notify_all() that never happens. - if wait_seq.value >= self._operations_queue.last_sequence_id: - break - - with wait_seq: - wait_seq.wait(timeout=wait_time) - value = wait_seq.value - - last_queued_sequence_id = self._operations_queue.last_sequence_id - - if value == -1: - if self._operations_queue.last_sequence_id != -1: - last_print_timestamp = print_message( - f"Waiting. No operations were {phrase} yet. Operations to sync: %s", - self._operations_queue.last_sequence_id + 1, - last_print=last_print_timestamp, - verbose=verbose, - ) - else: - last_print_timestamp = print_message( - f"Waiting. No operations were {phrase} yet", - last_print=last_print_timestamp, - verbose=verbose, - ) - elif value < last_queued_sequence_id: - last_print_timestamp = print_message( - f"Waiting for remaining %d operation(s) to be {phrase}", - last_queued_sequence_id - value + 1, - last_print=last_print_timestamp, - verbose=verbose, - ) - else: - # Reaching the last queued sequence ID means that all operations were submitted - if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout): - break - except KeyboardInterrupt: - if verbose: - logger.warning("Waiting interrupted by user") - return - - if verbose: - logger.info(f"All operations were {phrase}") - - def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = True) -> None: - """ - Waits until all metadata is submitted to Neptune for processing. - - When submitted, the data is not yet saved in Neptune until fully processed. - See wait_for_processing(). - - Args: - timeout (float, optional): In seconds, the maximum time to wait for submission. - verbose (bool): If True (default), prints messages about the waiting process. - """ - self._wait( - phrase="submitted", - sleep_time=MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, - wait_seq=self._last_queued_seq, - timeout=timeout, - verbose=verbose, - ) - - def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = True) -> None: - """ - Waits until all metadata is processed by Neptune. - - Once the call is complete, the data is saved in Neptune. - - Args: - timeout (float, optional): In seconds, the maximum time to wait for processing. - verbose (bool): If True (default), prints messages about the waiting process. - """ - self._wait( - phrase="processed", - sleep_time=MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, - wait_seq=self._last_ack_seq, - timeout=timeout, - verbose=verbose, - ) - - -def print_message(msg: str, *args: Any, last_print: Optional[float] = None, verbose: bool = True) -> Optional[float]: - current_time = time.time() - - if verbose and (last_print is None or current_time - last_print > STOP_MESSAGE_FREQUENCY): - logger.info(msg, *args) - return current_time - - return last_print +__all__ = [ + "Run", +] +from neptune_scale.api.run import Run diff --git a/src/neptune_scale/api/__init__.py b/src/neptune_scale/api/__init__.py index e69de29b..87413fd1 100644 --- a/src/neptune_scale/api/__init__.py +++ b/src/neptune_scale/api/__init__.py @@ -0,0 +1 @@ +# This package contains user-facing classes and functions. 
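Note: with this change, the top-level `neptune_scale/__init__.py` only re-exports `Run` from `neptune_scale.api.run`, so the public import path is unchanged; only internal imports move. A minimal sketch of the resulting layout, assuming this commit is installed (module paths are taken from the rename list above; nothing beyond them is implied):

```python
# Public API: unaffected by the refactor.
from neptune_scale import Run

# Internal code now resolves through the four new packages:
#   neptune_scale.api  - user-facing classes, exceptions, validation
#   neptune_scale.net  - Neptune API communication (api_client, serialization)
#   neptune_scale.sync - queues, sync process, tuning parameters
#   neptune_scale.util - shared helpers (Daemon, ProcessLink, shared vars, logger)
from neptune_scale.sync.operations_queue import OperationsQueue
from neptune_scale.util import Daemon, ProcessLink, SharedInt, get_logger
```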
diff --git a/src/neptune_scale/exceptions.py b/src/neptune_scale/api/exceptions.py similarity index 99% rename from src/neptune_scale/exceptions.py rename to src/neptune_scale/api/exceptions.py index c15c38d8..889d3881 100644 --- a/src/neptune_scale/exceptions.py +++ b/src/neptune_scale/api/exceptions.py @@ -40,7 +40,7 @@ from typing import Any -from neptune_scale.core.styles import ( +from neptune_scale.util.styles import ( STYLES, ensure_style_detected, ) diff --git a/src/neptune_scale/api/run.py b/src/neptune_scale/api/run.py new file mode 100644 index 00000000..25faf5cc --- /dev/null +++ b/src/neptune_scale/api/run.py @@ -0,0 +1,687 @@ +""" +Python package +""" + +from __future__ import annotations + +__all__ = ["Run"] + +import atexit +import os +import threading +import time +from contextlib import AbstractContextManager +from datetime import datetime +from typing import ( + Any, + Callable, + Dict, + List, + Literal, + Optional, + Set, + Tuple, + Union, +) + +from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import ForkPoint +from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun +from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation + +from neptune_scale.api.exceptions import ( + NeptuneApiTokenNotProvided, + NeptuneProjectNotProvided, +) +from neptune_scale.api.validation import ( + verify_collection_type, + verify_max_length, + verify_non_empty, + verify_project_qualified_name, + verify_type, +) +from neptune_scale.net.serialization import ( + datetime_to_proto, + make_step, +) +from neptune_scale.sync.errors_tracking import ( + ErrorsMonitor, + ErrorsQueue, +) +from neptune_scale.sync.lag_tracking import LagTracker +from neptune_scale.sync.metadata_splitter import MetadataSplitter +from neptune_scale.sync.operations_queue import OperationsQueue +from neptune_scale.sync.parameters import ( + MAX_EXPERIMENT_NAME_LENGTH, + MAX_QUEUE_SIZE, + MAX_RUN_ID_LENGTH, + MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, + MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, + STOP_MESSAGE_FREQUENCY, +) +from neptune_scale.sync.sync_process import SyncProcess +from neptune_scale.util.abstract import ( + Resource, + WithResources, +) +from neptune_scale.util.envs import ( + API_TOKEN_ENV_NAME, + PROJECT_ENV_NAME, +) +from neptune_scale.util.logger import get_logger +from neptune_scale.util.process_link import ProcessLink +from neptune_scale.util.shared_var import ( + SharedFloat, + SharedInt, +) + +logger = get_logger() + + +class Run(WithResources, AbstractContextManager): + """ + Representation of tracked metadata. + """ + + def __init__( + self, + *, + run_id: str, + project: Optional[str] = None, + api_token: Optional[str] = None, + resume: bool = False, + mode: Literal["async", "disabled"] = "async", + experiment_name: Optional[str] = None, + creation_time: Optional[datetime] = None, + fork_run_id: Optional[str] = None, + fork_step: Optional[Union[int, float]] = None, + max_queue_size: int = MAX_QUEUE_SIZE, + async_lag_threshold: Optional[float] = None, + on_async_lag_callback: Optional[Callable[[], None]] = None, + on_queue_full_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, + on_network_error_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, + on_error_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, + on_warning_callback: Optional[Callable[[BaseException, Optional[float]], None]] = None, + ) -> None: + """ + Initializes a run that logs the model-building metadata to Neptune. 
+ + Args: + run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 bytes. + project: Name of the project where the metadata is logged, in the form `workspace-name/project-name`. + If not provided, the value of the `NEPTUNE_PROJECT` environment variable is used. + api_token: Your Neptune API token. If not provided, the value of the `NEPTUNE_API_TOKEN` environment + variable is used. + resume: Whether to resume an existing run. + mode: Mode of operation. If set to "disabled", the run doesn't log any metadata. + experiment_name: If creating a run as an experiment, name (ID) of the experiment to be associated with the run. + creation_time: Custom creation time of the run. + fork_run_id: If forking from an existing run, ID of the run to fork from. + fork_step: If forking from an existing run, step number to fork from. + max_queue_size: Maximum number of operations in a queue. + async_lag_threshold: Threshold for the duration between the queueing and synchronization of an operation + (in seconds). If the duration exceeds the threshold, the callback function is triggered. + on_async_lag_callback: Callback function triggered when the duration between the queueing and synchronization of an operation exceeds `async_lag_threshold`. + on_queue_full_callback: Callback function triggered when the queue is full. The function should take the exception + that made the queue full as its argument and an optional timestamp of the last time the exception was raised. + on_network_error_callback: Callback function triggered when a network error occurs. + on_error_callback: The default callback function triggered when an error occurs. It applies if an error + wasn't caught by other callbacks. + on_warning_callback: Callback function triggered when a warning occurs. + """ + + verify_type("run_id", run_id, str) + verify_type("resume", resume, bool) + verify_type("project", project, (str, type(None))) + verify_type("api_token", api_token, (str, type(None))) + verify_type("experiment_name", experiment_name, (str, type(None))) + verify_type("creation_time", creation_time, (datetime, type(None))) + verify_type("fork_run_id", fork_run_id, (str, type(None))) + verify_type("fork_step", fork_step, (int, float, type(None))) + verify_type("max_queue_size", max_queue_size, int) + verify_type("async_lag_threshold", async_lag_threshold, (int, float, type(None))) + verify_type("on_async_lag_callback", on_async_lag_callback, (Callable, type(None))) + verify_type("on_queue_full_callback", on_queue_full_callback, (Callable, type(None))) + verify_type("on_network_error_callback", on_network_error_callback, (Callable, type(None))) + verify_type("on_error_callback", on_error_callback, (Callable, type(None))) + verify_type("on_warning_callback", on_warning_callback, (Callable, type(None))) + + if resume and creation_time is not None: + raise ValueError("`resume` and `creation_time` cannot be used together.") + if resume and experiment_name is not None: + raise ValueError("`resume` and `experiment_name` cannot be used together.") + if (fork_run_id is not None and fork_step is None) or (fork_run_id is None and fork_step is not None): + raise ValueError("`fork_run_id` and `fork_step` must be used together.") + if resume and fork_run_id is not None: + raise ValueError("`resume` and `fork_run_id` cannot be used together.") + if resume and fork_step is not None: + raise ValueError("`resume` and `fork_step` cannot be used together.") + + if ( + on_async_lag_callback is not None + and async_lag_threshold is None + or on_async_lag_callback is None + and
async_lag_threshold is not None + ): + raise ValueError("`on_async_lag_callback` must be used with `async_lag_threshold`.") + + if max_queue_size < 1: + raise ValueError("`max_queue_size` must be greater than 0.") + + project = project or os.environ.get(PROJECT_ENV_NAME) + if project: + project = project.strip('"').strip("'") + else: + raise NeptuneProjectNotProvided() + assert project is not None # mypy + input_project: str = project + + api_token = api_token or os.environ.get(API_TOKEN_ENV_NAME) + if api_token is None: + raise NeptuneApiTokenNotProvided() + assert api_token is not None # mypy + input_api_token: str = api_token + + verify_non_empty("run_id", run_id) + if experiment_name is not None: + verify_non_empty("experiment_name", experiment_name) + verify_max_length("experiment_name", experiment_name, MAX_EXPERIMENT_NAME_LENGTH) + if fork_run_id is not None: + verify_non_empty("fork_run_id", fork_run_id) + verify_max_length("fork_run_id", fork_run_id, MAX_RUN_ID_LENGTH) + + verify_project_qualified_name("project", project) + + verify_max_length("run_id", run_id, MAX_RUN_ID_LENGTH) + + # This flag is used to signal that we're closed or being closed (and most likely waiting for sync), and no + # new data should be logged. + self._is_closing = False + + self._project: str = input_project + self._run_id: str = run_id + + self._lock = threading.RLock() + self._operations_queue: OperationsQueue = OperationsQueue( + lock=self._lock, + max_size=max_queue_size, + ) + self._errors_queue: ErrorsQueue = ErrorsQueue() + self._errors_monitor = ErrorsMonitor( + errors_queue=self._errors_queue, + on_queue_full_callback=on_queue_full_callback, + on_network_error_callback=on_network_error_callback, + on_error_callback=on_error_callback, + on_warning_callback=on_warning_callback, + ) + + self._last_queued_seq = SharedInt(-1) + self._last_ack_seq = SharedInt(-1) + self._last_ack_timestamp = SharedFloat(-1) + + self._process_link = ProcessLink() + self._sync_process = SyncProcess( + project=self._project, + family=self._run_id, + operations_queue=self._operations_queue.queue, + errors_queue=self._errors_queue, + process_link=self._process_link, + api_token=input_api_token, + last_queued_seq=self._last_queued_seq, + last_ack_seq=self._last_ack_seq, + last_ack_timestamp=self._last_ack_timestamp, + max_queue_size=max_queue_size, + mode=mode, + ) + self._lag_tracker: Optional[LagTracker] = None + if async_lag_threshold is not None and on_async_lag_callback is not None: + self._lag_tracker = LagTracker( + errors_queue=self._errors_queue, + operations_queue=self._operations_queue, + last_ack_timestamp=self._last_ack_timestamp, + async_lag_threshold=async_lag_threshold, + on_async_lag_callback=on_async_lag_callback, + ) + self._lag_tracker.start() + + self._errors_monitor.start() + with self._lock: + self._sync_process.start() + self._process_link.start(on_link_closed=self._on_child_link_closed) + + self._exit_func: Optional[Callable[[], None]] = atexit.register(self._close) + + if not resume: + self._create_run( + creation_time=datetime.now() if creation_time is None else creation_time, + experiment_name=experiment_name, + fork_run_id=fork_run_id, + fork_step=fork_step, + ) + self.wait_for_processing(verbose=False) + + def _on_child_link_closed(self, _: ProcessLink) -> None: + with self._lock: + if not self._is_closing: + logger.error("Child process closed unexpectedly. 
Terminating.") + self._is_closing = True + self.terminate() + + @property + def resources(self) -> tuple[Resource, ...]: + if self._lag_tracker is not None: + return ( + self._errors_queue, + self._operations_queue, + self._lag_tracker, + self._errors_monitor, + ) + return ( + self._errors_queue, + self._operations_queue, + self._errors_monitor, + ) + + def _close(self, *, wait: bool = True) -> None: + with self._lock: + if self._is_closing: + return + + self._is_closing = True + + logger.debug(f"Run is closing, wait={wait}") + + if self._sync_process.is_alive(): + if wait: + self.wait_for_processing() + + self._sync_process.terminate() + self._sync_process.join() + self._process_link.stop() + + if self._lag_tracker is not None: + self._lag_tracker.interrupt() + self._lag_tracker.wake_up() + self._lag_tracker.join() + + self._errors_monitor.interrupt() + + # Don't call join() if being called from the error thread, as this will + # result in a "cannot join current thread" exception. + if threading.current_thread() != self._errors_monitor: + self._errors_monitor.join() + + super().close() + + def terminate(self) -> None: + """ + Closes the connection and aborts all synchronization mechanisms. + + Use in error callbacks to stop the run from interfering with the training process + in case of an unrecoverable error. + + Example: + ``` + from neptune_scale import Run + + def my_error_callback(exc, timestamp): + run.terminate() + + run = Run( + on_error_callback=my_error_callback, + ..., + ) + ``` + """ + + if not self._is_closing: + logger.info("Terminating Run.") + + if self._exit_func is not None: + atexit.unregister(self._exit_func) + self._exit_func = None + self._close(wait=False) + + def close(self) -> None: + """ + Closes the connection to Neptune and waits for data synchronization to be completed. + + Use to finalize a regular run in your model-training script. + + Example: + ``` + from neptune_scale import Run + + with Run(...) as run: + # logging and training code + + run.close() + ``` + """ + + if self._exit_func is not None: + atexit.unregister(self._exit_func) + self._exit_func = None + self._close(wait=True) + + def _create_run( + self, + creation_time: datetime, + experiment_name: Optional[str], + fork_run_id: Optional[str], + fork_step: Optional[Union[int, float]], + ) -> None: + fork_point: Optional[ForkPoint] = None + if fork_run_id is not None and fork_step is not None: + fork_point = ForkPoint( + parent_project=self._project, parent_run_id=fork_run_id, step=make_step(number=fork_step) + ) + + operation = RunOperation( + project=self._project, + run_id=self._run_id, + create=CreateRun( + family=self._run_id, + fork_point=fork_point, + experiment_id=experiment_name, + creation_time=None if creation_time is None else datetime_to_proto(creation_time), + ), + ) + self._operations_queue.enqueue(operation=operation) + + def log_metrics( + self, + data: Dict[str, Union[float, int]], + step: Union[float, int], + *, + timestamp: Optional[datetime] = None, + ) -> None: + """ + Logs the specified metrics to a Neptune run. + + You can log metrics representing a series of numeric values. Pass the metadata as a dictionary {key: value} with + + - key: path to where the metadata should be stored in the run. + - value: a float or int value to append to the series. + + For example, {"metrics/accuracy": 0.89}. + In the attribute path, each forward slash "/" nests the attribute under a namespace. + Use namespaces to structure the metadata into meaningful categories.
+ + Args: + data: Dictionary of metrics to log. + Each metric value is associated with a step. + To log multiple metrics at once, pass multiple key-value pairs. + step: Index of the log entry. Must be increasing. + Tip: Using float rather than int values can be useful, for example, when logging substeps in a batch. + timestamp (optional): Time of logging the metadata. If not provided, the current time is used. If provided, + and `timestamp.tzinfo` is not set, the time is assumed to be in the local timezone. + + + Examples: + ``` + from neptune_scale import Run + + with Run(...) as run: + run.log_metrics( + data={"loss": 0.14, "acc": 0.78}, + step=1.2, + ) + ``` + """ + self.log(step=step, timestamp=timestamp, metrics=data) + + def log_configs( + self, data: Optional[Dict[str, Union[float, bool, int, str, datetime, list, set, tuple]]] = None + ) -> None: + """ + Logs the specified metadata to a Neptune run. + + You can log configurations or other single values. Pass the metadata as a dictionary {key: value} with + + - key: path to where the metadata should be stored in the run. + - value: configuration or other single value to log. + + For example, {"parameters/learning_rate": 0.001}. + In the attribute path, each forward slash "/" nests the attribute under a namespace. + Use namespaces to structure the metadata into meaningful categories. + + Args: + data: Dictionary of configs or other values to log. + Available types: float, integer, Boolean, string, and datetime. + + Any `datetime` values that don't have the `tzinfo` attribute set are assumed to be in the local timezone. + + Example: + ``` + from neptune_scale import Run + + with Run(...) as run: + run.log_configs( + data={ + "parameters/learning_rate": 0.001, + "parameters/batch_size": 64, + }, + ) + ``` + """ + self.log(configs=data) + + def add_tags(self, tags: Union[List[str], Set[str], Tuple[str]], group_tags: bool = False) -> None: + """ + Adds the list of tags to the run. + + Args: + tags: Tags to add to the run, as a list or set of strings. + group_tags: To add group tags instead of regular tags, set to `True`. + + Example: + ``` + from neptune_scale import Run + + with Run(...) as run: + run.add_tags(tags=["tag1", "tag2", "tag3"]) + ``` + """ + name = "sys/tags" if not group_tags else "sys/group_tags" + self.log(tags_add={name: tags}) + + def remove_tags(self, tags: Union[List[str], Set[str], Tuple[str]], group_tags: bool = False) -> None: + """ + Removes the specified tags from the run. + + Args: + tags: Tags to remove from the run, as a list or set of strings. + group_tags: To remove group tags instead of regular tags, set to `True`. + + Example: + ``` + from neptune_scale import Run + + with Run(...)
as run: + run.remove_tags(tags=["tag2", "tag3"]) + ``` + """ + name = "sys/tags" if not group_tags else "sys/group_tags" + self.log(tags_remove={name: tags}) + + def log( + self, + step: Optional[Union[float, int]] = None, + timestamp: Optional[datetime] = None, + configs: Optional[Dict[str, Union[float, bool, int, str, datetime, list, set, tuple]]] = None, + metrics: Optional[Dict[str, Union[float, int]]] = None, + tags_add: Optional[Dict[str, Union[List[str], Set[str], Tuple[str]]]] = None, + tags_remove: Optional[Dict[str, Union[List[str], Set[str], Tuple[str]]]] = None, + ) -> None: + """ + See one of the following instead: + + - log_configs() + - log_metrics() + - add_tags() + - remove_tags() + """ + + verify_type("step", step, (float, int, type(None))) + verify_type("timestamp", timestamp, (datetime, type(None))) + verify_type("configs", configs, (dict, type(None))) + verify_type("metrics", metrics, (dict, type(None))) + verify_type("tags_add", tags_add, (dict, type(None))) + verify_type("tags_remove", tags_remove, (dict, type(None))) + + timestamp = datetime.now() if timestamp is None else timestamp + configs = {} if configs is None else configs + metrics = {} if metrics is None else metrics + tags_add = {} if tags_add is None else tags_add + tags_remove = {} if tags_remove is None else tags_remove + + verify_collection_type("`configs` keys", list(configs.keys()), str) + verify_collection_type("`metrics` keys", list(metrics.keys()), str) + verify_collection_type("`tags_add` keys", list(tags_add.keys()), str) + verify_collection_type("`tags_remove` keys", list(tags_remove.keys()), str) + + verify_collection_type( + "`configs` values", list(configs.values()), (float, bool, int, str, datetime, list, set, tuple) + ) + verify_collection_type("`metrics` values", list(metrics.values()), (float, int)) + verify_collection_type("`tags_add` values", list(tags_add.values()), (list, set, tuple)) + verify_collection_type("`tags_remove` values", list(tags_remove.values()), (list, set, tuple)) + + # Don't log anything after we've been stopped. This allows continuing the training script + # after a non-recoverable error happened. Note we don't use self._lock in this check, + # to keep the common path faster, because the benefit of locking here is minimal. + if self._is_closing: + return + + # TODO: move this to a separate process or thread, to make the .log call as lightweight as possible + splitter: MetadataSplitter = MetadataSplitter( + project=self._project, + run_id=self._run_id, + step=step, + timestamp=timestamp, + configs=configs, + metrics=metrics, + add_tags=tags_add, + remove_tags=tags_remove, + ) + + for operation, metadata_size in splitter: + self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step) + + def _wait( + self, + phrase: str, + sleep_time: float, + wait_seq: SharedInt, + timeout: Optional[float] = None, + verbose: bool = True, + ) -> None: + if verbose: + logger.info(f"Waiting for all operations to be {phrase}") + + if timeout is None and verbose: + logger.warning("No timeout specified. Waiting indefinitely") + + begin_time = time.time() + wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time + last_print_timestamp: Optional[float] = None + + while True: + try: + with self._lock: + if not self._sync_process.is_alive(): + if verbose and not self._is_closing: + # TODO: error out here?
+ logger.warning("Sync process is not running") + return # No need to wait if the sync process is not running + + # Handle the case where we get notified on `wait_seq` before we actually wait. + # Otherwise, we would unnecessarily block, waiting on a notify_all() that never happens. + if wait_seq.value >= self._operations_queue.last_sequence_id: + break + + with wait_seq: + wait_seq.wait(timeout=wait_time) + value = wait_seq.value + + last_queued_sequence_id = self._operations_queue.last_sequence_id + + if value == -1: + if self._operations_queue.last_sequence_id != -1: + last_print_timestamp = print_message( + f"Waiting. No operations were {phrase} yet. Operations to sync: %s", + self._operations_queue.last_sequence_id + 1, + last_print=last_print_timestamp, + verbose=verbose, + ) + else: + last_print_timestamp = print_message( + f"Waiting. No operations were {phrase} yet", + last_print=last_print_timestamp, + verbose=verbose, + ) + elif value < last_queued_sequence_id: + last_print_timestamp = print_message( + f"Waiting for remaining %d operation(s) to be {phrase}", + last_queued_sequence_id - value + 1, + last_print=last_print_timestamp, + verbose=verbose, + ) + else: + # Reaching the last queued sequence ID means that all operations were submitted + if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout): + break + except KeyboardInterrupt: + if verbose: + logger.warning("Waiting interrupted by user") + return + + if verbose: + logger.info(f"All operations were {phrase}") + + def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = True) -> None: + """ + Waits until all metadata is submitted to Neptune for processing. + + When submitted, the data is not yet saved in Neptune until fully processed. + See wait_for_processing(). + + Args: + timeout (float, optional): In seconds, the maximum time to wait for submission. + verbose (bool): If True (default), prints messages about the waiting process. + """ + self._wait( + phrase="submitted", + sleep_time=MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, + wait_seq=self._last_queued_seq, + timeout=timeout, + verbose=verbose, + ) + + def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = True) -> None: + """ + Waits until all metadata is processed by Neptune. + + Once the call is complete, the data is saved in Neptune. + + Args: + timeout (float, optional): In seconds, the maximum time to wait for processing. + verbose (bool): If True (default), prints messages about the waiting process. 
+ """ + self._wait( + phrase="processed", + sleep_time=MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, + wait_seq=self._last_ack_seq, + timeout=timeout, + verbose=verbose, + ) + + +def print_message(msg: str, *args: Any, last_print: Optional[float] = None, verbose: bool = True) -> Optional[float]: + current_time = time.time() + + if verbose and (last_print is None or current_time - last_print > STOP_MESSAGE_FREQUENCY): + logger.info(msg, *args) + return current_time + + return last_print diff --git a/src/neptune_scale/core/validation.py b/src/neptune_scale/api/validation.py similarity index 100% rename from src/neptune_scale/core/validation.py rename to src/neptune_scale/api/validation.py diff --git a/src/neptune_scale/core/components/__init__.py b/src/neptune_scale/core/components/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/neptune_scale/net/__init__.py b/src/neptune_scale/net/__init__.py new file mode 100644 index 00000000..384c3471 --- /dev/null +++ b/src/neptune_scale/net/__init__.py @@ -0,0 +1 @@ +# This package contains everything related to communication with the Neptune API. diff --git a/src/neptune_scale/api/api_client.py b/src/neptune_scale/net/api_client.py similarity index 96% rename from src/neptune_scale/api/api_client.py rename to src/neptune_scale/net/api_client.py index 53fce1a9..baa35e15 100644 --- a/src/neptune_scale/api/api_client.py +++ b/src/neptune_scale/net/api_client.py @@ -55,10 +55,10 @@ from neptune_api.proto.neptune_pb.ingest.v1.pub.request_status_pb2 import RequestStatus from neptune_api.types import Response -from neptune_scale.core.components.abstract import Resource -from neptune_scale.core.logger import get_logger -from neptune_scale.envs import ALLOW_SELF_SIGNED_CERTIFICATE -from neptune_scale.parameters import REQUEST_TIMEOUT +from neptune_scale.sync.parameters import REQUEST_TIMEOUT +from neptune_scale.util.abstract import Resource +from neptune_scale.util.envs import ALLOW_SELF_SIGNED_CERTIFICATE +from neptune_scale.util.logger import get_logger logger = get_logger() diff --git a/src/neptune_scale/core/serialization.py b/src/neptune_scale/net/serialization.py similarity index 100% rename from src/neptune_scale/core/serialization.py rename to src/neptune_scale/net/serialization.py diff --git a/src/neptune_scale/core/__init__.py b/src/neptune_scale/sync/__init__.py similarity index 100% rename from src/neptune_scale/core/__init__.py rename to src/neptune_scale/sync/__init__.py diff --git a/src/neptune_scale/core/components/aggregating_queue.py b/src/neptune_scale/sync/aggregating_queue.py similarity index 97% rename from src/neptune_scale/core/components/aggregating_queue.py rename to src/neptune_scale/sync/aggregating_queue.py index dd0f05ac..a2adcafd 100644 --- a/src/neptune_scale/core/components/aggregating_queue.py +++ b/src/neptune_scale/sync/aggregating_queue.py @@ -12,17 +12,17 @@ from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation -from neptune_scale.core.components.abstract import Resource -from neptune_scale.core.components.queue_element import ( - BatchedOperations, - SingleOperation, -) -from neptune_scale.core.logger import get_logger -from neptune_scale.parameters import ( +from neptune_scale.sync.parameters import ( BATCH_WAIT_TIME_SECONDS, MAX_BATCH_SIZE, MAX_QUEUE_ELEMENT_SIZE, ) +from neptune_scale.sync.queue_element import ( + BatchedOperations, + SingleOperation, +) +from neptune_scale.util import get_logger +from neptune_scale.util.abstract import Resource logger = get_logger() diff 
--git a/src/neptune_scale/core/components/errors_tracking.py b/src/neptune_scale/sync/errors_tracking.py
similarity index 93%
rename from src/neptune_scale/core/components/errors_tracking.py
rename to src/neptune_scale/sync/errors_tracking.py
index 555b6ce2..c131cc39 100644
--- a/src/neptune_scale/core/components/errors_tracking.py
+++ b/src/neptune_scale/sync/errors_tracking.py
@@ -11,11 +11,7 @@
     Type,
 )

-from neptune_scale.core.components.abstract import Resource
-from neptune_scale.core.components.daemon import Daemon
-from neptune_scale.core.logger import get_logger
-from neptune_scale.core.process_killer import kill_me
-from neptune_scale.exceptions import (
+from neptune_scale.api.exceptions import (
     NeptuneAsyncLagThresholdExceeded,
     NeptuneConnectionLostError,
     NeptuneOperationsQueueMaxSizeExceeded,
@@ -23,7 +19,11 @@
     NeptuneScaleWarning,
     NeptuneUnexpectedError,
 )
-from neptune_scale.parameters import ERRORS_MONITOR_THREAD_SLEEP_TIME
+from neptune_scale.sync.parameters import ERRORS_MONITOR_THREAD_SLEEP_TIME
+from neptune_scale.util import get_logger
+from neptune_scale.util.abstract import Resource
+from neptune_scale.util.daemon import Daemon
+from neptune_scale.util.process_killer import kill_me

 logger = get_logger()
diff --git a/src/neptune_scale/core/components/lag_tracking.py b/src/neptune_scale/sync/lag_tracking.py
similarity index 85%
rename from src/neptune_scale/core/components/lag_tracking.py
rename to src/neptune_scale/sync/lag_tracking.py
index e44e5dc0..dd00ea32 100644
--- a/src/neptune_scale/core/components/lag_tracking.py
+++ b/src/neptune_scale/sync/lag_tracking.py
@@ -5,15 +5,17 @@
 from time import monotonic
 from typing import Callable

-from neptune_scale.core.components.abstract import Resource
-from neptune_scale.core.components.daemon import Daemon
-from neptune_scale.core.components.errors_tracking import ErrorsQueue
-from neptune_scale.core.components.operations_queue import OperationsQueue
-from neptune_scale.core.shared_var import SharedFloat
-from neptune_scale.parameters import (
+from neptune_scale.sync.errors_tracking import ErrorsQueue
+from neptune_scale.sync.operations_queue import OperationsQueue
+from neptune_scale.sync.parameters import (
     LAG_TRACKER_THREAD_SLEEP_TIME,
     LAG_TRACKER_TIMEOUT,
 )
+from neptune_scale.util import (
+    Daemon,
+    SharedFloat,
+)
+from neptune_scale.util.abstract import Resource


 class LagTracker(Daemon, Resource):
diff --git a/src/neptune_scale/core/metadata_splitter.py b/src/neptune_scale/sync/metadata_splitter.py
similarity index 99%
rename from src/neptune_scale/core/metadata_splitter.py
rename to src/neptune_scale/sync/metadata_splitter.py
index a6053d53..2891b656 100644
--- a/src/neptune_scale/core/metadata_splitter.py
+++ b/src/neptune_scale/sync/metadata_splitter.py
@@ -25,7 +25,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.core.serialization import (
+from neptune_scale.net.serialization import (
     datetime_to_proto,
     make_step,
     make_value,
diff --git a/src/neptune_scale/core/components/operations_queue.py b/src/neptune_scale/sync/operations_queue.py
similarity index 90%
rename from src/neptune_scale/core/components/operations_queue.py
rename to src/neptune_scale/sync/operations_queue.py
index a9039436..0069b472 100644
--- a/src/neptune_scale/core/components/operations_queue.py
+++ b/src/neptune_scale/sync/operations_queue.py
@@ -9,15 +9,15 @@
     Optional,
 )

-from neptune_scale.core.components.abstract import Resource
-from neptune_scale.core.components.queue_element import SingleOperation
-from neptune_scale.core.logger import get_logger
-from neptune_scale.core.validation import verify_type
-from neptune_scale.parameters import (
+from neptune_scale.api.validation import verify_type
+from neptune_scale.sync.parameters import (
     MAX_MULTIPROCESSING_QUEUE_SIZE,
     MAX_QUEUE_ELEMENT_SIZE,
     MAX_QUEUE_SIZE,
 )
+from neptune_scale.sync.queue_element import SingleOperation
+from neptune_scale.util import get_logger
+from neptune_scale.util.abstract import Resource

 if TYPE_CHECKING:
     from threading import RLock
diff --git a/src/neptune_scale/parameters.py b/src/neptune_scale/sync/parameters.py
similarity index 100%
rename from src/neptune_scale/parameters.py
rename to src/neptune_scale/sync/parameters.py
diff --git a/src/neptune_scale/core/components/queue_element.py b/src/neptune_scale/sync/queue_element.py
similarity index 100%
rename from src/neptune_scale/core/components/queue_element.py
rename to src/neptune_scale/sync/queue_element.py
diff --git a/src/neptune_scale/core/components/sync_process.py b/src/neptune_scale/sync/sync_process.py
similarity index 97%
rename from src/neptune_scale/core/components/sync_process.py
rename to src/neptune_scale/sync/sync_process.py
index 5a3b0100..ff170f56 100644
--- a/src/neptune_scale/core/components/sync_process.py
+++ b/src/neptune_scale/sync/sync_process.py
@@ -41,29 +41,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.api.api_client import (
-    ApiClient,
-    backend_factory,
-)
-from neptune_scale.core.components.abstract import (
-    Resource,
-    WithResources,
-)
-from neptune_scale.core.components.aggregating_queue import AggregatingQueue
-from neptune_scale.core.components.daemon import Daemon
-from neptune_scale.core.components.errors_tracking import ErrorsQueue
-from neptune_scale.core.components.queue_element import (
-    BatchedOperations,
-    SingleOperation,
-)
-from neptune_scale.core.logger import get_logger
-from neptune_scale.core.process_link import ProcessLink
-from neptune_scale.core.shared_var import (
-    SharedFloat,
-    SharedInt,
-)
-from neptune_scale.core.util import safe_signal_name
-from neptune_scale.exceptions import (
+from neptune_scale.api.exceptions import (
     NeptuneAttributePathEmpty,
     NeptuneAttributePathExceedsSizeLimit,
     NeptuneAttributePathInvalid,
@@ -95,7 +73,13 @@
     NeptuneUnexpectedError,
     NeptuneUnexpectedResponseError,
 )
-from neptune_scale.parameters import (
+from neptune_scale.net.api_client import (
+    ApiClient,
+    backend_factory,
+)
+from neptune_scale.sync.aggregating_queue import AggregatingQueue
+from neptune_scale.sync.errors_tracking import ErrorsQueue
+from neptune_scale.sync.parameters import (
     INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME,
     MAX_QUEUE_SIZE,
     MAX_REQUEST_RETRY_SECONDS,
@@ -105,6 +89,22 @@
     SYNC_PROCESS_SLEEP_TIME,
     SYNC_THREAD_SLEEP_TIME,
 )
+from neptune_scale.sync.queue_element import (
+    BatchedOperations,
+    SingleOperation,
+)
+from neptune_scale.sync.util import safe_signal_name
+from neptune_scale.util import (
+    Daemon,
+    ProcessLink,
+    SharedFloat,
+    SharedInt,
+    get_logger,
+)
+from neptune_scale.util.abstract import (
+    Resource,
+    WithResources,
+)

 T = TypeVar("T")
diff --git a/src/neptune_scale/core/util.py b/src/neptune_scale/sync/util.py
similarity index 100%
rename from src/neptune_scale/core/util.py
rename to src/neptune_scale/sync/util.py
diff --git a/src/neptune_scale/util/__init__.py b/src/neptune_scale/util/__init__.py
new file mode 100644
index 00000000..75a71b3b
--- /dev/null
+++ b/src/neptune_scale/util/__init__.py
@@ -0,0 +1,17 @@
+__all__ = [
+    "Daemon",
+    "get_logger",
+    "ProcessLink",
+    "SharedFloat",
+    "SharedInt",
+    "SharedVar",
+]
+
+from neptune_scale.util.daemon import Daemon
+from neptune_scale.util.logger import get_logger
+from neptune_scale.util.process_link import ProcessLink
+from neptune_scale.util.shared_var import (
+    SharedFloat,
+    SharedInt,
+    SharedVar,
+)
diff --git a/src/neptune_scale/core/components/abstract.py b/src/neptune_scale/util/abstract.py
similarity index 100%
rename from src/neptune_scale/core/components/abstract.py
rename to src/neptune_scale/util/abstract.py
diff --git a/src/neptune_scale/core/components/daemon.py b/src/neptune_scale/util/daemon.py
similarity index 98%
rename from src/neptune_scale/core/components/daemon.py
rename to src/neptune_scale/util/daemon.py
index 0a2c8bf3..a40ed4ce 100644
--- a/src/neptune_scale/core/components/daemon.py
+++ b/src/neptune_scale/util/daemon.py
@@ -4,7 +4,7 @@
 import threading
 from enum import Enum

-from neptune_scale.core.logger import get_logger
+from neptune_scale.util.logger import get_logger

 logger = get_logger()
diff --git a/src/neptune_scale/envs.py b/src/neptune_scale/util/envs.py
similarity index 100%
rename from src/neptune_scale/envs.py
rename to src/neptune_scale/util/envs.py
diff --git a/src/neptune_scale/core/logger.py b/src/neptune_scale/util/logger.py
similarity index 94%
rename from src/neptune_scale/core/logger.py
rename to src/neptune_scale/util/logger.py
index 0f7053be..c931a110 100644
--- a/src/neptune_scale/core/logger.py
+++ b/src/neptune_scale/util/logger.py
@@ -3,11 +3,11 @@
 import logging
 import os

-from neptune_scale.core.styles import (
+from neptune_scale.util.envs import DEBUG_MODE
+from neptune_scale.util.styles import (
     STYLES,
     ensure_style_detected,
 )
-from neptune_scale.envs import DEBUG_MODE

 LOG_FORMAT = "{blue}%(name)s{end}:{bold}%(levelname)s{end}: %(message)s"
 DEBUG_FORMAT = (
diff --git a/src/neptune_scale/core/process_killer.py b/src/neptune_scale/util/process_killer.py
similarity index 94%
rename from src/neptune_scale/core/process_killer.py
rename to src/neptune_scale/util/process_killer.py
index 5ff15c75..0cc27e71 100644
--- a/src/neptune_scale/core/process_killer.py
+++ b/src/neptune_scale/util/process_killer.py
@@ -5,7 +5,7 @@
 import psutil

-from neptune_scale.envs import SUBPROCESS_KILL_TIMEOUT
+from neptune_scale.util.envs import SUBPROCESS_KILL_TIMEOUT

 KILL_TIMEOUT = int(os.getenv(SUBPROCESS_KILL_TIMEOUT, "5"))
diff --git a/src/neptune_scale/core/process_link.py b/src/neptune_scale/util/process_link.py
similarity index 98%
rename from src/neptune_scale/core/process_link.py
rename to src/neptune_scale/util/process_link.py
index 6fa1ac5a..733fc283 100644
--- a/src/neptune_scale/core/process_link.py
+++ b/src/neptune_scale/util/process_link.py
@@ -10,8 +10,10 @@
     Optional,
 )

-from neptune_scale.core.components.daemon import Daemon
-from neptune_scale.core.logger import get_logger
+from neptune_scale.util import (
+    Daemon,
+    get_logger,
+)

 POLL_TIMEOUT = 0.1
diff --git a/src/neptune_scale/core/shared_var.py b/src/neptune_scale/util/shared_var.py
similarity index 100%
rename from src/neptune_scale/core/shared_var.py
rename to src/neptune_scale/util/shared_var.py
diff --git a/src/neptune_scale/core/styles.py b/src/neptune_scale/util/styles.py
similarity index 96%
rename from src/neptune_scale/core/styles.py
rename to src/neptune_scale/util/styles.py
index 8ecf7b49..e5f1c6db 100644
--- a/src/neptune_scale/core/styles.py
+++ b/src/neptune_scale/util/styles.py
@@ -4,7 +4,7 @@
 import platform
 from typing import Dict

-from neptune_scale.envs import DISABLE_COLORS
+from neptune_scale.util.envs import DISABLE_COLORS

 UNIX_STYLES = {
     "h1": "\033[95m",
diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py
index 723c3302..d988f696 100644
--- a/tests/unit/test_aggregating_queue.py
+++ b/tests/unit/test_aggregating_queue.py
@@ -14,8 +14,8 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.core.components.aggregating_queue import AggregatingQueue
-from neptune_scale.core.components.queue_element import (
+from neptune_scale.sync.aggregating_queue import AggregatingQueue
+from neptune_scale.sync.queue_element import (
     BatchedOperations,
     SingleOperation,
 )
diff --git a/tests/unit/test_errors_monitor.py b/tests/unit/test_errors_monitor.py
index 3b52cd74..69c613ab 100644
--- a/tests/unit/test_errors_monitor.py
+++ b/tests/unit/test_errors_monitor.py
@@ -2,7 +2,7 @@
 from typing import Optional
 from unittest.mock import Mock

-from neptune_scale.core.components.errors_tracking import (
+from neptune_scale.sync.errors_tracking import (
     ErrorsMonitor,
     ErrorsQueue,
 )
diff --git a/tests/unit/test_lag_tracker.py b/tests/unit/test_lag_tracker.py
index 2b5f4c81..d724246f 100644
--- a/tests/unit/test_lag_tracker.py
+++ b/tests/unit/test_lag_tracker.py
@@ -4,8 +4,8 @@

 from freezegun import freeze_time

-from neptune_scale import SharedFloat
-from neptune_scale.core.components.lag_tracking import LagTracker
+from neptune_scale.sync.lag_tracking import LagTracker
+from neptune_scale.util import SharedFloat


 @freeze_time("2024-09-01 00:00:00")
diff --git a/tests/unit/test_metadata_splitter.py b/tests/unit/test_metadata_splitter.py
index 2c9cc725..5e3b0274 100644
--- a/tests/unit/test_metadata_splitter.py
+++ b/tests/unit/test_metadata_splitter.py
@@ -13,7 +13,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.core.metadata_splitter import MetadataSplitter
+from neptune_scale.sync.metadata_splitter import MetadataSplitter


 @freeze_time("2024-07-30 12:12:12.000022")
diff --git a/tests/unit/test_operations_queue.py b/tests/unit/test_operations_queue.py
index 3f4ca038..76471eec 100644
--- a/tests/unit/test_operations_queue.py
+++ b/tests/unit/test_operations_queue.py
@@ -7,7 +7,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.core.components.operations_queue import OperationsQueue
+from neptune_scale.sync.operations_queue import OperationsQueue


 def test__enqueue():
diff --git a/tests/unit/test_process_link.py b/tests/unit/test_process_link.py
index 34b516ac..f3f68ec3 100644
--- a/tests/unit/test_process_link.py
+++ b/tests/unit/test_process_link.py
@@ -16,7 +16,7 @@
 import pytest
 from pytest import fixture

-from neptune_scale.core.process_link import ProcessLink
+from neptune_scale.util import ProcessLink


 @fixture
diff --git a/tests/unit/test_run.py b/tests/unit/test_run.py
index 64778254..6084c1a1 100644
--- a/tests/unit/test_run.py
+++ b/tests/unit/test_run.py
@@ -17,7 +17,8 @@ def api_token():
 # Set short timeouts on blocking operations for quicker test execution
 @pytest.fixture(autouse=True, scope="session")
 def short_timeouts():
-    import neptune_scale.core.components
+    import neptune_scale.sync.parameters
+    import neptune_scale.sync.sync_process

     patch = pytest.MonkeyPatch()
     timeout = 0.05
@@ -28,10 +29,10 @@ def short_timeouts():
         "SYNC_THREAD_SLEEP_TIME",
         "ERRORS_MONITOR_THREAD_SLEEP_TIME",
     ):
-        patch.setattr(neptune_scale.parameters, name, timeout)
+        patch.setattr(neptune_scale.sync.parameters, name, timeout)

         # Not perfect, but does the trick for now. Handle direct imports.
-        for mod in (neptune_scale, neptune_scale.core.components.sync_process):
+        for mod in (neptune_scale, neptune_scale.sync.sync_process):
             if hasattr(mod, name):
                 patch.setattr(mod, name, timeout)
diff --git a/tests/unit/test_shared_var.py b/tests/unit/test_shared_var.py
index 129685c5..d4cf9e07 100644
--- a/tests/unit/test_shared_var.py
+++ b/tests/unit/test_shared_var.py
@@ -2,7 +2,7 @@

 import pytest

-from neptune_scale.core.shared_var import (
+from neptune_scale.util import (
     SharedFloat,
     SharedInt,
 )

From 477a6cb102e3b3a757486981435e15a2cc40e4ad Mon Sep 17 00:00:00 2001
From: Krzysztof Godlewski
Date: Fri, 22 Nov 2024 16:26:57 +0100
Subject: [PATCH 2/2] Move `exceptions` a level up

---
 src/neptune_scale/__init__.py               | 2 +-
 src/neptune_scale/api/run.py                | 8 ++++----
 src/neptune_scale/{api => }/exceptions.py   | 0
 src/neptune_scale/sync/errors_tracking.py   | 2 +-
 src/neptune_scale/sync/metadata_splitter.py | 2 +-
 src/neptune_scale/sync/sync_process.py      | 2 +-
 tests/unit/test_metadata_splitter.py        | 2 +-
 7 files changed, 9 insertions(+), 9 deletions(-)
 rename src/neptune_scale/{api => }/exceptions.py (100%)

diff --git a/src/neptune_scale/__init__.py b/src/neptune_scale/__init__.py
index 8376dfa2..295f3c39 100644
--- a/src/neptune_scale/__init__.py
+++ b/src/neptune_scale/__init__.py
@@ -4,7 +4,7 @@

 import warnings

-from neptune_scale.api.exceptions import NeptuneScaleWarning
 from neptune_scale.api.run import Run
+from neptune_scale.exceptions import NeptuneScaleWarning

 warnings.simplefilter("once", category=NeptuneScaleWarning)
diff --git a/src/neptune_scale/api/run.py b/src/neptune_scale/api/run.py
index 25faf5cc..80852444 100644
--- a/src/neptune_scale/api/run.py
+++ b/src/neptune_scale/api/run.py
@@ -28,10 +28,6 @@
 from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.api.exceptions import (
-    NeptuneApiTokenNotProvided,
-    NeptuneProjectNotProvided,
-)
 from neptune_scale.api.validation import (
     verify_collection_type,
     verify_max_length,
@@ -39,6 +35,10 @@
     verify_project_qualified_name,
     verify_type,
 )
+from neptune_scale.exceptions import (
+    NeptuneApiTokenNotProvided,
+    NeptuneProjectNotProvided,
+)
 from neptune_scale.net.serialization import (
     datetime_to_proto,
     make_step,
diff --git a/src/neptune_scale/api/exceptions.py b/src/neptune_scale/exceptions.py
similarity index 100%
rename from src/neptune_scale/api/exceptions.py
rename to src/neptune_scale/exceptions.py
diff --git a/src/neptune_scale/sync/errors_tracking.py b/src/neptune_scale/sync/errors_tracking.py
index c131cc39..4b82acd3 100644
--- a/src/neptune_scale/sync/errors_tracking.py
+++ b/src/neptune_scale/sync/errors_tracking.py
@@ -11,7 +11,7 @@
     Type,
 )

-from neptune_scale.api.exceptions import (
+from neptune_scale.exceptions import (
     NeptuneAsyncLagThresholdExceeded,
     NeptuneConnectionLostError,
     NeptuneOperationsQueueMaxSizeExceeded,
diff --git a/src/neptune_scale/sync/metadata_splitter.py b/src/neptune_scale/sync/metadata_splitter.py
index f03e142f..d30b5418 100644
--- a/src/neptune_scale/sync/metadata_splitter.py
+++ b/src/neptune_scale/sync/metadata_splitter.py
@@ -26,7 +26,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.api.exceptions import (
+from neptune_scale.exceptions import (
     NeptuneFloatValueNanInfUnsupported,
     NeptuneScaleWarning,
 )
diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py
index ff170f56..501e3ba0 100644
--- a/src/neptune_scale/sync/sync_process.py
+++ b/src/neptune_scale/sync/sync_process.py
@@ -41,7 +41,7 @@
 )
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

-from neptune_scale.api.exceptions import (
+from neptune_scale.exceptions import (
     NeptuneAttributePathEmpty,
     NeptuneAttributePathExceedsSizeLimit,
     NeptuneAttributePathInvalid,
diff --git a/tests/unit/test_metadata_splitter.py b/tests/unit/test_metadata_splitter.py
index aa71aafc..fc79f5a8 100644
--- a/tests/unit/test_metadata_splitter.py
+++ b/tests/unit/test_metadata_splitter.py
@@ -19,7 +19,7 @@
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation
 from pytest import mark

-from neptune_scale.api.exceptions import NeptuneFloatValueNanInfUnsupported
+from neptune_scale.exceptions import NeptuneFloatValueNanInfUnsupported
 from neptune_scale.sync.metadata_splitter import MetadataSplitter
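
Taken together, the two patches split the package into a public surface (`Run` and the exceptions) and internal `api`, `net`, `sync`, and `util` packages. A minimal sketch of the resulting import surface, based only on the paths shown in the diffs above:

    from neptune_scale import Run                              # public entry point
    from neptune_scale.exceptions import NeptuneScaleWarning   # top level after PATCH 2/2

    # Helpers re-exported by the new src/neptune_scale/util/__init__.py:
    from neptune_scale.util import Daemon, ProcessLink, SharedFloat, SharedInt, get_logger

    # Names kept out of util.__all__ still need their full module path:
    from neptune_scale.util.abstract import Resource, WithResources
    from neptune_scale.sync.parameters import MAX_QUEUE_SIZE

Note that `Resource` and `WithResources` stay out of the `neptune_scale.util` facade, which is why `errors_tracking.py`, `operations_queue.py`, and `sync_process.py` above import them from `neptune_scale.util.abstract` explicitly.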
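
For code written against the intermediate layout from PATCH 1/2, only the exception imports change in PATCH 2/2. A hypothetical caller would be updated like this:

    # Before (PATCH 1/2 layout):
    from neptune_scale.api.exceptions import NeptuneFloatValueNanInfUnsupported

    # After (PATCH 2/2 moves exceptions to the package root):
    from neptune_scale.exceptions import NeptuneFloatValueNanInfUnsupported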
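
The `short_timeouts` fixture in tests/unit/test_run.py patches the sleep constants twice: once on `neptune_scale.sync.parameters`, and once on every module that may have imported them directly, because `from module import NAME` copies the binding into the importer's namespace at import time. A self-contained sketch of that pitfall, using hypothetical in-memory modules rather than the real ones:

    import sys
    import types

    # Hypothetical stand-ins for neptune_scale.sync.parameters and an importer.
    params = types.ModuleType("params")
    params.SLEEP_TIME = 10.0
    sys.modules["params"] = params

    consumer = types.ModuleType("consumer")
    consumer.SLEEP_TIME = params.SLEEP_TIME  # simulates `from params import SLEEP_TIME`
    sys.modules["consumer"] = consumer

    params.SLEEP_TIME = 0.05    # patching only the defining module...
    print(consumer.SLEEP_TIME)  # ...prints 10.0: the importer kept its own copy

    # Hence the fixture's extra loop over modules that did direct imports:
    for mod in (params, consumer):
        if hasattr(mod, "SLEEP_TIME"):
            setattr(mod, "SLEEP_TIME", 0.05)
    print(consumer.SLEEP_TIME)  # now 0.05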