Change the logger's configuration to be more resilient #66


Merged: 2 commits, Oct 31, 2024

10 changes: 9 additions & 1 deletion src/neptune_scale/__init__.py
@@ -43,7 +43,10 @@
 from neptune_scale.core.components.lag_tracking import LagTracker
 from neptune_scale.core.components.operations_queue import OperationsQueue
 from neptune_scale.core.components.sync_process import SyncProcess
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import (
+    get_logger,
+    init_main_process_logger,
+)
 from neptune_scale.core.metadata_splitter import MetadataSplitter
 from neptune_scale.core.serialization import (
     datetime_to_proto,
@@ -74,6 +77,8 @@
     STOP_MESSAGE_FREQUENCY,
 )
 
+logger = get_logger()
+
 
 class Run(WithResources, AbstractContextManager):
     """
@@ -127,6 +132,8 @@ def __init__(
             on_warning_callback: Callback function triggered when a warning occurs.
         """
 
+        _, self._logging_queue = init_main_process_logger()
+
         verify_type("run_id", run_id, str)
         verify_type("resume", resume, bool)
         verify_type("project", project, (str, type(None)))
@@ -224,6 +231,7 @@ def __init__(
             family=self._run_id,
             operations_queue=self._operations_queue.queue,
             errors_queue=self._errors_queue,
+            logging_queue=self._logging_queue,
             api_token=input_api_token,
             last_put_seq=self._last_put_seq,
             last_put_seq_wait=self._last_put_seq_wait,
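Because `init_main_process_logger()` returns the existing queue when the listener is already installed, constructing more than one `Run` in the same process is safe. A sketch of the expected behavior (mirrored by the unit tests at the bottom of this diff):

from neptune_scale.core.logger import init_main_process_logger

# Two Run objects in one process end up sharing a single logging queue:
_, q1 = init_main_process_logger()
_, q2 = init_main_process_logger()
assert q1 is q2  # the second call finds the existing QueueHandler and reuses its queue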
4 changes: 3 additions & 1 deletion src/neptune_scale/api/api_client.py
@@ -55,10 +55,12 @@
 from neptune_api.types import Response
 
 from neptune_scale.core.components.abstract import Resource
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import get_logger
 from neptune_scale.envs import ALLOW_SELF_SIGNED_CERTIFICATE
 from neptune_scale.parameters import REQUEST_TIMEOUT
 
+logger = get_logger()
+
 
 @dataclass
 class TokenRefreshingURLs:
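This import swap is the same across all modules below: instead of importing a ready-made `logger` object, each module calls `get_logger()` at import time. That is safe even before any handlers exist, because `logging.getLogger()` simply returns the cached named logger. A quick stdlib-only illustration:

import logging

# Named loggers are created on first access and cached, so every module that
# asks for "neptune" gets the same object. Handlers attached later (e.g. by
# init_main_process_logger) therefore apply to records from all of them.
logger_a = logging.getLogger("neptune")
logger_b = logging.getLogger("neptune")
assert logger_a is logger_b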
4 changes: 3 additions & 1 deletion src/neptune_scale/core/components/aggregating_queue.py
@@ -17,13 +17,15 @@
     BatchedOperations,
     SingleOperation,
 )
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import get_logger
 from neptune_scale.parameters import (
     BATCH_WAIT_TIME_SECONDS,
     MAX_BATCH_SIZE,
     MAX_QUEUE_ELEMENT_SIZE,
 )
 
+logger = get_logger()
+
 
 class AggregatingQueue(Resource):
     def __init__(
4 changes: 3 additions & 1 deletion src/neptune_scale/core/components/daemon.py
@@ -4,7 +4,9 @@
 import threading
 from enum import Enum
 
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import get_logger
+
+logger = get_logger()
 
 
 class Daemon(threading.Thread):
4 changes: 3 additions & 1 deletion src/neptune_scale/core/components/errors_tracking.py
@@ -13,7 +13,7 @@
 
 from neptune_scale.core.components.abstract import Resource
 from neptune_scale.core.components.daemon import Daemon
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import get_logger
 from neptune_scale.core.process_killer import kill_me
 from neptune_scale.exceptions import (
     NeptuneAsyncLagThresholdExceeded,
@@ -25,6 +25,8 @@
 )
 from neptune_scale.parameters import ERRORS_MONITOR_THREAD_SLEEP_TIME
 
+logger = get_logger()
+
 
 class ErrorsQueue(Resource):
     def __init__(self) -> None:
4 changes: 3 additions & 1 deletion src/neptune_scale/core/components/operations_queue.py
@@ -11,7 +11,7 @@
 
 from neptune_scale.core.components.abstract import Resource
 from neptune_scale.core.components.queue_element import SingleOperation
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import get_logger
 from neptune_scale.core.validation import verify_type
 from neptune_scale.parameters import (
     MAX_MULTIPROCESSING_QUEUE_SIZE,
@@ -24,6 +24,8 @@
 
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation
 
+logger = get_logger()
+
 
 class OperationsQueue(Resource):
     def __init__(
10 changes: 9 additions & 1 deletion src/neptune_scale/core/components/sync_process.py
@@ -58,7 +58,10 @@
     BatchedOperations,
     SingleOperation,
 )
-from neptune_scale.core.logger import logger
+from neptune_scale.core.logger import (
+    get_logger,
+    init_child_process_logger,
+)
 from neptune_scale.core.util import safe_signal_name
 from neptune_scale.exceptions import (
     NeptuneConnectionLostError,
@@ -105,6 +108,7 @@
 
 T = TypeVar("T")
 
+logger = get_logger()
 
 CODE_TO_ERROR: Dict[IngestCode.ValueType, Optional[Type[Exception]]] = {
     IngestCode.OK: None,
@@ -192,6 +196,7 @@ def __init__(
         self,
         operations_queue: Queue,
         errors_queue: ErrorsQueue,
+        logging_queue: Queue,
        api_token: str,
         project: str,
         family: str,
@@ -208,6 +213,7 @@
 
         self._external_operations_queue: Queue[SingleOperation] = operations_queue
         self._errors_queue: ErrorsQueue = errors_queue
+        self._logging_queue: Queue = logging_queue
         self._api_token: str = api_token
         self._project: str = project
         self._family: str = family
@@ -228,6 +234,8 @@ def _handle_signal(self, signum: int, frame: Optional[FrameType]) -> None:
         self._stop_event.set()  # Trigger the stop event
 
     def run(self) -> None:
+        init_child_process_logger(self._logging_queue)
+
         logger.info("Data synchronization started")
 
         # Register signal handlers
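The sync-process side of the change: the queue travels through the constructor, and `run()` — the first code executed in the child — attaches the logger to it. A minimal stdlib-only sketch of this handoff (the `Worker` class here is hypothetical, not part of the package):

import logging
import multiprocessing
from logging.handlers import QueueHandler


class Worker(multiprocessing.Process):
    def __init__(self, logging_queue: multiprocessing.Queue) -> None:
        super().__init__()
        self._logging_queue = logging_queue

    def run(self) -> None:
        # Executes in the child process: route records into the queue so the
        # parent's QueueListener can format and emit them.
        logger = logging.getLogger("neptune")
        logger.addHandler(QueueHandler(self._logging_queue))
        logger.setLevel(logging.INFO)
        logger.info("Data synchronization started")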
100 changes: 91 additions & 9 deletions src/neptune_scale/core/logger.py
@@ -1,7 +1,21 @@
-__all__ = ("logger",)
+__all__ = (
+    "init_main_process_logger",
+    "init_child_process_logger",
+    "get_logger",
+)
 
+import atexit
 import logging
+import multiprocessing
 import os
+from logging.handlers import (
+    QueueHandler,
+    QueueListener,
+)
+from typing import (
+    List,
+    Tuple,
+)
 
 from neptune_scale.core.styles import (
     STYLES,
@@ -16,25 +30,93 @@
 
 
 def get_logger() -> logging.Logger:
-    ensure_style_detected()
+    """Use in modules to get the root Neptune logger."""
+    return logging.getLogger(NEPTUNE_LOGGER_NAME)
 
-    neptune_logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
-    neptune_logger.setLevel(logging.INFO)
+
+def init_main_process_logger() -> Tuple[logging.Logger, multiprocessing.Queue]:
+    """
+    Initialize the root 'neptune' logger to use an mp.Queue. This should be called only once, in the main process.
+
+    Returns:
+        A 2-tuple of (logger, queue). The queue should be passed to `init_child_process_logger()` in child processes.
+    """
+
+    if multiprocessing.parent_process() is not None:
+        raise RuntimeError("This function should be called only in the main process.")
+
+    logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
+
+    # If the user has also imported `neptune-fetcher`, the root logger will already be initialized.
+    # We want our handlers to take precedence, so we remove all existing handlers and add our own.
+    if logger.hasHandlers():
+        # Not initialized by us: clear handlers and proceed.
+        if not hasattr(logger, "__neptune_scale_listener"):
+            logger.handlers.clear()
+        else:
+            return logger, _get_queue_from_handlers(logger.handlers)
+
+    logger.setLevel(logging.INFO)
+    ensure_style_detected()
 
+    handlers: List[logging.Handler] = []
     if os.environ.get(DEBUG_MODE, "False").lower() in ("true", "1"):
-        neptune_logger.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
 
         file_handler = logging.FileHandler(NEPTUNE_DEBUG_FILE_NAME)
         file_handler.setLevel(logging.DEBUG)
         file_handler.setFormatter(logging.Formatter(DEBUG_FORMAT))
-        neptune_logger.addHandler(file_handler)
+        handlers.append(file_handler)
 
     stream_handler = logging.StreamHandler()
     stream_handler.setLevel(logging.INFO)
     stream_handler.setFormatter(logging.Formatter(LOG_FORMAT.format(**STYLES)))
-    neptune_logger.addHandler(stream_handler)
+    handlers.append(stream_handler)
+
+    queue: multiprocessing.Queue = multiprocessing.Queue()
+    logger.addHandler(QueueHandler(queue))
+
+    listener = QueueListener(queue, *handlers, respect_handler_level=True)
+    listener.start()
+
+    logger.__neptune_scale_listener = listener  # type: ignore
+    atexit.register(listener.stop)
+
+    return logger, queue
+
+
+def init_child_process_logger(queue: multiprocessing.Queue) -> logging.Logger:
+    """
+    Initialize a child logger to use the given queue. This should be called in child processes, only once.
+    After it's called, the logger can be retrieved using `get_logger()`.
+
+    Args:
+        queue: A multiprocessing.Queue object returned by `init_main_process_logger()`.
+
+    Returns:
+        A logger instance.
+    """
+
+    if multiprocessing.parent_process() is None:
+        raise RuntimeError("This function should be called only in child processes.")
+
+    logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
+    if logger.hasHandlers():
+        # Make sure the QueueHandler is already registered.
+        _ = _get_queue_from_handlers(logger.handlers)
+        return logger
+
+    ensure_style_detected()
+
+    logger.addHandler(QueueHandler(queue))
+    logger.setLevel(logging.INFO)
 
-    return neptune_logger
+    return logger
 
 
-logger = get_logger()
+def _get_queue_from_handlers(handlers: List[logging.Handler]) -> multiprocessing.Queue:
+    for h in handlers:
+        if isinstance(h, QueueHandler):
+            return h.queue  # type: ignore  # mypy doesn't know it's always a Queue
+
+    raise RuntimeError("Expected to find a QueueHandler in the logger handlers.")
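Putting the new module together: the main process owns the `QueueListener` and the real handlers, children hold only a `QueueHandler`, and records flow over the queue. A minimal usage sketch built on the functions above (the `child_entry` helper is illustrative, not part of the package):

import multiprocessing

from neptune_scale.core.logger import (
    get_logger,
    init_child_process_logger,
    init_main_process_logger,
)


def child_entry(queue: multiprocessing.Queue) -> None:
    # Runs in the child: records are put on the queue, not written directly.
    init_child_process_logger(queue)
    get_logger().info("hello from the child process")


if __name__ == "__main__":
    logger, queue = init_main_process_logger()
    logger.info("hello from the main process")

    child = multiprocessing.Process(target=child_entry, args=(queue,))
    child.start()
    child.join()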
50 changes: 50 additions & 0 deletions tests/unit/test_logger.py
@@ -0,0 +1,50 @@
+from concurrent.futures.process import ProcessPoolExecutor
+from unittest.mock import Mock
+
+import pytest
+
+from neptune_scale.core.logger import (
+    get_logger,
+    init_child_process_logger,
+    init_main_process_logger,
+)
+
+
+def test_multiple_initialization_in_main_process():
+    logger, queue = init_main_process_logger()
+
+    # Keep a copy of handlers to make sure they're not modified once the logger is initialized.
+    handlers = logger.handlers[:]
+    _, queue2 = init_main_process_logger()
+
+    assert queue is queue2
+    assert get_logger().handlers == handlers
+
+
+def test_restrict_main_process_initialization_to_main_process():
+    with ProcessPoolExecutor() as executor, pytest.raises(RuntimeError) as err:
+        executor.submit(init_main_process_logger).result()
+
+    err.match("only in the main process")
+
+
+def test_restrict_child_process_initialization_to_child_process():
+    with pytest.raises(RuntimeError) as err:
+        init_child_process_logger(Mock())
+
+    err.match("only in child process")
+
+
+def _child():
+    logger = init_child_process_logger(Mock())
+
+    # Keep a copy of handlers to make sure they're not modified once the logger is initialized.
+    handlers = logger.handlers[:]
+    init_child_process_logger(Mock())
+
+    assert get_logger().handlers == handlers
+
+
+def test_multiple_initialization_in_child_process():
+    with ProcessPoolExecutor() as executor:
+        executor.submit(_child).result()
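One design note on the tests: `_child` is defined at module level because `ProcessPoolExecutor` pickles the submitted callable, and locally defined functions are not picklable. The re-initialization assertions therefore run inside the worker process, and only the result or a raised exception travels back to the test.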