
Commit 44716b0

working on structlog option for logging (unreleased)
1 parent 013570f commit 44716b0

3 files changed: +415 -0 lines changed

unreleased/log_example_structlog.py

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
Author: Mike Ryan
Date: 2024/05/16
License: MIT
"""
import logging
import multiprocessing
import secrets  # used by the commented-out token_urlsafe alternative below
import threading

import structlog
from tqdm import tqdm

from dsg_lib.common_functions import logging_config_structlog


logger = structlog.get_logger()


def div_zero(x, y):
    try:
        return x / y
    except ZeroDivisionError as e:
        logger.error("division by zero", error=str(e))
        logging.error(f'{e}')


def div_zero_two(x, y):
    try:
        return x / y
    except ZeroDivisionError as e:
        logger.error("division by zero", error=str(e))
        logging.error(f'{e}')


def log_big_string(lqty=100, size=256):
    # big_string = secrets.token_urlsafe(size)
    big_string = """
    Bacon ipsum dolor amet meatball kielbasa chislic, corned beef ham hock frankfurter jowl sirloin meatloaf ribeye boudin. Capicola ham hock pork landjaeger, jerky t-bone strip steak pork chop boudin shankle tri-tip andouille pork belly flank.
    """
    for _ in range(lqty):
        logging.debug(f"Let's make this a big message {big_string}")
        div_zero(x=1, y=0)
        div_zero_two(x=1, y=0)
        logger.debug('This is a debug message')
        logger.info('This is an info message')
        logger.error('This is an error message')
        logger.warning('This is a warning message')
        logger.critical('This is a critical message')

        logging.debug('This is a debug message')
        logging.info('This is an info message')
        logging.error('This is an error message')
        logging.warning('This is a warning message')
        logging.critical('This is a critical message')


def worker(wqty=1000, lqty=100, size=256):
    for _ in tqdm(range(wqty), desc="Worker", leave=True, ascii=True):  # Adjusted for demonstration
        log_big_string(lqty=lqty, size=size)


def main(wqty: int = 100, lqty: int = 10, size: int = 256, workers: int = 16, thread_test: bool = False, process_test: bool = False):
    if process_test:
        processes = []
        # Create worker processes
        for _ in tqdm(range(workers), desc="Multi-Processing Start", leave=True):
            p = multiprocessing.Process(
                target=worker, args=(wqty, lqty, size,))
            processes.append(p)
            p.start()

        for p in tqdm(processes, desc="Multi-Processing Gather", leave=True):
            p.join(timeout=60)  # Timeout after 60 seconds
            if p.is_alive():
                logger.error(f"Process {p.name} is hanging. Terminating.")
                p.terminate()
                p.join()

    if thread_test:
        threads = []
        for _ in tqdm(range(workers), desc="Threading Start", leave=True):  # Create worker threads
            t = threading.Thread(target=worker, args=(wqty, lqty, size,))
            threads.append(t)
            t.start()

        for t in tqdm(threads, desc="Threading Gather", leave=True):
            t.join()


if __name__ == "__main__":
    logging_config_structlog.configure_logging(
        logging_directory='log',
        log_name='log',
        logging_level='INFO',
        log_rotation=100,  # Size in MB
        log_retention=10
    )
    from time import time
    start = time()
    main(wqty=100, lqty=100, size=256, workers=16,
         thread_test=True, process_test=True)
    print(f"Execution time: {time()-start:.2f} seconds")
Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
"""
This module provides a logging configuration setup with structlog, including
support for safe log rotation and multiprocessing.
"""

import logging
import logging.handlers
import os
import threading
from collections import deque
from datetime import datetime
from multiprocessing import Lock, Process, Queue
from queue import Empty

import structlog
from pythonjsonlogger import jsonlogger

rotation_lock = Lock()

class SafeRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """A rotating file handler that safely handles log file rotation."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.process_name = os.getpid()  # Get the process ID

    def doRollover(self):
        """Perform the log file rollover."""
        with rotation_lock:
            if self.stream:
                self.stream.close()
                self.stream = None

            dfn = self.rotation_filename(f"{self.baseFilename}.{self.process_name}.{self.backupCount}")
            if os.path.exists(dfn):
                os.remove(dfn)
            self.rotate(self.baseFilename, dfn)

            if not self.delay:
                self.stream = self._open()

class QueueHandler(logging.Handler):
    """This is a logging handler which sends events to a queue. It can be used
    from different processes to send logs to a single log file.
    """
    def __init__(self, log_queue):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record):
        try:
            self.log_queue.put_nowait(record)
        except Exception as e:
            print(f"Error emitting log record: {e}")

class QueueListener:
    """This is a listener which receives log events from the queue and processes
    them. It should be run in a separate process.
    """
    def __init__(self, log_queue, handlers):
        self.log_queue = log_queue
        self.handlers = handlers
        self.stop_event = threading.Event()

    def start(self):
        """Start the queue listener."""
        while not self.stop_event.is_set():
            try:
                record = self.log_queue.get(timeout=0.05)
                self.handle(record)
            except Empty:
                continue

    def handle(self, record):
        """Handle a log record."""
        for handler in self.handlers:
            handler.handle(record)

    def stop(self):
        """Stop the queue listener."""
        self.stop_event.set()

class CachingRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """A rotating file handler with caching capabilities."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = deque(maxlen=1000)

    def emit(self, record):
        """Emit a record."""
        try:
            self.cache.append(record)
        except Exception as e:
            print(f"Error caching log record: {e}")

    def flush_cache(self):
        """Flush the cache."""
        while self.cache:
            record = self.cache.popleft()
            super().emit(record)

def configure_logging(
    logging_directory: str = 'log',
    log_name: str = 'log',
    logging_level: str = 'INFO',
    log_rotation: int = 100,  # Size in MB
    log_retention: int = 10,
    multiprocess: bool = False
):
    """Configure logging with rotating file handlers."""
    if not os.path.exists(logging_directory):
        os.makedirs(logging_directory)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    log_name = f"{log_name}_{timestamp}.json"
    log_path = os.path.join(logging_directory, log_name)
    max_bytes = log_rotation * 1024 * 1024

    cache_rotating_handler = CachingRotatingFileHandler(
        log_path,
        maxBytes=max_bytes,
        backupCount=log_retention
    )
    safe_rotating_file_handler = SafeRotatingFileHandler(
        filename=log_path,
        maxBytes=max_bytes,
        backupCount=log_retention
    )
    formatter = jsonlogger.JsonFormatter()
    cache_rotating_handler.setFormatter(formatter)
    safe_rotating_file_handler.setFormatter(formatter)

    handlers = [cache_rotating_handler, safe_rotating_file_handler]

    listener_process, listener_instance = None, None

    if multiprocess:
        log_queue = Queue()
        queue_handler = QueueHandler(log_queue)
        handlers = [queue_handler]
        listener_instance = QueueListener(log_queue, [cache_rotating_handler, safe_rotating_file_handler])
        listener_process = Process(target=listener_instance.start)
        listener_process.start()

    logging.basicConfig(
        level=logging_level,
        handlers=handlers
    )

    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    cache_rotating_handler.flush_cache()

    return listener_instance, listener_process

# Example usage
if __name__ == "__main__":
    listener_instance, listener_process = configure_logging(
        logging_directory='log',
        log_name='log',
        logging_level='INFO',
        log_rotation=100,  # Size in MB
        log_retention=10,
        multiprocess=True
    )

    logger = structlog.get_logger()
    logger.info("Logging configured with SafeRotatingFileHandler")
