-
Notifications
You must be signed in to change notification settings - Fork 114
Added classes required for telemetry #572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
4c122b1
23374be
3971d3a
91e3f40
6331fc1
63593a6
bb69dc9
4cb0d70
d1efa03
2fc3cb6
73471e9
74c6463
cbc9ebf
9d10e16
1c467f3
6302327
e16fce5
7461d96
95e43e4
d72fb27
28efaba
c8c08dd
74ea9b6
ac7881f
bff17b5
6219d38
6305323
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
import time | ||
from typing import Dict, Tuple, List, Optional, Any, Union, Sequence | ||
|
||
import pandas | ||
|
||
try: | ||
|
@@ -234,6 +233,13 @@ def read(self) -> Optional[OAuthToken]: | |
server_hostname, **kwargs | ||
) | ||
|
||
self.server_telemetry_enabled = True | ||
self.client_telemetry_enabled = kwargs.get("enable_telemetry", False) | ||
self.telemetry_enabled = ( | ||
self.client_telemetry_enabled and self.server_telemetry_enabled | ||
) | ||
telemetry_batch_size = kwargs.get("telemetry_batch_size", 200) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there some basis for this 200 value ? cc @vikrantpuppala There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default telemetry batch size is 200 in the JDBC driver There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we fetch the hardcoded variable instead of hardcoding it ourselves? Consider this scenario : Developer changes the default value, but forgets to change line 241 - this will incorrectly populate the telemetry logs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Also, why should we indicate the JDBC driver values here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moreover, the populating of telemetry fields should be abstracted out of client.py |
||
|
||
user_agent_entry = kwargs.get("user_agent_entry") | ||
if user_agent_entry is None: | ||
user_agent_entry = kwargs.get("_user_agent_entry") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from enum import Enum | ||
|
||
|
||
class AuthFlow(Enum):
    """Telemetry tag for the OAuth flow a connection authenticated with."""

    TOKEN_PASSTHROUGH = "token_passthrough"  # caller supplied a ready-made token
    CLIENT_CREDENTIALS = "client_credentials"  # machine-to-machine OAuth
    BROWSER_BASED_AUTHENTICATION = "browser_based_authentication"  # interactive U2M
    AZURE_MANAGED_IDENTITIES = "azure_managed_identities"
|
||
|
||
class AuthMech(Enum):
    """Telemetry tag for the authentication mechanism of a connection."""

    OTHER = "other"
    PAT = "pat"  # personal access token
    OAUTH = "oauth"
|
||
|
||
class DatabricksClientType(Enum):
    """Telemetry tag for which client backend the connection uses."""

    SEA = "SEA"  # Statement Execution API
    THRIFT = "THRIFT"
|
||
|
||
class DriverVolumeOperationType(Enum):
    """Telemetry tag for the kind of volume operation performed."""

    TYPE_UNSPECIFIED = "type_unspecified"
    PUT = "put"
    GET = "get"
    DELETE = "delete"
    LIST = "list"
    QUERY = "query"
|
||
|
||
class ExecutionResultFormat(Enum):
    """Telemetry tag for the wire format a result set was returned in."""

    FORMAT_UNSPECIFIED = "format_unspecified"
    INLINE_ARROW = "inline_arrow"
    EXTERNAL_LINKS = "external_links"  # results fetched via presigned cloud links
    COLUMNAR_INLINE = "columnar_inline"
|
||
|
||
class StatementType(Enum):
    """Telemetry tag for the broad category of an executed statement."""

    NONE = "none"
    QUERY = "query"
    SQL = "sql"
    UPDATE = "update"
    METADATA = "metadata"  # catalog/schema/table listing calls
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import json | ||
from dataclasses import dataclass, asdict | ||
from typing import List, Optional | ||
|
||
|
||
@dataclass
class TelemetryRequest:
    """Payload uploaded to the server-side telemetry endpoint.

    Attributes:
        uploadTime: Unix timestamp in milliseconds when the request is made.
        items: Telemetry event items to be uploaded.
        protoLogs: Optional protocol-buffer formatted log lines.
    """

    uploadTime: int
    items: List[str]
    protoLogs: Optional[List[str]]

    def to_json(self):
        """Return this request serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class TelemetryResponse:
    """Result returned by the telemetry backend after processing an upload.

    Attributes:
        errors: Error messages, if any occurred during processing.
        numSuccess: Count of successfully processed telemetry items.
        numProtoSuccess: Count of successfully processed protobuf logs.
    """

    errors: List[str]
    numSuccess: int
    numProtoSuccess: int

    def to_json(self):
        """Return this response serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
import json | ||
from dataclasses import dataclass, asdict | ||
from databricks.sql.telemetry.enums import ( | ||
AuthMech, | ||
AuthFlow, | ||
DatabricksClientType, | ||
DriverVolumeOperationType, | ||
StatementType, | ||
ExecutionResultFormat, | ||
) | ||
from typing import Optional | ||
|
||
|
||
@dataclass
class HostDetails:
    """Host/port pair identifying the Databricks workspace being connected to.

    Attributes:
        host_url: Workspace URL (e.g. https://my-workspace.cloud.databricks.com).
        port: Port number for the connection (typically 443 for HTTPS).
    """

    host_url: str
    port: int

    def to_json(self):
        """Return this object serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class DriverConnectionParameters:
    """
    Contains all connection parameters used to establish a connection to Databricks SQL.
    This includes authentication details, host information, and connection settings.

    Attributes:
        http_path (str): The HTTP path for the SQL endpoint
        mode (DatabricksClientType): The type of client connection (e.g., THRIFT)
        host_info (HostDetails): Details about the host connection
        auth_mech (AuthMech): The authentication mechanism used
        auth_flow (AuthFlow): The authentication flow type
        auth_scope (str): The scope of authentication
        discovery_url (str): URL for service discovery
        allowed_volume_ingestion_paths (str): JSON string of allowed paths for volume operations
        azure_tenant_id (str): Azure tenant ID for Azure authentication
        socket_timeout (int): Connection timeout in milliseconds
    """

    http_path: str
    mode: DatabricksClientType
    host_info: HostDetails
    auth_mech: AuthMech
    auth_flow: AuthFlow
    auth_scope: str
    discovery_url: str
    allowed_volume_ingestion_paths: str
    azure_tenant_id: str
    socket_timeout: int

    def to_json(self):
        """Return this object serialized as a JSON string.

        asdict() leaves Enum members (mode, auth_mech, auth_flow) as-is, and
        json.dumps raises TypeError on Enums, so encode each Enum as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
|
||
|
||
@dataclass
class DriverSystemConfiguration:
    """System-level details about the client environment.

    Captures operating system, Python runtime, and driver information for
    inclusion in telemetry events.

    Attributes:
        driver_version: Version of the Databricks SQL driver.
        os_name: Operating system name.
        os_version: Operating system version.
        os_arch: Operating system architecture.
        runtime_name: Python runtime name (e.g. CPython).
        runtime_version: Python runtime version.
        runtime_vendor: Python runtime vendor.
        client_app_name: Name of the client application.
        locale_name: System locale setting.
        driver_name: Name of the driver.
        char_set_encoding: Character set encoding in use.
    """

    driver_version: str
    os_name: str
    os_version: str
    os_arch: str
    runtime_name: str
    runtime_version: str
    runtime_vendor: str
    client_app_name: str
    locale_name: str
    driver_name: str
    char_set_encoding: str

    def to_json(self):
        """Return this object serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class DriverVolumeOperation:
    """
    Represents a volume operation performed by the driver.
    Used for tracking volume-related operations in telemetry.

    Attributes:
        volume_operation_type (DriverVolumeOperationType): Type of volume operation (e.g., LIST)
        volume_path (str): Path to the volume being operated on
    """

    volume_operation_type: DriverVolumeOperationType
    volume_path: str

    def to_json(self):
        """Return this object serialized as a JSON string.

        asdict() leaves the volume_operation_type Enum member as-is, and
        json.dumps raises TypeError on Enums, so encode it as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
|
||
|
||
@dataclass
class DriverErrorInfo:
    """Details about an error raised during a driver operation.

    Used for error tracking and debugging in telemetry.

    Attributes:
        error_name: Name/type of the error.
        stack_trace: Full stack trace of the error.
    """

    error_name: str
    stack_trace: str

    def to_json(self):
        """Return this object serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class SqlExecutionEvent:
    """
    Represents a SQL query execution event.
    Contains details about the query execution, including type, compression, and result format.

    Attributes:
        statement_type (StatementType): Type of SQL statement
        is_compressed (bool): Whether the result is compressed
        execution_result (ExecutionResultFormat): Format of the execution result
        retry_count (int): Number of retry attempts made
    """

    statement_type: StatementType
    is_compressed: bool
    execution_result: ExecutionResultFormat
    retry_count: int

    def to_json(self):
        """Return this object serialized as a JSON string.

        asdict() leaves Enum members (statement_type, execution_result) as-is,
        and json.dumps raises TypeError on Enums, so encode each as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
|
||
|
||
@dataclass
class TelemetryEvent:
    """
    Main telemetry event class that aggregates all telemetry data.
    Contains information about the session, system configuration, connection parameters,
    and any operations or errors that occurred.

    Attributes:
        session_id (str): Unique identifier for the session
        system_configuration (DriverSystemConfiguration): System configuration details
        driver_connection_params (DriverConnectionParameters): Connection parameters
        sql_statement_id (Optional[str]): ID of the SQL statement if applicable
        auth_type (Optional[str]): Type of authentication used
        vol_operation (Optional[DriverVolumeOperation]): Volume operation details if applicable
        sql_operation (Optional[SqlExecutionEvent]): SQL execution details if applicable
        error_info (Optional[DriverErrorInfo]): Error information if an error occurred
        operation_latency_ms (Optional[int]): Operation latency in milliseconds
    """

    session_id: str
    system_configuration: DriverSystemConfiguration
    driver_connection_params: DriverConnectionParameters
    sql_statement_id: Optional[str] = None
    auth_type: Optional[str] = None
    vol_operation: Optional[DriverVolumeOperation] = None
    sql_operation: Optional[SqlExecutionEvent] = None
    error_info: Optional[DriverErrorInfo] = None
    operation_latency_ms: Optional[int] = None

    def to_json(self):
        """Return this event serialized as a JSON string.

        asdict() recurses into nested dataclasses but leaves Enum members
        (inside driver_connection_params, vol_operation, sql_operation) as-is;
        json.dumps raises TypeError on Enums, so encode each as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import json | ||
from dataclasses import dataclass, asdict | ||
from databricks.sql.telemetry.models.telemetry_event import TelemetryEvent | ||
from typing import Optional | ||
|
||
|
||
@dataclass
class TelemetryClientContext:
    """Client-side context attached to frontend telemetry events.

    Records when the event occurred and which client produced it.

    Attributes:
        timestamp_millis: Unix timestamp in milliseconds when the event occurred.
        user_agent: Identifier for the client application making the request.
    """

    timestamp_millis: int
    user_agent: str

    def to_json(self):
        """Return this object serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class FrontendLogContext:
    """Wrapper carrying client context information inside a frontend log.

    Attributes:
        client_context: Client-specific context (timestamp, user agent).
    """

    client_context: TelemetryClientContext

    def to_json(self):
        """Return this object serialized as a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
|
||
|
||
@dataclass
class FrontendLogEntry:
    """
    Contains the actual telemetry event data in a frontend log.
    Wraps the SQL driver log information for frontend processing.

    Attributes:
        sql_driver_log (TelemetryEvent): The telemetry event containing SQL driver information
    """

    sql_driver_log: TelemetryEvent

    def to_json(self):
        """Return this entry serialized as a JSON string.

        The nested TelemetryEvent carries Enum fields (via its connection and
        execution sub-objects); asdict() leaves Enums as-is and json.dumps
        raises TypeError on them, so encode each Enum as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
|
||
|
||
@dataclass
class TelemetryFrontendLog:
    """
    Main container for frontend telemetry data.
    Aggregates workspace information, event ID, context, and the actual log entry.
    Used for sending telemetry data to the server side.

    Attributes:
        frontend_log_event_id (str): Unique identifier for this telemetry event
        context (FrontendLogContext): Context information about the client
        entry (FrontendLogEntry): The actual telemetry event data
        workspace_id (Optional[int]): Unique identifier for the Databricks workspace
    """

    frontend_log_event_id: str
    context: FrontendLogContext
    entry: FrontendLogEntry
    workspace_id: Optional[int] = None

    def to_json(self):
        """Return this log serialized as a JSON string.

        The nested entry ultimately contains Enum fields (through the wrapped
        TelemetryEvent); asdict() leaves Enums as-is and json.dumps raises
        TypeError on them, so encode each Enum as its .value.
        """
        from enum import Enum  # local import: module header does not import Enum

        def _default(obj):
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=_default)
Uh oh!
There was an error while loading. Please reload this page.