Skip to content

Dev/minimal flow #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Added minimal Run classes ([#6](https://github.com/neptune-ai/neptune-client-scale/pull/6))
- Added support for `max_queue_size` and `max_queue_size_exceeded_callback` parameters in `Run` ([#7](https://github.com/neptune-ai/neptune-client-scale/pull/7))
48 changes: 36 additions & 12 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@

__all__ = ["Run"]

import threading
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Callable

from neptune_scale.core.components.abstract import (
Resource,
WithResources,
)
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.validation import (
verify_max_length,
verify_non_empty,
Expand All @@ -17,16 +23,26 @@
)
from neptune_scale.parameters import (
MAX_FAMILY_LENGTH,
MAX_QUEUE_SIZE,
MAX_RUN_ID_LENGTH,
)


class Run(AbstractContextManager):
class Run(WithResources, AbstractContextManager):
"""
Representation of tracked metadata.
"""

def __init__(self, *, project: str, api_token: str, family: str, run_id: str) -> None:
def __init__(
self,
*,
project: str,
api_token: str,
family: str,
run_id: str,
max_queue_size: int = MAX_QUEUE_SIZE,
max_queue_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
"""
Initializes a run that logs the model-building metadata to Neptune.

Expand All @@ -36,10 +52,17 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
family: Identifies related runs. For example, the same value must apply to all runs within a run hierarchy.
Max length: 128 characters.
run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 characters.
max_queue_size: Maximum number of operations in a queue.
max_queue_size_exceeded_callback: Callback function triggered when a queue is full.
Accepts two arguments:
- Maximum size of the queue.
- Exception that made the queue full.
"""
verify_type("api_token", api_token, str)
verify_type("family", family, str)
verify_type("run_id", run_id, str)
verify_type("max_queue_size", max_queue_size, int)
verify_type("max_queue_size_exceeded_callback", max_queue_size_exceeded_callback, (Callable, type(None)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could check for > 1 here or add verify_int() with eg. positive=True argument

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do

verify_non_empty("api_token", api_token)
verify_non_empty("family", family)
Expand All @@ -55,19 +78,20 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
self._family: str = family
self._run_id: str = run_id

self._lock = threading.RLock()
self._operations_queue: OperationsQueue = OperationsQueue(
lock=self._lock, max_size=max_queue_size, max_size_exceeded_callback=max_queue_size_exceeded_callback
)

def __enter__(self) -> Run:
return self

@property
def resources(self) -> tuple[Resource, ...]:
return (self._operations_queue,)

def close(self) -> None:
"""
Stops the connection to Neptune and synchronizes all data.
"""
pass

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
super().close()
Empty file.
52 changes: 52 additions & 0 deletions src/neptune_scale/core/components/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from abc import (
ABC,
abstractmethod,
)
from types import TracebackType


class AutoCloseable(ABC):
def __enter__(self) -> AutoCloseable:
return self

@abstractmethod
def close(self) -> None: ...

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()


class Resource(AutoCloseable):
@abstractmethod
def cleanup(self) -> None: ...

def flush(self) -> None:
pass

def close(self) -> None:
self.flush()


class WithResources(Resource):
@property
@abstractmethod
def resources(self) -> tuple[Resource, ...]: ...

def flush(self) -> None:
for resource in self.resources:
resource.flush()

def close(self) -> None:
for resource in self.resources:
resource.close()

def cleanup(self) -> None:
for resource in self.resources:
resource.cleanup()
69 changes: 69 additions & 0 deletions src/neptune_scale/core/components/operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

__all__ = ("OperationsQueue",)

from multiprocessing import Queue
from time import monotonic
from typing import (
TYPE_CHECKING,
Callable,
NamedTuple,
)

from neptune_scale.core.components.abstract import Resource
from neptune_scale.core.validation import verify_type
from neptune_scale.parameters import MAX_QUEUE_ELEMENT_SIZE

if TYPE_CHECKING:
from threading import RLock

from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation


class QueueElement(NamedTuple):
sequence_id: int
occured_at: float
operation: bytes


def default_max_size_exceeded_callback(max_size: int, e: BaseException) -> None:
raise ValueError(f"Queue is full (max size: {max_size})") from e


class OperationsQueue(Resource):
def __init__(
self,
*,
lock: RLock,
max_size: int = 0,
max_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
verify_type("max_size", max_size, int)

self._lock: RLock = lock
self._max_size: int = max_size
self._max_size_exceeded_callback: Callable[[int, BaseException], None] = (
max_size_exceeded_callback if max_size_exceeded_callback is not None else default_max_size_exceeded_callback
)

self._sequence_id: int = 0
self._queue: Queue[QueueElement] = Queue(maxsize=max_size)

def enqueue(self, *, operation: RunOperation) -> None:
try:
with self._lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just have OpeartionsQueue own the lock? It doesn't seem Run is using it, apart from passing to the OperationsQueue constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the following PRs we're making sure that no one is putting any new operations once we're operating on SyncProcess so this cannot be a part of OperationsQueue

serialized_operation = operation.SerializeToString()

if len(serialized_operation) > MAX_QUEUE_ELEMENT_SIZE:
raise ValueError(f"Operation size exceeds the maximum allowed size ({MAX_QUEUE_ELEMENT_SIZE})")

self._queue.put_nowait(QueueElement(self._sequence_id, monotonic(), serialized_operation))
self._sequence_id += 1
except Exception as e:
self._max_size_exceeded_callback(self._max_size, e)

def cleanup(self) -> None:
pass

def close(self) -> None:
self._queue.close()
2 changes: 2 additions & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
MAX_RUN_ID_LENGTH = 128
MAX_FAMILY_LENGTH = 128
MAX_QUEUE_SIZE = 32767
MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024 # 1MB
63 changes: 63 additions & 0 deletions tests/unit/test_operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import threading
from unittest.mock import MagicMock

import pytest
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
UpdateRunSnapshot,
Value,
)
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.components.operations_queue import OperationsQueue


def test__enqueue():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=0)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 1

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 2


def test__max_queue_size_exceeded():
# given
lock = threading.RLock()
callback = MagicMock()
queue = OperationsQueue(lock=lock, max_size=1, max_size_exceeded_callback=callback)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)
queue.enqueue(operation=operation)

# then
callback.assert_called_once()


def test__max_element_size_exceeded():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=1)

# and
snapshot = UpdateRunSnapshot(assign={f"key_{i}": Value(string=("a" * 1024)) for i in range(1024)})
operation = RunOperation(update=snapshot)

# then
with pytest.raises(ValueError):
queue.enqueue(operation=operation)
Loading