Skip to content

Simplify logger configuration #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into the base branch from the source branch
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@
from neptune_scale.core.components.lag_tracking import LagTracker
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.components.sync_process import SyncProcess
from neptune_scale.core.logger import (
get_logger,
init_main_process_logger,
)
from neptune_scale.core.logger import get_logger
from neptune_scale.core.metadata_splitter import MetadataSplitter
from neptune_scale.core.serialization import (
datetime_to_proto,
Expand Down Expand Up @@ -133,8 +130,6 @@ def __init__(
on_warning_callback: Callback function triggered when a warning occurs.
"""

_, self._logging_queue = init_main_process_logger()

verify_type("run_id", run_id, str)
verify_type("resume", resume, bool)
verify_type("project", project, (str, type(None)))
Expand Down Expand Up @@ -234,7 +229,6 @@ def __init__(
family=self._run_id,
operations_queue=self._operations_queue.queue,
errors_queue=self._errors_queue,
logging_queue=self._logging_queue,
api_token=input_api_token,
last_put_seq=self._last_put_seq,
last_put_seq_wait=self._last_put_seq_wait,
Expand Down
9 changes: 1 addition & 8 deletions src/neptune_scale/core/components/sync_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@
BatchedOperations,
SingleOperation,
)
from neptune_scale.core.logger import (
get_logger,
init_child_process_logger,
)
from neptune_scale.core.logger import get_logger
from neptune_scale.core.util import safe_signal_name
from neptune_scale.exceptions import (
NeptuneConnectionLostError,
Expand Down Expand Up @@ -196,7 +193,6 @@ def __init__(
self,
operations_queue: Queue,
errors_queue: ErrorsQueue,
logging_queue: Queue,
api_token: str,
project: str,
family: str,
Expand All @@ -213,7 +209,6 @@ def __init__(

self._external_operations_queue: Queue[SingleOperation] = operations_queue
self._errors_queue: ErrorsQueue = errors_queue
self._logging_queue: Queue = logging_queue
self._api_token: str = api_token
self._project: str = project
self._family: str = family
Expand All @@ -234,8 +229,6 @@ def _handle_signal(self, signum: int, frame: Optional[FrameType]) -> None:
self._stop_event.set() # Trigger the stop event

def run(self) -> None:
init_child_process_logger(self._logging_queue)

logger.info("Data synchronization started")

# Register signals handlers
Expand Down
112 changes: 21 additions & 91 deletions src/neptune_scale/core/logger.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,52 @@
__all__ = (
"init_main_process_logger",
"init_child_process_logger",
"get_logger",
)
__all__ = ("get_logger",)

import atexit
import logging
import multiprocessing
import os
from logging.handlers import (
QueueHandler,
QueueListener,
)
from typing import (
List,
Tuple,
)

from neptune_scale.core.styles import (
STYLES,
ensure_style_detected,
)
from neptune_scale.envs import DEBUG_MODE

NEPTUNE_LOGGER_NAME = "neptune"
NEPTUNE_DEBUG_FILE_NAME = "neptune.log"
LOG_FORMAT = "{blue}%(name)s{end} :: {bold}%(levelname)s{end} :: %(message)s"
DEBUG_FORMAT = "%(asctime)s :: %(name)s :: %(levelname)s :: %(processName)s(%(process)d):%(threadName)s:%(filename)s:%(funcName)s():%(lineno)d %(message)s"
LOG_FORMAT = "{blue}%(name)s{end}:{bold}%(levelname)s{end}: %(message)s"
DEBUG_FORMAT = (
"%(asctime)s:%(name)s:%(levelname)s:%(processName)s(%(process)d):%(threadName)s:%(filename)s:"
"%(funcName)s():%(lineno)d: %(message)s"
)


def get_logger() -> logging.Logger:
    """Return the package-wide root Neptune logger shared by all modules."""
    neptune_logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
    return neptune_logger


def init_main_process_logger() -> Tuple[logging.Logger, multiprocessing.Queue]:
"""
Initialize the root 'neptune' logger to use a mp.Queue. This should be called only once in the main process.

Returns:
A 2-tuple of (logger, queue). Queue should be passed to `init_child_logger()` in child processes.
"""

if multiprocessing.parent_process() is not None:
raise RuntimeError("This function should be called only in the main process.")

logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
logger = logging.getLogger("neptune")

# If the user has also imported `neptune-fetcher` the root logger will already be initialized.
# We want our handlers to take precedence. We will remove all handlers and add our own.
if logger.hasHandlers():
# Not initialized by us, clear handlers and proceed.
if not hasattr(logger, "__neptune_scale_listener"):
logger.handlers.clear()
else:
return logger, _get_queue_from_handlers(logger.handlers)
# Already initialized by us
if hasattr(logger, "__neptune_scale"):
return logger

# Clear handlers and proceed with initialization
logger.handlers.clear()

logger.setLevel(logging.INFO)
ensure_style_detected()

handlers: List[logging.Handler] = []
if os.environ.get(DEBUG_MODE, "False").lower() in ("true", "1"):
logger.setLevel(logging.DEBUG)

file_handler = logging.FileHandler(NEPTUNE_DEBUG_FILE_NAME)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(DEBUG_FORMAT))
handlers.append(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(LOG_FORMAT.format(**STYLES)))
handlers.append(stream_handler)

queue: multiprocessing.Queue = multiprocessing.Queue()
logger.addHandler(QueueHandler(queue))

listener = QueueListener(queue, *handlers, respect_handler_level=True)
listener.start()
logger.addHandler(stream_handler)

logger.__neptune_scale_listener = listener # type: ignore
atexit.register(listener.stop)

return logger, queue


def init_child_process_logger(queue: multiprocessing.Queue) -> logging.Logger:
"""
Initialize a child logger to use the given queue. This should be called in child processes only once.
After it's called, the logger can be retrieved using `get_logger()`.

Args:
queue: A multiprocessing.Queue object returned by `init_root_logger()`.

Returns:
A logger instance.
"""

if multiprocessing.parent_process() is None:
raise RuntimeError("This function should be called only in child processes.")

logger = logging.getLogger(NEPTUNE_LOGGER_NAME)
if logger.hasHandlers():
# Make sure the QueueHandler is already registered
_ = _get_queue_from_handlers(logger.handlers)
return logger
if os.environ.get(DEBUG_MODE, "False").lower() in ("true", "1"):
logger.setLevel(logging.DEBUG)

ensure_style_detected()
file_handler = logging.FileHandler(f"neptune.{os.getpid()}.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(DEBUG_FORMAT))
logger.addHandler(file_handler)

logger.addHandler(QueueHandler(queue))
logger.setLevel(logging.INFO)
logger.__neptune_scale = True # type: ignore

return logger


def _get_queue_from_handlers(handlers: List[logging.Handler]) -> multiprocessing.Queue:
    """Return the multiprocessing queue attached to the first QueueHandler in *handlers*.

    Raises:
        RuntimeError: if no QueueHandler is registered among the given handlers.
    """
    queue_handler = next((h for h in handlers if isinstance(h, QueueHandler)), None)
    if queue_handler is None:
        raise RuntimeError("Expected to find a QueueHandler in the logger handlers.")

    return queue_handler.queue  # type: ignore # mypy doesn't know it's always Queue
50 changes: 0 additions & 50 deletions tests/unit/test_logger.py

This file was deleted.

Loading