Skip to content

Commit 4d56141

Browse files
committed
added doc strings to latency_logger, abstracted export_telemetry_log
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent a059a03 commit 4d56141

File tree

2 files changed

+115
-64
lines changed

2 files changed

+115
-64
lines changed

src/databricks/sql/telemetry/latency_logger.py

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,32 @@
1111

1212

1313
class TelemetryExtractor:
14+
"""
15+
Base class for extracting telemetry information from various object types.
16+
17+
This class serves as a proxy that delegates attribute access to the wrapped object
18+
while providing a common interface for extracting telemetry-related data.
19+
"""
20+
1421
def __init__(self, obj):
22+
"""
23+
Initialize the extractor with an object to wrap.
24+
25+
Args:
26+
obj: The object to extract telemetry information from.
27+
"""
1528
self._obj = obj
1629

1730
def __getattr__(self, name):
31+
"""
32+
Delegate attribute access to the wrapped object.
33+
34+
Args:
35+
name (str): The name of the attribute to access.
36+
37+
Returns:
38+
The attribute value from the wrapped object.
39+
"""
1840
return getattr(self._obj, name)
1941

2042
def get_session_id_hex(self):
@@ -37,6 +59,13 @@ def get_retry_count(self):
3759

3860

3961
class CursorExtractor(TelemetryExtractor):
62+
"""
63+
Telemetry extractor specialized for Cursor objects.
64+
65+
Extracts telemetry information from database cursor objects, including
66+
statement IDs, session information, compression settings, and result formats.
67+
"""
68+
4069
def get_statement_id(self) -> Optional[str]:
4170
return self.query_id
4271

@@ -72,6 +101,13 @@ def get_statement_type(self) -> StatementType:
72101

73102

74103
class ResultSetExtractor(TelemetryExtractor):
104+
"""
105+
Telemetry extractor specialized for ResultSet objects.
106+
107+
Extracts telemetry information from database result set objects, including
108+
operation IDs, session information, compression settings, and result formats.
109+
"""
110+
75111
def get_statement_id(self) -> Optional[str]:
76112
if self.command_id:
77113
return str(UUID(bytes=self.command_id.operationId.guid))
@@ -106,15 +142,63 @@ def get_retry_count(self) -> int:
106142

107143

108144
def get_extractor(obj):
145+
"""
146+
Factory function to create the appropriate telemetry extractor for an object.
147+
148+
Determines the object type and returns the corresponding specialized extractor
149+
that can extract telemetry information from that object type.
150+
151+
Args:
152+
obj: The object to create an extractor for. Can be a Cursor, ResultSet,
153+
or any other object.
154+
155+
Returns:
156+
TelemetryExtractor: A specialized extractor instance:
157+
- CursorExtractor for Cursor objects
158+
- ResultSetExtractor for ResultSet objects
159+
- TelemetryExtractor (base) for all other objects
160+
"""
109161
if obj.__class__.__name__ == "Cursor":
110162
return CursorExtractor(obj)
111163
elif obj.__class__.__name__ == "ResultSet":
112164
return ResultSetExtractor(obj)
113165
else:
114-
return TelemetryExtractor(obj)
166+
raise NotImplementedError(f"No extractor found for {obj.__class__.__name__}")
115167

116168

117169
def log_latency():
170+
"""
171+
Decorator for logging execution latency and telemetry information.
172+
173+
This decorator measures the execution time of a method and sends telemetry
174+
data about the operation, including latency, statement information, and
175+
execution context.
176+
177+
The decorator automatically:
178+
- Measures execution time using high-precision performance counters
179+
- Extracts telemetry information from the method's object (self)
180+
- Creates a SqlExecutionEvent with execution details
181+
- Sends the telemetry data asynchronously via TelemetryClient
182+
183+
Usage:
184+
@log_latency()
185+
def execute(self, query):
186+
# Method implementation
187+
pass
188+
189+
@log_latency()
190+
def fetchall(self):
191+
# Method implementation
192+
pass
193+
194+
Returns:
195+
function: A decorator that wraps methods to add latency logging.
196+
197+
Note:
198+
The wrapped method's object (self) must be compatible with the
199+
telemetry extractor system (e.g., Cursor or ResultSet objects).
200+
"""
201+
118202
def decorator(func):
119203
@functools.wraps(func)
120204
def wrapper(self, *args, **kwargs):

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 30 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,24 @@ def _telemetry_request_callback(self, future):
248248
except Exception as e:
249249
logger.debug("Telemetry request failed with exception: %s", e)
250250

251-
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
252-
logger.debug(
253-
"Exporting initial telemetry log for connection %s", self._session_id_hex
254-
)
251+
def _export_telemetry_log(self, **telemetry_event_kwargs):
252+
"""
253+
Common helper method for exporting telemetry logs.
254+
255+
Args:
256+
**telemetry_event_kwargs: Keyword arguments to pass to TelemetryEvent constructor
257+
"""
258+
logger.debug("Exporting telemetry log for connection %s", self._session_id_hex)
255259

256260
try:
257-
self._driver_connection_params = driver_connection_params
258-
self._user_agent = user_agent
261+
# Set common fields for all telemetry events
262+
event_kwargs = {
263+
"session_id": self._session_id_hex,
264+
"system_configuration": TelemetryHelper.get_driver_system_configuration(),
265+
"driver_connection_params": self._driver_connection_params,
266+
}
267+
# Add any additional fields passed in
268+
event_kwargs.update(telemetry_event_kwargs)
259269

260270
telemetry_frontend_log = TelemetryFrontendLog(
261271
frontend_log_event_id=str(uuid.uuid4()),
@@ -265,72 +275,29 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
265275
user_agent=self._user_agent,
266276
)
267277
),
268-
entry=FrontendLogEntry(
269-
sql_driver_log=TelemetryEvent(
270-
session_id=self._session_id_hex,
271-
system_configuration=TelemetryHelper.get_driver_system_configuration(),
272-
driver_connection_params=self._driver_connection_params,
273-
)
274-
),
278+
entry=FrontendLogEntry(sql_driver_log=TelemetryEvent(**event_kwargs)),
275279
)
276280

277281
self._export_event(telemetry_frontend_log)
278282

279283
except Exception as e:
280-
logger.debug("Failed to export initial telemetry log: %s", e)
284+
logger.debug("Failed to export telemetry log: %s", e)
285+
286+
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
287+
self._driver_connection_params = driver_connection_params
288+
self._user_agent = user_agent
289+
self._export_telemetry_log()
281290

282291
def export_failure_log(self, error_name, error_message):
283-
logger.debug("Exporting failure log for connection %s", self._session_id_hex)
284-
try:
285-
error_info = DriverErrorInfo(
286-
error_name=error_name, stack_trace=error_message
287-
)
288-
telemetry_frontend_log = TelemetryFrontendLog(
289-
frontend_log_event_id=str(uuid.uuid4()),
290-
context=FrontendLogContext(
291-
client_context=TelemetryClientContext(
292-
timestamp_millis=int(time.time() * 1000),
293-
user_agent=self._user_agent,
294-
)
295-
),
296-
entry=FrontendLogEntry(
297-
sql_driver_log=TelemetryEvent(
298-
session_id=self._session_id_hex,
299-
system_configuration=TelemetryHelper.get_driver_system_configuration(),
300-
driver_connection_params=self._driver_connection_params,
301-
error_info=error_info,
302-
)
303-
),
304-
)
305-
self._export_event(telemetry_frontend_log)
306-
except Exception as e:
307-
logger.debug("Failed to export failure log: %s", e)
292+
error_info = DriverErrorInfo(error_name=error_name, stack_trace=error_message)
293+
self._export_telemetry_log(error_info=error_info)
308294

309295
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
310-
logger.debug("Exporting latency log for connection %s", self._session_id_hex)
311-
try:
312-
telemetry_frontend_log = TelemetryFrontendLog(
313-
frontend_log_event_id=str(uuid.uuid4()),
314-
context=FrontendLogContext(
315-
client_context=TelemetryClientContext(
316-
timestamp_millis=int(time.time() * 1000),
317-
user_agent=self._user_agent,
318-
)
319-
),
320-
entry=FrontendLogEntry(
321-
sql_driver_log=TelemetryEvent(
322-
session_id=self._session_id_hex,
323-
system_configuration=TelemetryHelper.get_driver_system_configuration(),
324-
driver_connection_params=self._driver_connection_params,
325-
sql_statement_id=sql_statement_id,
326-
sql_operation=sql_execution_event,
327-
operation_latency_ms=latency_ms,
328-
)
329-
),
330-
)
331-
self._export_event(telemetry_frontend_log)
332-
except Exception as e:
333-
logger.debug("Failed to export latency log: %s", e)
296+
self._export_telemetry_log(
297+
sql_statement_id=sql_statement_id,
298+
sql_operation=sql_execution_event,
299+
operation_latency_ms=latency_ms,
300+
)
334301

335302
def close(self):
336303
"""Flush remaining events before closing"""

0 commit comments

Comments
 (0)