Skip to content

Dev/minimal flow #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
Resource,
WithResources,
)
from neptune_scale.core.components.errors_monitor import ErrorsMonitor
from neptune_scale.core.components.errors_queue import ErrorsQueue
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.metadata_splitter import MetadataSplitter
from neptune_scale.core.serialization import (
Expand Down Expand Up @@ -140,8 +142,12 @@ def __init__(
self._operations_queue: OperationsQueue = OperationsQueue(
lock=self._lock, max_size=max_queue_size, max_size_exceeded_callback=max_queue_size_exceeded_callback
)
self._errors_queue: ErrorsQueue = ErrorsQueue()
self._errors_monitor = ErrorsMonitor(errors_queue=self._errors_queue)
self._backend: ApiClient = ApiClient(api_token=input_api_token)

self._errors_monitor.start()

if not resume:
self._create_run(
creation_time=datetime.now() if creation_time is None else creation_time,
Expand All @@ -155,7 +161,12 @@ def __enter__(self) -> Run:

@property
def resources(self) -> tuple[Resource, ...]:
return self._operations_queue, self._backend
return (
self._operations_queue,
self._backend,
self._errors_monitor,
self._errors_queue,
)

def close(self) -> None:
"""
Expand Down
84 changes: 84 additions & 0 deletions src/neptune_scale/core/components/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
__all__ = ["Daemon"]

import abc
import threading
from enum import Enum


class Daemon(threading.Thread):
class DaemonState(Enum):
INIT = 1
WORKING = 2
PAUSING = 3
PAUSED = 4
INTERRUPTED = 5
STOPPED = 6

def __init__(self, sleep_time: float, name: str) -> None:
super().__init__(daemon=True, name=name)
self._sleep_time = sleep_time
self._state: Daemon.DaemonState = Daemon.DaemonState.INIT
self._wait_condition = threading.Condition()

def interrupt(self) -> None:
with self._wait_condition:
self._state = Daemon.DaemonState.INTERRUPTED
self._wait_condition.notify_all()

def pause(self) -> None:
with self._wait_condition:
if self._state != Daemon.DaemonState.PAUSED:
if not self._is_interrupted():
self._state = Daemon.DaemonState.PAUSING
self._wait_condition.notify_all()
self._wait_condition.wait_for(lambda: self._state != Daemon.DaemonState.PAUSING)

def resume(self) -> None:
with self._wait_condition:
if not self._is_interrupted():
self._state = Daemon.DaemonState.WORKING
self._wait_condition.notify_all()

def wake_up(self) -> None:
with self._wait_condition:
self._wait_condition.notify_all()

def disable_sleep(self) -> None:
self._sleep_time = 0

def is_running(self) -> bool:
with self._wait_condition:
return self._state in (
Daemon.DaemonState.WORKING,
Daemon.DaemonState.PAUSING,
Daemon.DaemonState.PAUSED,
)

def _is_interrupted(self) -> bool:
with self._wait_condition:
return self._state in (Daemon.DaemonState.INTERRUPTED, Daemon.DaemonState.STOPPED)

def run(self) -> None:
with self._wait_condition:
if not self._is_interrupted():
self._state = Daemon.DaemonState.WORKING
try:
while not self._is_interrupted():
with self._wait_condition:
if self._state == Daemon.DaemonState.PAUSING:
self._state = Daemon.DaemonState.PAUSED
self._wait_condition.notify_all()
self._wait_condition.wait_for(lambda: self._state != Daemon.DaemonState.PAUSED)

if self._state == Daemon.DaemonState.WORKING:
self.work()
with self._wait_condition:
if self._sleep_time > 0 and self._state == Daemon.DaemonState.WORKING:
self._wait_condition.wait(timeout=self._sleep_time)
finally:
with self._wait_condition:
self._state = Daemon.DaemonState.STOPPED
self._wait_condition.notify_all()

@abc.abstractmethod
def work(self) -> None: ...
46 changes: 46 additions & 0 deletions src/neptune_scale/core/components/errors_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
__all__ = ("ErrorsMonitor",)

import logging
import queue
from typing import Callable

from neptune_scale.core.components.abstract import Resource
from neptune_scale.core.components.daemon import Daemon
from neptune_scale.core.components.errors_queue import ErrorsQueue

logger = logging.getLogger("neptune")
logger.setLevel(level=logging.INFO)


def on_error(error: BaseException) -> None:
logger.error(error)


class ErrorsMonitor(Daemon, Resource):
def __init__(
self,
errors_queue: ErrorsQueue,
on_error_callback: Callable[[BaseException], None] = on_error,
):
super().__init__(name="ErrorsMonitor", sleep_time=2)
self._errors_queue = errors_queue
self._on_error_callback = on_error_callback

def work(self) -> None:
try:
error = self._errors_queue.get(block=False)
if error is not None:
self._on_error_callback(error)
except KeyboardInterrupt:
with self._wait_condition:
self._wait_condition.notify_all()
raise
except queue.Empty:
pass

def cleanup(self) -> None:
pass

def close(self) -> None:
self.interrupt()
self.join(timeout=10)
24 changes: 24 additions & 0 deletions src/neptune_scale/core/components/errors_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

__all__ = ("ErrorsQueue",)

from multiprocessing import Queue

from neptune_scale.core.components.abstract import Resource


class ErrorsQueue(Resource):
def __init__(self) -> None:
self._errors_queue: Queue[BaseException] = Queue()

def put(self, error: BaseException) -> None:
self._errors_queue.put(error)

def get(self, block: bool = True, timeout: float | None = None) -> BaseException:
return self._errors_queue.get(block=block, timeout=timeout)

def cleanup(self) -> None:
pass

def close(self) -> None:
self._errors_queue.close()
22 changes: 22 additions & 0 deletions tests/unit/test_errors_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from unittest.mock import Mock

from neptune_scale.core.components.errors_monitor import ErrorsMonitor
from neptune_scale.core.components.errors_queue import ErrorsQueue


def test_errors_monitor():
# given
callback = Mock()

# and
errors_queue = ErrorsQueue()
errors_monitor = ErrorsMonitor(errors_queue=errors_queue, on_error_callback=callback)

# when
errors_queue.put(ValueError("error1"))
errors_monitor.start()
errors_monitor.interrupt()
errors_monitor.join(timeout=1)

# then
callback.assert_called()
Loading