-
Notifications
You must be signed in to change notification settings - Fork 9
feat: Add ClpKeyValuePairStreamHandler
which supports logging dictionary-type log events into CLP's key-value pair IR format.
#46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
51636a1
e03b6d7
5a51940
919a15a
a1ed017
5a53fe5
048531d
ec36f20
d414d65
06608a7
096c3ce
e854fa1
b310993
b039278
715961b
5d8cd33
f270b39
1bcf7d7
2f03629
fe77d91
5ab34d4
9f96dda
47d3af7
306fba6
1ea69de
cb8be0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import logging | ||
from typing import Any, Dict | ||
|
||
from clp_logging.utils import Timestamp | ||
|
||
TIMESTAMP_KEY: str = "timestamp" | ||
TIMESTAMP_UNIX_TS_MS: str = "unix_ts_ms" | ||
TIMESTAMP_UTC_OFFSET_SEC: str = "utc_offset_sec" | ||
|
||
LEVEL_KEY: str = "level" | ||
LEVEL_NO_KEY: str = "no" | ||
LEVEL_NAME_KEY: str = "name" | ||
|
||
SOURCE_CONTEXT_KEY: str = "source_context" | ||
SOURCE_CONTEXT_PATH_KEY: str = "path" | ||
SOURCE_CONTEXT_LINE_KEY: str = "line" | ||
|
||
|
||
class AutoGeneratedKeyValuePairsBuffer: | ||
""" | ||
A reusable buffer for auto-generated key-value pairs. | ||
This buffer maintains a predefined dictionary for common metadata fields, to | ||
enable efficient reuse without creating new dictionaries for each log event. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self._buf: Dict[str, Any] = { | ||
TIMESTAMP_KEY: { | ||
TIMESTAMP_UNIX_TS_MS: None, | ||
TIMESTAMP_UTC_OFFSET_SEC: None, | ||
}, | ||
LEVEL_KEY: { | ||
LEVEL_NO_KEY: None, | ||
LEVEL_NAME_KEY: None, | ||
}, | ||
SOURCE_CONTEXT_KEY: { | ||
SOURCE_CONTEXT_PATH_KEY: None, | ||
SOURCE_CONTEXT_LINE_KEY: None, | ||
}, | ||
} | ||
LinZhihao-723 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def generate(self, ts: Timestamp, record: logging.LogRecord) -> Dict[str, Any]: | ||
""" | ||
Generates the auto-generated key-value pairs by populating the | ||
underlying buffer with the given log event metadata. | ||
:param ts: The timestamp assigned to the log event. | ||
:param record: The LogRecord containing metadata for the log event. | ||
:return: The populated underlying buffer as the auto-generated key-value | ||
pairs. | ||
""" | ||
|
||
self._buf[TIMESTAMP_KEY][TIMESTAMP_UNIX_TS_MS] = ts.get_unix_ts() | ||
self._buf[TIMESTAMP_KEY][TIMESTAMP_UTC_OFFSET_SEC] = ts.get_utc_offset() | ||
|
||
# NOTE: We don't add all the metadata contained in `record`. Instead, we only add the | ||
# following fields: | ||
# - Log level | ||
# - Source context | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
self._buf[LEVEL_KEY][LEVEL_NO_KEY] = record.levelno | ||
self._buf[LEVEL_KEY][LEVEL_NAME_KEY] = record.levelname | ||
|
||
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_PATH_KEY] = record.pathname | ||
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_LINE_KEY] = record.lineno | ||
|
||
return self._buf |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -3,19 +3,23 @@ | |||
import socket | ||||
import sys | ||||
import time | ||||
import warnings | ||||
from abc import ABCMeta, abstractmethod | ||||
from contextlib import nullcontext | ||||
from math import floor | ||||
from pathlib import Path | ||||
from queue import Empty, Queue | ||||
from signal import SIGINT, signal, SIGTERM | ||||
from threading import Thread, Timer | ||||
from types import FrameType | ||||
from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union | ||||
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union | ||||
|
||||
import tzlocal | ||||
from clp_ffi_py.ir import FourByteEncoder | ||||
from clp_ffi_py.ir import FourByteEncoder, Serializer | ||||
from clp_ffi_py.utils import serialize_dict_to_msgpack | ||||
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor | ||||
|
||||
from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer | ||||
from clp_logging.protocol import ( | ||||
BYTE_ORDER, | ||||
EOF_CHAR, | ||||
|
@@ -25,12 +29,15 @@ | |||
UINT_MAX, | ||||
ULONG_MAX, | ||||
) | ||||
from clp_logging.utils import Timestamp | ||||
|
||||
# TODO: lock writes to zstream if GIL ever goes away | ||||
# Note: no need to quote "Queue[Tuple[int, bytes]]" in python 3.9 | ||||
|
||||
DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s" | ||||
WARN_PREFIX: str = " [WARN][clp_logging]" | ||||
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs" | ||||
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs" | ||||
|
||||
|
||||
def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]: | ||||
|
@@ -216,7 +223,7 @@ def __init__( | |||
self.timeout_fn: Callable[[], None] = timeout_fn | ||||
self.next_hard_timeout_ts: int = ULONG_MAX | ||||
self.min_soft_timeout_delta: int = ULONG_MAX | ||||
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes]]] = None | ||||
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes], Serializer]] = None | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
self.hard_timeout_thread: Optional[Timer] = None | ||||
self.soft_timeout_thread: Optional[Timer] = None | ||||
|
||||
|
@@ -792,3 +799,169 @@ def __init__( | |||
super().__init__( | ||||
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout | ||||
) | ||||
|
||||
|
||||
class ClpKeyValuePairStreamHandler(logging.Handler): | ||||
""" | ||||
A custom logging handler that serializes key-value pair log events into the | ||||
CLP key-value pair IR format. | ||||
|
||||
Differences from `logging.StreamHandler`: | ||||
|
||||
- Log events (`logging.LogRecord`) should contain the key-value pairs that a user wants to log | ||||
as a Python dictionary. | ||||
- As a result, the key-value pairs will not be formatted into a string before being written. | ||||
- The key-value pairs will be serialized into the CLP key-value pair IR format before writing to | ||||
the stream. | ||||
|
||||
Key-value pairs in the log event must abide by the following rules: | ||||
- Keys must be of type `str`. | ||||
- Values must be one of the following types: | ||||
- Primitives: `int`, `float`, `str`, `bool`, or `None`. | ||||
- Arrays, where each array: | ||||
- may contain primitive values, dictionaries, or nested arrays. | ||||
- can be empty. | ||||
- Dictionaries, where each dictionary: | ||||
- must adhere to the aforementioned rules for keys and values. | ||||
- can be empty. | ||||
|
||||
:param stream: A writable byte output stream to which the handler will write the serialized IR | ||||
byte sequences. | ||||
:param enable_compression: Whether to compress the serialized IR byte sequences using Zstandard. | ||||
""" | ||||
|
||||
def __init__( | ||||
self, | ||||
stream: IO[bytes], | ||||
enable_compression: bool = True, | ||||
) -> None: | ||||
super().__init__() | ||||
|
||||
self._enable_compression: bool = enable_compression | ||||
self._serializer: Optional[Serializer] = None | ||||
self._formatter: Optional[logging.Formatter] = None | ||||
self._ostream: IO[bytes] = stream | ||||
|
||||
self._auto_gen_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = ( | ||||
AutoGeneratedKeyValuePairsBuffer() | ||||
) | ||||
|
||||
self._init_new_serializer(stream) | ||||
|
||||
# override | ||||
def setFormatter(self, fmt: Optional[logging.Formatter]) -> None: | ||||
if fmt is None: | ||||
return | ||||
warnings.warn( | ||||
f"{self.__class__.__name__} doesn't currently support Formatters", | ||||
category=RuntimeWarning, | ||||
) | ||||
self._formatter = fmt | ||||
|
||||
# override | ||||
def emit(self, record: logging.LogRecord) -> None: | ||||
""" | ||||
Overrides `logging.Handler.emit` to ensure `logging.Handler.handleError` | ||||
is always called and avoid requiring a `logging.LogRecord` to call | ||||
internal writing functions. | ||||
|
def emit(self, record: logging.LogRecord) -> None: |
Do u want me to rewrite it for this new handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh, not necessarily. I'm just confused about the docstring is supposed to mean? Should we ask David?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what the original comment means is to avoid the default emit
implementation to call output stream-level write
directly: https://github.com/python/cpython/blob/c1f352bf0813803bb795b796c16040a5cd4115f2/Lib/logging/__init__.py#L1138
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
LinZhihao-723 marked this conversation as resolved.
Show resolved
Hide resolved
LinZhihao-723 marked this conversation as resolved.
Show resolved
Hide resolved
LinZhihao-723 marked this conversation as resolved.
Show resolved
Hide resolved
LinZhihao-723 marked this conversation as resolved.
Show resolved
Hide resolved
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from __future__ import annotations | ||
|
||
import time | ||
from math import floor | ||
|
||
|
||
class Timestamp: | ||
""" | ||
A timestamp represented as a Unix timestamp and a timezone offset from UTC. | ||
""" | ||
|
||
@staticmethod | ||
def now() -> Timestamp: | ||
""" | ||
:return: A `Timestamp` instance representing the current time. | ||
""" | ||
ts: float = time.time() | ||
return Timestamp( | ||
unix_ts=floor(ts * 1000), | ||
utc_offset=time.localtime(ts).tm_gmtoff, | ||
) | ||
|
||
def __init__(self, unix_ts: int, utc_offset: int): | ||
""" | ||
Initializes a `Timestamp` instance with the given time. | ||
|
||
:param unix_ts: Unix timestamp in milliseconds. | ||
:param utc_offset: The number of seconds the timezone is ahead of | ||
(positive) or behind (negative) UTC. | ||
""" | ||
self._utc_offset: int = utc_offset | ||
self._unix_ts: int = unix_ts | ||
|
||
def get_unix_ts(self) -> int: | ||
""" | ||
:return: The Unix timestamp in milliseconds. | ||
""" | ||
return self._unix_ts | ||
|
||
def get_utc_offset(self) -> int: | ||
""" | ||
:return: The number of seconds the timezone is ahead of (positive) or behind (negative) UTC. | ||
""" | ||
return self._utc_offset |
Uh oh!
There was an error while loading. Please reload this page.