3
3
import json
4
4
import requests
5
5
from concurrent .futures import ThreadPoolExecutor
6
- import logging
7
- from abc import ABC , abstractmethod
8
6
from databricks .sql .telemetry .models .event import (
9
7
TelemetryEvent ,
10
8
DriverConnectionParameters ,
11
9
DriverSystemConfiguration ,
12
- DriverErrorInfo ,
13
- DriverVolumeOperation ,
14
- SqlExecutionEvent ,
15
10
HostDetails ,
16
11
)
17
12
from databricks .sql .telemetry .models .frontend_logs import (
27
22
import locale
28
23
29
24
30
- class BaseTelemetryClient (ABC ):
31
- """Abstract base class for telemetry clients."""
32
-
33
- @abstractmethod
34
- def export_event (self , event ):
35
- pass
36
-
37
- @abstractmethod
38
- def flush (self ):
39
- pass
40
-
41
- @abstractmethod
42
- def close (self ):
43
- pass
44
-
45
- @abstractmethod
46
- def export_initial_telemetry_log (self , http_path , port , socket_timeout ):
47
- pass
48
-
49
-
50
- class TelemetryClient (BaseTelemetryClient ):
25
+ class TelemetryClient :
51
26
def __init__ (
52
27
self ,
53
28
host ,
@@ -71,14 +46,14 @@ def __init__(
71
46
self .DriverConnectionParameters = None
72
47
73
48
def export_event (self , event ):
74
- # Add an event to the batch queue and flush if batch is full
49
+ """ Add an event to the batch queue and flush if batch is full"""
75
50
with self .lock :
76
51
self .events_batch .append (event )
77
52
if len (self .events_batch ) >= self .batch_size :
78
53
self .flush ()
79
54
80
55
def flush (self ):
81
- # Flush the current batch of events to the server
56
+ """ Flush the current batch of events to the server"""
82
57
with self .lock :
83
58
events_to_flush = self .events_batch .copy ()
84
59
self .events_batch = []
@@ -87,7 +62,7 @@ def flush(self):
87
62
self .executor .submit (self ._send_telemetry , events_to_flush )
88
63
89
64
def _send_telemetry (self , events ):
90
- # Send telemetry events to the server
65
+ """ Send telemetry events to the server"""
91
66
request = {
92
67
"uploadTime" : int (time .time () * 1000 ),
93
68
"items" : [],
@@ -102,12 +77,36 @@ def _send_telemetry(self, events):
102
77
if self .is_authenticated and self .auth_provider :
103
78
self .auth_provider .add_headers (headers )
104
79
80
+ # print("\n=== Request Details ===", flush=True)
81
+ # print(f"URL: {url}", flush=True)
82
+ # print("\nHeaders:", flush=True)
83
+ # for key, value in headers.items():
84
+ # print(f" {key}: {value}", flush=True)
85
+
86
+ # print("\nRequest Body:", flush=True)
87
+ # print(json.dumps(request, indent=2), flush=True)
88
+ # sys.stdout.flush()
89
+
105
90
response = requests .post (
106
91
url , data = json .dumps (request ), headers = headers , timeout = 10
107
92
)
108
93
94
+ # print("\n=== Response Details ===", flush=True)
95
+ # print(f"Status Code: {response.status_code}", flush=True)
96
+ # print("\nResponse Headers:", flush=True)
97
+ # for key, value in response.headers.items():
98
+ # print(f" {key}: {value}", flush=True)
99
+
100
+ # print("\nResponse Body:", flush=True)
101
+ # try:
102
+ # response_json = response.json()
103
+ # print(json.dumps(response_json, indent=2), flush=True)
104
+ # except json.JSONDecodeError:
105
+ # print(response.text, flush=True)
106
+ # sys.stdout.flush()
107
+
109
108
def close (self ):
110
- # Flush remaining events and shut down executor
109
+ """ Flush remaining events and shut down executor"""
111
110
self .flush ()
112
111
self .executor .shutdown (wait = True )
113
112
@@ -162,23 +161,20 @@ def export_volume_latency_log(self, latency_ms, volume_operation):
162
161
pass
163
162
164
163
165
- class NoopTelemetryClient (BaseTelemetryClient ):
166
- """A no-operation telemetry client that implements the same interface but does nothing"""
167
-
168
- def export_event (self , event ):
169
- pass
170
-
171
- def flush (self ):
172
- pass
164
+ class TelemetryManager :
165
+ """A singleton manager class that handles telemetry operations for SQL connections.
173
166
174
- def close (self ):
175
- pass
176
-
177
- def export_initial_telemetry_log (self , http_path , port , socket_timeout ):
178
- pass
167
+ This class maintains a map of connection_uuid to TelemetryClient instances. The initialize()
168
+ method is only called from the connection class when telemetry is enabled for that connection.
169
+ All telemetry operations (initial logs, failure logs, latency logs) first check if the
170
+ connection_uuid exists in the map. If it doesn't exist (meaning telemetry was not enabled
171
+ for that connection), the operation is skipped. If it exists, the operation is delegated
172
+ to the corresponding TelemetryClient instance.
179
173
174
+ This design ensures that telemetry operations are only performed for connections where
175
+ telemetry was explicitly enabled during initialization.
176
+ """
180
177
181
- class TelemetryManager :
182
178
_instance = None
183
179
_DRIVER_SYSTEM_CONFIGURATION = None
184
180
0 commit comments