-
Notifications
You must be signed in to change notification settings - Fork 113
Add optional telemetry support to the python connector #628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
70a2393
891f744
07c709e
a283faf
71defd5
187cbe8
e5a5001
86969b1
abaa45a
870787b
10d765b
34397db
5fd729a
5612e5f
f1241ba
6bafb9d
afcb0f0
af945aa
441a6ae
29fe6b4
06d9df8
cf3130e
4387f93
285e516
ea0f076
616a5c8
e39d294
1ea2fe0
3638fa2
1a4cf4b
8d6d47f
3d3c692
5cbfcac
c6e573c
7c53b76
4f221b3
cfa38a1
2f2a761
def5e0e
3cc9393
aa55a6e
ce158cb
0ed7e53
9a06d6c
20e789f
3a60599
e627649
c43eaf8
b0b6abd
f440791
5f247e5
c1d9510
c5731d8
7087236
61b6911
b5ab608
ad6fbd9
73108e2
7d85814
4077c7f
cdf1857
5539b26
728e2b1
eada549
cdc50d2
2904788
782ebb6
b7ada62
c6cf88f
95cf95b
3680a0f
ba2cd84
061c763
e8fc63b
813c73c
d3f0513
6786933
203735f
bd08f58
9508c4f
8140be9
850235c
4c766ef
7fe5ddf
50dfd93
4b0b8bd
f07df30
683e03c
d168598
fcfe8f4
972f7cc
1c3ce1e
667f719
ddf8a5f
56c7d41
312c7b9
9bc0d3e
33390db
e176f65
854c56f
0d1d7d8
c32b71a
4588ff3
b9bd2a1
329b7ee
9489087
b94f59e
9592098
84a6cbc
9d93e1b
ef5fbda
f138703
04c99e4
cbe21e5
77a8886
4a70379
0e791ba
f198a25
d975611
a596776
16a5106
ca84f1a
45c6073
3a8b4ea
8a0ec56
4905952
7444425
9a8ac88
6bc7413
95e5595
c69d886
012f6ed
b09ff05
fd4336e
f3081a5
ff51bfb
ca000db
6aa7890
bf084fe
23b51c9
5a1acdc
e768d48
505a522
5c01874
2027145
9e963a0
f703d81
bdd2cb6
00b8d3e
a6e81ed
c89da23
6482c76
d20d931
e3e0f49
456fec5
01cfc66
9ff99b8
072ef2c
f52c658
b1bd792
70f3738
912127c
1ed5c9d
1577506
e01ef74
7cfd6f6
6cf12fb
02d08d6
4122597
3631e55
4b1b7ad
f2d927b
2d2f8f7
d9802a8
9c158d9
0400bdb
683a033
3a68fa8
94a2597
3a50d70
37d8a7b
0017b0c
2a1875a
9fd4a25
74bcc86
e7c0c06
677483d
512efca
1a1497b
b751088
4959197
7467860
2de70ec
2675099
c755ecc
1a44d91
92dff6c
b4bcf8a
28a0fe6
82efe73
5e11582
a9ae775
c251d91
8a63786
9d6813b
8468a2b
aa673ac
fd7f85c
b20c55b
d61a964
efd82fb
ed19388
f8f9f4e
0e51281
9665a74
b24ddd7
0013ba4
851d23b
f321b49
b000892
2553bcf
078f41b
adc2c86
f9fe172
6f99449
6790dca
3a4d6d3
557bb68
9a3f946
7233e4e
b88eba0
14c8a7e
8013a0d
fdd385f
9dc7d52
ce2cc1a
99ec875
cf89ce3
380b0b9
23d8881
350e745
4a2356d
97df72e
6748c2c
0dfe0f4
10375a8
8c0f474
13ebfb4
79db09f
5005561
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
import time | ||
import functools | ||
from typing import Optional | ||
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory | ||
from databricks.sql.telemetry.models.event import ( | ||
SqlExecutionEvent, | ||
) | ||
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType | ||
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue | ||
from uuid import UUID | ||
|
||
|
||
class TelemetryExtractor:
    """
    Base class for extracting telemetry information from various object types.

    This class serves as a proxy that delegates attribute access to the wrapped
    object while providing a common interface for extracting telemetry-related
    data. The ``get_*`` methods defined here are placeholders that return
    ``None``; subclasses override them for a concrete object type.
    """

    def __init__(self, obj):
        """
        Initialize the extractor with an object to wrap.

        Args:
            obj: The object to extract telemetry information from.
        """
        self._obj = obj

    def __getattr__(self, name):
        """
        Delegate attribute access to the wrapped object.

        Python only calls ``__getattr__`` after normal attribute lookup has
        failed, so a request for ``_obj`` arriving here means the wrapper was
        never initialised (for example, it was created via ``__new__``).
        Raising ``AttributeError`` in that case prevents unbounded recursion
        through ``self._obj``.

        Args:
            name (str): The name of the attribute to access.

        Returns:
            The attribute value from the wrapped object.

        Raises:
            AttributeError: If the wrapped object is unset or lacks ``name``.
        """
        if name == "_obj":
            raise AttributeError(name)
        return getattr(self._obj, name)

    def get_session_id_hex(self):
        """Placeholder for the session identifier; the base class returns None."""
        return None

    def get_statement_id(self):
        """Placeholder for the statement identifier; the base class returns None."""
        return None

    def get_is_compressed(self):
        """Placeholder for the compression flag; the base class returns None."""
        return None

    def get_execution_result(self):
        """Placeholder for the result format; the base class returns None."""
        return None

    def get_retry_count(self):
        """Placeholder for the retry count; the base class returns None."""
        return None
|
||
|
||
class CursorExtractor(TelemetryExtractor):
    """
    Telemetry extractor specialized for Cursor objects.

    Reads statement IDs, session information, compression settings, and result
    formats from the wrapped cursor (attribute access is delegated to it by
    the base class).
    """

    def get_statement_id(self) -> Optional[str]:
        """Return the wrapped cursor's ``query_id``."""
        return self.query_id

    def get_session_id_hex(self) -> Optional[str]:
        """Return the value of ``get_session_id_hex()`` on the cursor's connection."""
        connection = self.connection
        return connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        """Return the ``lz4_compression`` setting of the cursor's connection."""
        connection = self.connection
        return connection.lz4_compression

    def get_execution_result(self) -> ExecutionResultFormat:
        """Map the active result set's queue type to an ExecutionResultFormat."""
        result_set = self.active_result_set
        if result_set is None:
            return ExecutionResultFormat.FORMAT_UNSPECIFIED

        queue = result_set.results
        # Checked in order; the first matching queue type wins.
        format_by_queue_type = (
            (ColumnQueue, ExecutionResultFormat.COLUMNAR_INLINE),
            (CloudFetchQueue, ExecutionResultFormat.EXTERNAL_LINKS),
            (ArrowQueue, ExecutionResultFormat.INLINE_ARROW),
        )
        for queue_type, result_format in format_by_queue_type:
            if isinstance(queue, queue_type):
                return result_format
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        """Return the length of the retry policy's history, or 0 without a policy."""
        retry_policy = getattr(self.thrift_backend, "retry_policy", None)
        if not retry_policy:
            return 0
        return len(retry_policy.history)
|
||
|
||
class ResultSetExtractor(TelemetryExtractor):
    """
    Telemetry extractor specialized for ResultSet objects.

    Reads operation IDs, session information, compression settings, and result
    formats from the wrapped result set (attribute access is delegated to it
    by the base class).
    """

    def get_statement_id(self) -> Optional[str]:
        """Return the operation GUID as a UUID string, or None without a command ID."""
        command_id = self.command_id
        if not command_id:
            return None
        guid_bytes = command_id.operationId.guid
        return str(UUID(bytes=guid_bytes))

    def get_session_id_hex(self) -> Optional[str]:
        """Return the value of ``get_session_id_hex()`` on the result set's connection."""
        connection = self.connection
        return connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        """Return the result set's ``lz4_compressed`` flag."""
        return self.lz4_compressed

    def get_execution_result(self) -> ExecutionResultFormat:
        """Map the result set's queue type to an ExecutionResultFormat."""
        queue = self.results
        # Checked in order; the first matching queue type wins.
        format_by_queue_type = (
            (ColumnQueue, ExecutionResultFormat.COLUMNAR_INLINE),
            (CloudFetchQueue, ExecutionResultFormat.EXTERNAL_LINKS),
            (ArrowQueue, ExecutionResultFormat.INLINE_ARROW),
        )
        for queue_type, result_format in format_by_queue_type:
            if isinstance(queue, queue_type):
                return result_format
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        """Return the length of the retry policy's history, or 0 without a policy."""
        retry_policy = getattr(self.thrift_backend, "retry_policy", None)
        if not retry_policy:
            return 0
        return len(retry_policy.history)
|
||
|
||
def get_extractor(obj):
    """
    Factory function to create the appropriate telemetry extractor for an object.

    Determines the object type and returns the corresponding specialized
    extractor that can extract telemetry information from that object type.
    Matching is done on class *names* across the object's method resolution
    order, so subclasses of ``Cursor`` / ``ResultSet`` are recognised as well
    as the classes themselves.

    Args:
        obj: The object to create an extractor for. Can be a Cursor, ResultSet,
            or any other object.

    Returns:
        TelemetryExtractor: A specialized extractor instance:
            - CursorExtractor for Cursor objects (checked first)
            - ResultSetExtractor for ResultSet objects

    Raises:
        NotImplementedError: If the object is neither a Cursor nor a ResultSet.
    """
    # Names are compared rather than using isinstance(); presumably this avoids
    # importing the Cursor/ResultSet classes into this module -- TODO confirm.
    class_names = {cls.__name__ for cls in obj.__class__.__mro__}
    if "Cursor" in class_names:
        return CursorExtractor(obj)
    if "ResultSet" in class_names:
        return ResultSetExtractor(obj)
    raise NotImplementedError(f"No extractor found for {obj.__class__.__name__}")
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will this error break anything, or will it just be printed in debug mode? |
||
|
||
|
||
def log_latency(statement_type: StatementType = StatementType.NONE):
    """
    Decorator for logging execution latency and telemetry information.

    This decorator measures the execution time of a method and sends telemetry
    data about the operation, including latency, statement information, and
    execution context.

    The decorator automatically:
    - Measures execution time with ``time.perf_counter``
    - Extracts telemetry information from the method's object (self)
    - Creates a SqlExecutionEvent with execution details
    - Hands the event to the client returned by TelemetryClientFactory

    Telemetry is best-effort: any exception raised while collecting or
    exporting it is logged at debug level and suppressed, so the wrapped
    method's return value or exception always reaches the caller unchanged.

    Args:
        statement_type (StatementType): The type of SQL statement being executed.

    Usage:
        @log_latency(StatementType.SQL)
        def execute(self, query):
            # Method implementation
            pass

    Returns:
        function: A decorator that wraps methods to add latency logging.

    Note:
        The wrapped method's object (self) must be compatible with the
        telemetry extractor system (e.g., Cursor or ResultSet objects);
        otherwise no telemetry is sent for the call.
    """
    # Imported locally so this block does not depend on a module-level import.
    import logging

    logger = logging.getLogger(__name__)

    def _safe_call(func_to_call):
        """Call a function and return None if it raises any exception."""
        try:
            return func_to_call()
        except Exception:
            return None

    def _export_latency(obj, duration_ms):
        """Build the SqlExecutionEvent for ``obj`` and export it with the latency."""
        extractor = get_extractor(obj)
        session_id_hex = _safe_call(extractor.get_session_id_hex)
        statement_id = _safe_call(extractor.get_statement_id)

        sql_exec_event = SqlExecutionEvent(
            statement_type=statement_type,
            is_compressed=_safe_call(extractor.get_is_compressed),
            execution_result=_safe_call(extractor.get_execution_result),
            retry_count=_safe_call(extractor.get_retry_count),
        )

        telemetry_client = TelemetryClientFactory.get_telemetry_client(
            session_id_hex
        )
        telemetry_client.export_latency_log(
            latency_ms=duration_ms,
            sql_execution_event=sql_exec_event,
            sql_statement_id=statement_id,
        )

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            start_time = time.perf_counter()
            try:
                return func(self, *args, **kwargs)
            finally:
                duration_ms = int((time.perf_counter() - start_time) * 1000)
                # Review question: "what if anything in this finally block
                # fails?" An exception escaping a finally clause replaces the
                # pending return value or exception, so telemetry failures are
                # contained here instead of propagating.
                try:
                    _export_latency(self, duration_ms)
                except Exception:
                    logger.debug(
                        "Failed to export latency telemetry", exc_info=True
                    )

        return wrapper

    return decorator
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import json | ||
from dataclasses import dataclass, asdict | ||
from typing import List, Optional | ||
|
||
|
||
@dataclass
class TelemetryRequest:
    """
    Represents a request to send telemetry data to the server side.

    Contains the telemetry items to be uploaded and optional protocol buffer logs.

    Attributes:
        uploadTime (int): Unix timestamp in milliseconds when the request is made
        items (List[str]): List of telemetry event items to be uploaded
        protoLogs (Optional[List[str]]): Optional list of protocol buffer
            formatted logs; defaults to None when omitted
    """

    uploadTime: int
    items: List[str]
    # Defaults to None so callers that have no proto logs can omit the field.
    protoLogs: Optional[List[str]] = None

    def to_json(self) -> str:
        """Serialize this request to a JSON string keyed by field name."""
        return json.dumps(asdict(self))
|
||
|
||
@dataclass
class TelemetryResponse:
    """
    Represents the response from the telemetry backend after processing a request.

    Carries the outcome of a telemetry upload.

    Attributes:
        errors (List[str]): List of error messages if any occurred during processing
        numSuccess (int): Number of successfully processed telemetry items
        numProtoSuccess (int): Number of successfully processed protocol buffer logs
    """

    errors: List[str]
    numSuccess: int
    numProtoSuccess: int

    def to_json(self):
        """Serialize this response to a JSON string keyed by field name."""
        payload = asdict(self)
        return json.dumps(payload)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from enum import Enum | ||
|
||
|
||
class AuthFlow(Enum):
    """Authentication flows, identified by lowercase string values."""

    TOKEN_PASSTHROUGH = "token_passthrough"
    BROWSER_BASED_AUTHENTICATION = "browser_based_authentication"
|
||
|
||
class AuthMech(Enum):
    """Authentication mechanisms, identified by uppercase string values."""

    # SSL certificate authentication
    CLIENT_CERT = "CLIENT_CERT"
    # Personal Access Token authentication
    PAT = "PAT"
    # Databricks-managed OAuth flow
    DATABRICKS_OAUTH = "DATABRICKS_OAUTH"
    # External identity provider (AWS, Azure, etc.)
    EXTERNAL_AUTH = "EXTERNAL_AUTH"
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think @jprakash-db is adding some enums in his latest PR, #621; is there scope to reuse them? |
||
|
||
|
||
class DatabricksClientType(Enum):
    """Client types, with values ``"SEA"`` and ``"THRIFT"``."""

    SEA = "SEA"
    THRIFT = "THRIFT"
|
||
|
||
class DriverVolumeOperationType(Enum):
    """Volume operation kinds, identified by lowercase string values."""

    # Fallback member for an unspecified operation type.
    TYPE_UNSPECIFIED = "type_unspecified"

    PUT = "put"
    GET = "get"
    DELETE = "delete"
    LIST = "list"
    QUERY = "query"
|
||
|
||
class ExecutionResultFormat(Enum):
    """Result formats, identified by lowercase string values."""

    # Fallback member for an unspecified format.
    FORMAT_UNSPECIFIED = "format_unspecified"

    INLINE_ARROW = "inline_arrow"
    EXTERNAL_LINKS = "external_links"
    COLUMNAR_INLINE = "columnar_inline"
|
||
|
||
class StatementType(Enum):
    """Statement kinds, identified by lowercase string values."""

    NONE = "none"
    QUERY = "query"
    # NOTE(review): a reviewer observed that "query" is used when a SQL command
    # is expected to return results, which suits JDBC's interface, and asked
    # whether the Python connector needs both QUERY and SQL.
    SQL = "sql"
    UPDATE = "update"
    METADATA = "metadata"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we're using this?