-
Notifications
You must be signed in to change notification settings - Fork 1
Dev/minimal flow #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dev/minimal flow #16
Changes from 1 commit
6cf6715
b85f1c5
2faf3a5
4c91b15
67f63cb
6e4ada2
d8098f9
e35876c
cceddab
04a7dce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from __future__ import annotations | ||
|
||
from abc import ( | ||
ABC, | ||
abstractmethod, | ||
) | ||
from types import TracebackType | ||
|
||
|
||
class AutoCloseable(ABC):
    """Base class for objects usable in a ``with`` statement that close on exit.

    Subclasses implement :meth:`close`; leaving the ``with`` block calls it
    whether or not the block raised.
    """

    def __enter__(self) -> AutoCloseable:
        """Return the object itself so it can be bound with ``as``."""
        return self

    @abstractmethod
    def close(self) -> None:
        """Close the object. Must be provided by subclasses."""

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the object when the ``with`` block ends.

        Returns ``None``, so an exception raised inside the block is not
        suppressed and keeps propagating after :meth:`close` has run.
        """
        self.close()
|
||
|
||
class Resource(AutoCloseable):
    """Closeable component that can additionally be flushed and cleaned up.

    ``cleanup`` has no default and must be implemented by subclasses.
    ``flush`` does nothing by default, and ``close`` only flushes.
    """

    @abstractmethod
    def cleanup(self) -> None:
        """Clean up the resource. Must be provided by subclasses."""

    def flush(self) -> None:
        """Flush pending work; the base implementation is a no-op."""
        pass

    def close(self) -> None:
        """Close the resource; the base implementation only calls ``flush``."""
        self.flush()
|
||
|
||
class WithResources(Resource):
    """Composite resource that forwards each operation to its child resources."""

    @property
    @abstractmethod
    def resources(self) -> tuple[Resource, ...]:
        """Return the child resources, in the order operations are applied."""

    def flush(self) -> None:
        """Flush every child resource, in order."""
        for resource in self.resources:
            resource.flush()

    def close(self) -> None:
        """Close every child resource, even when some of them fail.

        Raises:
            Exception: the first error raised by a child's ``close()``,
                re-raised once all remaining children have been closed.
        """
        self._teardown_all("close")

    def cleanup(self) -> None:
        """Clean up every child resource, even when some of them fail.

        Raises:
            Exception: the first error raised by a child's ``cleanup()``,
                re-raised once all remaining children have been cleaned up.
        """
        self._teardown_all("cleanup")

    def _teardown_all(self, method_name: str) -> None:
        """Call ``method_name`` on each child, deferring errors until the end."""
        first_error: Exception | None = None
        for resource in self.resources:
            try:
                getattr(resource, method_name)()
            except Exception as exc:
                # Keep going: stopping here would leave the remaining
                # children un-closed. Only the first failure is reported.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from __future__ import annotations | ||
|
||
__all__ = ("OperationsQueue",) | ||
|
||
from multiprocessing import Queue
from queue import Full
from time import monotonic
from typing import (
    TYPE_CHECKING,
    Callable,
    NamedTuple,
)
|
||
from neptune_scale.core.components.abstract import Resource | ||
from neptune_scale.core.validation import verify_type | ||
from neptune_scale.parameters import MAX_QUEUE_ELEMENT_SIZE | ||
|
||
if TYPE_CHECKING: | ||
from threading import RLock | ||
|
||
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation | ||
|
||
|
||
class QueueElement(NamedTuple):
    """One entry stored in the operations queue."""

    # Position of the operation in enqueue order.
    sequence_id: int
    # Timestamp taken when the operation was enqueued.
    # NOTE(review): the name is misspelled ("occurred"); it is kept because
    # renaming the field would change the tuple's public interface.
    occured_at: float
    # The operation in serialized (bytes) form.
    operation: bytes
|
||
|
||
def default_max_size_exceeded_callback(max_size: int, e: BaseException) -> None:
    """Default handler for a full queue: always raise ``ValueError``.

    Args:
        max_size: Configured queue capacity, included in the error message.
        e: The exception that triggered the callback; attached as the cause.

    Raises:
        ValueError: Always, chained from ``e``.
    """
    raise ValueError(f"Queue is full (max size: {max_size})") from e
|
||
|
||
class OperationsQueue(Resource):
    """Queue of serialized run operations backed by a ``multiprocessing.Queue``.

    Each accepted operation is stored as a ``QueueElement`` carrying a
    sequence id (starting at 0 and incremented per accepted operation), the
    ``monotonic()`` time of enqueueing, and the serialized operation bytes.
    """

    def __init__(
        self,
        *,
        lock: RLock,
        max_size: int = 0,
        max_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
    ) -> None:
        """Create the queue.

        Args:
            lock: Lock held while an operation is serialized and enqueued.
            max_size: Passed to ``multiprocessing.Queue`` as ``maxsize``.
            max_size_exceeded_callback: Called with ``(max_size, error)`` when
                the queue is full. Defaults to
                ``default_max_size_exceeded_callback``, which raises
                ``ValueError``.
        """
        verify_type("max_size", max_size, int)

        self._lock: RLock = lock
        self._max_size: int = max_size
        self._max_size_exceeded_callback: Callable[[int, BaseException], None] = (
            max_size_exceeded_callback if max_size_exceeded_callback is not None else default_max_size_exceeded_callback
        )

        self._sequence_id: int = 0
        self._queue: Queue[QueueElement] = Queue(maxsize=max_size)

    def enqueue(self, *, operation: RunOperation) -> None:
        """Serialize ``operation`` and put it on the queue without blocking.

        Args:
            operation: The run operation to enqueue.

        Raises:
            ValueError: If the serialized operation is larger than
                ``MAX_QUEUE_ELEMENT_SIZE``. This error is raised directly and
                is not routed through the max-size callback.

        When the queue is full, the max-size callback is invoked with the
        configured ``max_size`` and the ``queue.Full`` error; what happens
        next is up to the callback (the default one raises ``ValueError``).
        """
        try:
            # NOTE(review): the lock is injected rather than owned by the queue.
            # A reviewer on the PR asked whether OperationsQueue could own it,
            # since Run only passes it through; the author replied that
            # follow-up PRs rely on it to keep new operations from being put
            # while the sync process is being operated on.
            with self._lock:
                serialized_operation = operation.SerializeToString()

                if len(serialized_operation) > MAX_QUEUE_ELEMENT_SIZE:
                    raise ValueError(f"Operation size exceeds the maximum allowed size ({MAX_QUEUE_ELEMENT_SIZE})")

                self._queue.put_nowait(QueueElement(self._sequence_id, monotonic(), serialized_operation))
                # Only advance the sequence after a successful put.
                self._sequence_id += 1
        except Full as e:
            # Only a genuinely full queue is a "max size exceeded" event; any
            # other failure (e.g. the size check above) propagates unchanged
            # instead of being misreported or swallowed by the callback.
            self._max_size_exceeded_callback(self._max_size, e)

    def cleanup(self) -> None:
        """No-op: the queue has nothing to clean up."""
        pass

    def close(self) -> None:
        """Close the underlying ``multiprocessing.Queue``."""
        self._queue.close()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,4 @@ | ||
# Length limits; judging by the names these apply to run ids and family
# identifiers - confirm against the validation code that uses them.
MAX_RUN_ID_LENGTH = 128
MAX_FAMILY_LENGTH = 128
# NOTE(review): presumably the capacity passed to OperationsQueue as max_size;
# the code shown here does not reference it - confirm with callers.
MAX_QUEUE_SIZE = 32767
# Largest serialized operation, in bytes, that OperationsQueue.enqueue accepts.
MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024  # 1MB
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import threading | ||
from unittest.mock import MagicMock | ||
|
||
import pytest | ||
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import ( | ||
UpdateRunSnapshot, | ||
Value, | ||
) | ||
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation | ||
|
||
from neptune_scale.core.components.operations_queue import OperationsQueue | ||
|
||
|
||
def test__enqueue():
    """Each successful enqueue advances the queue's sequence id by one."""
    # given
    lock = threading.RLock()
    queue = OperationsQueue(lock=lock, max_size=0)

    # and
    operation = RunOperation()

    # when
    queue.enqueue(operation=operation)

    # then
    assert queue._sequence_id == 1

    # when
    queue.enqueue(operation=operation)

    # then
    assert queue._sequence_id == 2
|
||
|
||
def test__max_queue_size_exceeded():
    """Enqueueing into a full queue invokes the max-size callback exactly once."""
    # given
    lock = threading.RLock()
    callback = MagicMock()
    queue = OperationsQueue(lock=lock, max_size=1, max_size_exceeded_callback=callback)

    # and
    operation = RunOperation()

    # when
    queue.enqueue(operation=operation)
    queue.enqueue(operation=operation)

    # then
    callback.assert_called_once()

    # and the callback was told the configured capacity
    assert callback.call_args.args[0] == 1

    # and the rejected operation did not consume a sequence id
    assert queue._sequence_id == 1
|
||
|
||
def test__max_element_size_exceeded():
    """An operation serializing to more than the element limit is rejected."""
    # given
    lock = threading.RLock()
    queue = OperationsQueue(lock=lock, max_size=1)

    # and an update holding 1024 values of 1024 characters each (over 1 MB serialized)
    snapshot = UpdateRunSnapshot(assign={f"key_{i}": Value(string=("a" * 1024)) for i in range(1024)})
    operation = RunOperation(update=snapshot)

    # then
    with pytest.raises(ValueError):
        queue.enqueue(operation=operation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could check for > 1 here, or add `verify_int()` with e.g. a `positive=True` argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do