-
Notifications
You must be signed in to change notification settings - Fork 9
feat: Add ClpKeyValuePairStreamHandler
which supports logging dictionary-type log events into CLP's key-value pair IR format.
#46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
51636a1
e03b6d7
5a51940
919a15a
a1ed017
5a53fe5
048531d
ec36f20
d414d65
06608a7
096c3ce
e854fa1
b310993
b039278
715961b
5d8cd33
f270b39
1bcf7d7
2f03629
fe77d91
5ab34d4
9f96dda
47d3af7
306fba6
1ea69de
cb8be0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import logging | ||
from typing import Any, Dict, Optional | ||
|
||
ZONED_TIMESTAMP_KEY: str = "zoned_timestamp" | ||
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: str = "utc_epoch_ms" | ||
|
||
ZONED_TIMESTAMP_TZ_KEY: str = "timezone" | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
LEVEL_KEY: str = "level" | ||
LEVEL_NO_KEY: str = "no" | ||
LEVEL_NAME_KEY: str = "name" | ||
|
||
SOURCE_CONTEXT_KEY: str = "source_context" | ||
SOURCE_CONTEXT_PATH_KEY: str = "path" | ||
SOURCE_CONTEXT_LINE_KEY: str = "line" | ||
|
||
LOGLIB_GENERATED_MSG_KEY: str = "loglib_generated_msg" | ||
|
||
|
||
class AutoGeneratedKeyValuePairsBuffer: | ||
""" | ||
A reusable buffer for creating auto-generated key-value pairs for log | ||
events. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
This buffer maintains a predefined dictionary structure for common metadata | ||
fields, allowing efficient reuse without creating new dictionaries for each | ||
log event. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
""" | ||
|
||
def __init__(self) -> None: | ||
self._buf: Dict[str, Any] = { | ||
ZONED_TIMESTAMP_KEY: { | ||
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: None, | ||
ZONED_TIMESTAMP_TZ_KEY: None, | ||
}, | ||
LEVEL_KEY: { | ||
LEVEL_NO_KEY: None, | ||
LEVEL_NAME_KEY: None, | ||
}, | ||
SOURCE_CONTEXT_KEY: { | ||
SOURCE_CONTEXT_PATH_KEY: None, | ||
SOURCE_CONTEXT_LINE_KEY: None, | ||
}, | ||
} | ||
|
||
def generate( | ||
self, timestamp: int, timezone: Optional[str], record: logging.LogRecord | ||
) -> Dict[str, Any]: | ||
""" | ||
Generated auto-generated key-value pairs by populating the underlying | ||
buffer with the given log event metadata. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
:param timestamp: The Unix epoch timestamp in millisecond of the log | ||
event. | ||
:param timezone: The timezone of the log event, or None if not | ||
applicable. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
:param record: The LogRecord containing metadata for the log event. | ||
:return: The populated underlying buffer as the auto-generated key-value | ||
pairs. | ||
""" | ||
|
||
self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY] = timestamp | ||
self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_TZ_KEY] = timezone | ||
|
||
# NOTE: we don't serialize all the metadata given by `record`. Currently, we only add the | ||
# following metadata into auto-generated kv pairs: | ||
# - log level | ||
# - source context | ||
|
||
self._buf[LEVEL_KEY][LEVEL_NO_KEY] = record.levelno | ||
self._buf[LEVEL_KEY][LEVEL_NAME_KEY] = record.levelname | ||
|
||
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_PATH_KEY] = record.pathname | ||
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_LINE_KEY] = record.lineno | ||
|
||
return self._buf | ||
|
||
|
||
def create_loglib_generated_log_event_as_auto_generated_kv_pairs( | ||
timestamp: int, timezone: Optional[str], msg: str | ||
) -> Dict[str, Any]: | ||
""" | ||
:param timestamp: The Unix epoch timestamp in millisecond of the log event. | ||
:param timezone: The timezone of the log event, or None if not applicable. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
:param msg: The log message generated by the logging library. | ||
:return: The auto-generated key-value pairs that represents a log event generated by the logging | ||
library itself. | ||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
""" | ||
return { | ||
ZONED_TIMESTAMP_KEY: { | ||
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: timestamp, | ||
ZONED_TIMESTAMP_TZ_KEY: timezone, | ||
}, | ||
LOGLIB_GENERATED_MSG_KEY: msg, | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -3,19 +3,25 @@ | |||
import socket | ||||
import sys | ||||
import time | ||||
import warnings | ||||
from abc import ABCMeta, abstractmethod | ||||
from math import floor | ||||
from pathlib import Path | ||||
from queue import Empty, Queue | ||||
from signal import SIGINT, signal, SIGTERM | ||||
from threading import Thread, Timer | ||||
from types import FrameType | ||||
from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union | ||||
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union | ||||
|
||||
import tzlocal | ||||
from clp_ffi_py.ir import FourByteEncoder | ||||
from clp_ffi_py.ir import FourByteEncoder, Serializer | ||||
from clp_ffi_py.utils import serialize_dict_to_msgpack | ||||
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor | ||||
|
||||
from clp_logging.auto_generated_kv_pairs_utils import ( | ||||
AutoGeneratedKeyValuePairsBuffer, | ||||
create_loglib_generated_log_event_as_auto_generated_kv_pairs, | ||||
) | ||||
from clp_logging.protocol import ( | ||||
BYTE_ORDER, | ||||
EOF_CHAR, | ||||
|
@@ -31,6 +37,8 @@ | |||
|
||||
DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s" | ||||
WARN_PREFIX: str = " [WARN][clp_logging]" | ||||
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs" | ||||
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs" | ||||
|
||||
|
||||
def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]: | ||||
|
@@ -216,11 +224,11 @@ def __init__( | |||
self.timeout_fn: Callable[[], None] = timeout_fn | ||||
self.next_hard_timeout_ts: int = ULONG_MAX | ||||
self.min_soft_timeout_delta: int = ULONG_MAX | ||||
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes]]] = None | ||||
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes], Serializer]] = None | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
self.hard_timeout_thread: Optional[Timer] = None | ||||
self.soft_timeout_thread: Optional[Timer] = None | ||||
|
||||
def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes]]) -> None: | ||||
def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes], Serializer]) -> None: | ||||
self.ostream = ostream | ||||
|
||||
def timeout(self) -> None: | ||||
|
@@ -792,3 +800,168 @@ def __init__( | |||
super().__init__( | ||||
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout | ||||
) | ||||
|
||||
|
||||
class ClpKeyValuePairStreamHandler(logging.Handler): | ||||
""" | ||||
A custom logging handler that processes log events containing key-value | ||||
pairs and serializes them into the CLP key-value pair IR format. | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
|
||||
Differences from `logging.StreamHandler`: | ||||
- Expects log events (`logging.LogRecord`) to include key-value pairs represented as a Python | ||||
dictionary. | ||||
|
||||
- Serializes the key-value pairs into the CLP key-value pair IR format before writing to the | ||||
stream. | ||||
|
||||
Rules for key-value pair representation: | ||||
- Key: | ||||
- Must be of type `str`. | ||||
- Value: | ||||
- Must be one of the following types: | ||||
- Primitive types: `int`, `float`, `str`, `bool`, or `None`. | ||||
- Arrays: | ||||
- May contain primitive values, dictionaries, or nested arrays. | ||||
- Can be empty. | ||||
- Dictionaries: | ||||
- Must adhere to the same key-value rules. | ||||
- Can be empty. | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
|
||||
|
||||
:param stream: A writable byte output stream to which the handler will write the serialized IR | ||||
byte sequences. | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
:param enable_compression: Whether to compress the serialized IR byte sequences using zstd. | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
:param loglevel_timeout: Customized timeout configuration. | ||||
""" | ||||
|
||||
def __init__( | ||||
self, | ||||
stream: IO[bytes], | ||||
enable_compression: bool = True, | ||||
timezone: Optional[str] = None, | ||||
loglevel_timeout: Optional[CLPLogLevelTimeout] = None, | ||||
) -> None: | ||||
super().__init__() | ||||
|
||||
self._enable_compression: bool = enable_compression | ||||
self._tz: Optional[str] = timezone | ||||
self._loglevel_timeout: Optional[CLPLogLevelTimeout] = loglevel_timeout | ||||
self._serializer: Optional[Serializer] = None | ||||
self._formatter: Optional[logging.Formatter] = None | ||||
self._ostream: IO[bytes] = stream | ||||
|
||||
self._auto_generated_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = ( | ||||
AutoGeneratedKeyValuePairsBuffer() | ||||
) | ||||
|
||||
self._init_new_serializer(stream) | ||||
|
||||
# override | ||||
def setFormatter(self, fmt: Optional[logging.Formatter]) -> None: | ||||
if fmt is None: | ||||
return | ||||
warnings.warn( | ||||
f"Formatter is currently not supported in the current {self.__class__.__name__}", | ||||
LinZhihao-723 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
category=RuntimeWarning, | ||||
) | ||||
self._formatter = fmt | ||||
|
||||
# override | ||||
def emit(self, record: logging.LogRecord) -> None: | ||||
""" | ||||
Overrides `logging.Handler.emit` to ensure `logging.Handler.handleError` | ||||
is always called and avoid requiring a `logging.LogRecord` to call | ||||
internal writing functions. | ||||
|
def emit(self, record: logging.LogRecord) -> None: |
Do u want me to rewrite it for this new handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh, not necessarily. I'm just confused about the docstring is supposed to mean? Should we ask David?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what the original comment means is to avoid the default emit
implementation to call output stream-level write
directly: https://github.com/python/cpython/blob/c1f352bf0813803bb795b796c16040a5cd4115f2/Lib/logging/__init__.py#L1138
Uh oh!
There was an error while loading. Please reload this page.