Skip to content

Commit 4466821

Browse files
committed
thread name
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 395049a commit 4466821

File tree

1 file changed

+77
-77
lines changed

1 file changed

+77
-77
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 77 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -31,79 +31,79 @@
3131
logger = logging.getLogger(__name__)
3232

3333

34-
# class DebugLock:
35-
# """A wrapper around threading.Lock that provides detailed debugging for lock acquisition/release"""
36-
37-
# def __init__(self, name: str = "DebugLock"):
38-
# self._lock = threading.Lock()
39-
# self._name = name
40-
# self._owner: Optional[str] = None
41-
# self._waiters: List[str] = []
42-
# self._debug_logger = logging.getLogger(f"{__name__}.{name}")
43-
# # Ensure debug logging is visible
44-
# if not self._debug_logger.handlers:
45-
# handler = logging.StreamHandler()
46-
# formatter = logging.Formatter(
47-
# ":lock: %(asctime)s [%(threadName)s-%(thread)d] LOCK-%(name)s: %(message)s"
48-
# )
49-
# handler.setFormatter(formatter)
50-
# self._debug_logger.addHandler(handler)
51-
# self._debug_logger.setLevel(logging.DEBUG)
52-
53-
# def acquire(self, blocking=True, timeout=-1):
54-
# current = threading.current_thread()
55-
# thread_info = f"{current.name}-{current.ident}"
56-
# if self._owner:
57-
# self._debug_logger.warning(
58-
# f": WAITING: {thread_info} waiting for lock held by {self._owner}"
59-
# )
60-
# self._waiters.append(thread_info)
61-
# else:
62-
# self._debug_logger.debug(
63-
# f": TRYING: {thread_info} attempting to acquire lock"
64-
# )
65-
# # Try to acquire the lock
66-
# acquired = self._lock.acquire(blocking, timeout)
67-
# if acquired:
68-
# self._owner = thread_info
69-
# self._debug_logger.info(f": ACQUIRED: {thread_info} got the lock")
70-
# if self._waiters:
71-
# self._debug_logger.info(
72-
# f": WAITERS: {len(self._waiters)} threads waiting: {self._waiters}"
73-
# )
74-
# else:
75-
# self._debug_logger.error(
76-
# f": FAILED: {thread_info} failed to acquire lock (timeout)"
77-
# )
78-
# if thread_info in self._waiters:
79-
# self._waiters.remove(thread_info)
80-
# return acquired
81-
82-
# def release(self):
83-
# current = threading.current_thread()
84-
# thread_info = f"{current.name}-{current.ident}"
85-
# if self._owner != thread_info:
86-
# self._debug_logger.error(
87-
# f": ERROR: {thread_info} trying to release lock owned by {self._owner}"
88-
# )
89-
# else:
90-
# self._debug_logger.info(f": RELEASED: {thread_info} released the lock")
91-
# self._owner = None
92-
# # Remove from waiters if present
93-
# if thread_info in self._waiters:
94-
# self._waiters.remove(thread_info)
95-
# if self._waiters:
96-
# self._debug_logger.info(
97-
# f": NEXT: {len(self._waiters)} threads still waiting: {self._waiters}"
98-
# )
99-
# self._lock.release()
100-
101-
# def __enter__(self):
102-
# self.acquire()
103-
# return self
104-
105-
# def __exit__(self, exc_type, exc_val, exc_tb):
106-
# self.release()
34+
class DebugLock:
35+
"""A wrapper around threading.Lock that provides detailed debugging for lock acquisition/release"""
36+
37+
def __init__(self, name: str = "DebugLock"):
38+
self._lock = threading.Lock()
39+
self._name = name
40+
self._owner: Optional[str] = None
41+
self._waiters: List[str] = []
42+
self._debug_logger = logging.getLogger(f"{__name__}.{name}")
43+
# Ensure debug logging is visible
44+
if not self._debug_logger.handlers:
45+
handler = logging.StreamHandler()
46+
formatter = logging.Formatter(
47+
":lock: %(asctime)s [%(threadName)s-%(thread)d] LOCK-%(name)s: %(message)s"
48+
)
49+
handler.setFormatter(formatter)
50+
self._debug_logger.addHandler(handler)
51+
self._debug_logger.setLevel(logging.DEBUG)
52+
53+
def acquire(self, blocking=True, timeout=-1):
54+
current = threading.current_thread()
55+
thread_info = f"{current.name}-{current.ident}"
56+
if self._owner:
57+
self._debug_logger.warning(
58+
f": WAITING: {thread_info} waiting for lock held by {self._owner}"
59+
)
60+
self._waiters.append(thread_info)
61+
else:
62+
self._debug_logger.debug(
63+
f": TRYING: {thread_info} attempting to acquire lock"
64+
)
65+
# Try to acquire the lock
66+
acquired = self._lock.acquire(blocking, timeout)
67+
if acquired:
68+
self._owner = thread_info
69+
self._debug_logger.info(f": ACQUIRED: {thread_info} got the lock")
70+
if self._waiters:
71+
self._debug_logger.info(
72+
f": WAITERS: {len(self._waiters)} threads waiting: {self._waiters}"
73+
)
74+
else:
75+
self._debug_logger.error(
76+
f": FAILED: {thread_info} failed to acquire lock (timeout)"
77+
)
78+
if thread_info in self._waiters:
79+
self._waiters.remove(thread_info)
80+
return acquired
81+
82+
def release(self):
83+
current = threading.current_thread()
84+
thread_info = f"{current.name}-{current.ident}"
85+
if self._owner != thread_info:
86+
self._debug_logger.error(
87+
f": ERROR: {thread_info} trying to release lock owned by {self._owner}"
88+
)
89+
else:
90+
self._debug_logger.info(f": RELEASED: {thread_info} released the lock")
91+
self._owner = None
92+
# Remove from waiters if present
93+
if thread_info in self._waiters:
94+
self._waiters.remove(thread_info)
95+
if self._waiters:
96+
self._debug_logger.info(
97+
f": NEXT: {len(self._waiters)} threads still waiting: {self._waiters}"
98+
)
99+
self._lock.release()
100+
101+
def __enter__(self):
102+
self.acquire()
103+
return self
104+
105+
def __exit__(self, exc_type, exc_val, exc_tb):
106+
self.release()
107107

108108

109109
class TelemetryHelper:
@@ -430,10 +430,10 @@ class TelemetryClientFactory:
430430
] = {} # Map of session_id_hex -> BaseTelemetryClient
431431
_executor: Optional[ThreadPoolExecutor] = None
432432
_initialized: bool = False
433-
_lock = threading.Lock() # Thread safety for factory operations
434-
# _lock = DebugLock(
435-
# "TelemetryClientFactory"
436-
# ) # Thread safety for factory operations with debugging
433+
# _lock = threading.Lock() # Thread safety for factory operations
434+
_lock = DebugLock(
435+
"TelemetryClientFactory"
436+
) # Thread safety for factory operations with debugging
437437
_original_excepthook = None
438438
_excepthook_installed = False
439439

0 commit comments

Comments
 (0)