Skip to content

Commit 6e61887

Browse files
committed
Revert "shifted thread pool executor to telemetry manager"
This reverts commit 419dac3302f67edd00c1b1a8254947f6e55f74b8.
1 parent 0ea69d9 commit 6e61887

File tree

1 file changed

+156
-8
lines changed

1 file changed

+156
-8
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 156 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from abc import ABC, abstractmethod
2424

2525

26+
<<<<<<< HEAD
2627
class BaseTelemetryClient(ABC):
2728
@abstractmethod
2829
def export_initial_telemetry_log(self, **kwargs):
@@ -52,13 +53,29 @@ class TelemetryClient(BaseTelemetryClient):
5253
def __init__(self, telemetry_enabled, batch_size, connection_uuid, **kwargs):
5354
self.telemetry_enabled = telemetry_enabled
5455
self.batch_size = batch_size
56+
=======
57+
class TelemetryClient:
58+
def __init__(
59+
self,
60+
host,
61+
connection_uuid,
62+
batch_size,
63+
auth_provider=None,
64+
is_authenticated=False,
65+
user_agent=None,
66+
):
67+
self.host_url = host
68+
>>>>>>> parent of 419dac3 (shifted thread pool executor to telemetry manager)
5569
self.connection_uuid = connection_uuid
56-
self.host_url = kwargs.get("host_url", None)
57-
self.auth_provider = kwargs.get("auth_provider", None)
58-
self.is_authenticated = kwargs.get("is_authenticated", False)
59-
self.user_agent = kwargs.get("user_agent", None)
70+
self.auth_provider = auth_provider
71+
self.is_authenticated = is_authenticated
72+
self.batch_size = batch_size
73+
self.user_agent = user_agent
6074
self.events_batch = []
6175
self.lock = threading.Lock()
76+
self.executor = ThreadPoolExecutor(
77+
max_workers=10 # TODO: Decide on max workers
78+
) # Thread pool for async operations
6279
self.DriverConnectionParameters = None
6380

6481
def export_event(self, event):
@@ -75,18 +92,67 @@ def flush(self):
7592
self.events_batch = []
7693

7794
if events_to_flush:
95+
<<<<<<< HEAD
7896
telemetry_client_factory._send_telemetry(
7997
events_to_flush,
8098
self.host_url,
8199
self.is_authenticated,
82100
self.auth_provider,
83101
)
102+
=======
103+
self.executor.submit(self._send_telemetry, events_to_flush)
84104

85-
def export_initial_telemetry_log(self, **kwargs):
86-
http_path = kwargs.get("http_path", None)
87-
port = kwargs.get("port", None)
88-
socket_timeout = kwargs.get("socket_timeout", None)
105+
def _send_telemetry(self, events):
106+
"""Send telemetry events to the server"""
107+
request = {
108+
"uploadTime": int(time.time() * 1000),
109+
"items": [],
110+
"protoLogs": [event.to_json() for event in events],
111+
}
112+
113+
path = "/telemetry-ext" if self.is_authenticated else "/telemetry-unauth"
114+
url = f"https://{self.host_url}{path}"
115+
116+
headers = {"Accept": "application/json", "Content-Type": "application/json"}
89117

118+
if self.is_authenticated and self.auth_provider:
119+
self.auth_provider.add_headers(headers)
120+
121+
# print("\n=== Request Details ===", flush=True)
122+
# print(f"URL: {url}", flush=True)
123+
# print("\nHeaders:", flush=True)
124+
# for key, value in headers.items():
125+
# print(f" {key}: {value}", flush=True)
126+
127+
# print("\nRequest Body:", flush=True)
128+
# print(json.dumps(request, indent=2), flush=True)
129+
# sys.stdout.flush()
130+
131+
response = requests.post(
132+
url, data=json.dumps(request), headers=headers, timeout=10
133+
)
134+
135+
# print("\n=== Response Details ===", flush=True)
136+
# print(f"Status Code: {response.status_code}", flush=True)
137+
# print("\nResponse Headers:", flush=True)
138+
# for key, value in response.headers.items():
139+
# print(f" {key}: {value}", flush=True)
140+
141+
# print("\nResponse Body:", flush=True)
142+
# try:
143+
# response_json = response.json()
144+
# print(json.dumps(response_json, indent=2), flush=True)
145+
# except json.JSONDecodeError:
146+
# print(response.text, flush=True)
147+
# sys.stdout.flush()
148+
149+
def close(self):
150+
"""Flush remaining events and shut down executor"""
151+
self.flush()
152+
self.executor.shutdown(wait=True)
153+
>>>>>>> parent of 419dac3 (shifted thread pool executor to telemetry manager)
154+
155+
def export_initial_telemetry_log(self, http_path, port, socket_timeout):
90156
discovery_url = None
91157
if hasattr(self.auth_provider, "oauth_manager") and hasattr(
92158
self.auth_provider.oauth_manager, "idp_endpoint"
@@ -123,10 +189,26 @@ def export_initial_telemetry_log(self, **kwargs):
123189

124190
self.export_event(telemetry_frontend_log)
125191

192+
<<<<<<< HEAD
126193
def close(self):
127194
"""Flush remaining events before closing"""
128195
self.flush()
129196
telemetry_client_factory.close(self.connection_uuid)
197+
=======
198+
def export_failure_log(self, errorName, errorMessage):
199+
pass
200+
201+
def export_sql_latency_log(
202+
self, latency_ms, sql_execution_event, sql_statement_id=None
203+
):
204+
"""Export telemetry for sql execution"""
205+
pass
206+
207+
def export_volume_latency_log(self, latency_ms, volume_operation):
208+
"""Export telemetry for volume operation"""
209+
pass
210+
211+
>>>>>>> parent of 419dac3 (shifted thread pool executor to telemetry manager)
130212

131213

132214
class TelemetryClientFactory:
@@ -145,6 +227,7 @@ def __init__(self):
145227
return
146228

147229
self._clients = {} # Map of connection_uuid -> TelemetryClient
230+
<<<<<<< HEAD
148231
self.executor = ThreadPoolExecutor(
149232
max_workers=10
150233
) # Thread pool for async operations TODO: Decide on max workers
@@ -186,6 +269,59 @@ def _send_telemetry(self, events, host_url, is_authenticated, auth_provider):
186269
requests.post, url, data=json.dumps(request), headers=headers, timeout=10
187270
)
188271

272+
=======
273+
self._initialized = True
274+
275+
def initialize(
276+
self,
277+
host,
278+
connection_uuid,
279+
batch_size,
280+
auth_provider=None,
281+
is_authenticated=False,
282+
user_agent=None,
283+
):
284+
"""Initialize a telemetry client for a specific connection"""
285+
if connection_uuid not in self._clients:
286+
self._clients[connection_uuid] = TelemetryClient(
287+
host=host,
288+
connection_uuid=connection_uuid,
289+
batch_size=batch_size,
290+
auth_provider=auth_provider,
291+
is_authenticated=is_authenticated,
292+
user_agent=user_agent,
293+
)
294+
295+
def export_failure_log(self, error_name, error_message, connection_uuid):
296+
"""Export error logs for a specific connection or all connections if connection_uuid is None"""
297+
pass
298+
299+
def export_initial_telemetry_log(
300+
self, http_path, port, socket_timeout, connection_uuid
301+
):
302+
"""Export initial telemetry for a specific connection"""
303+
if connection_uuid in self._clients:
304+
self._clients[connection_uuid].export_initial_telemetry_log(
305+
http_path, port, socket_timeout
306+
)
307+
308+
def export_sql_latency_log(
309+
self,
310+
latency_ms,
311+
sql_execution_event,
312+
sql_statement_id=None,
313+
connection_uuid=None,
314+
):
315+
"""Export latency logs for sql execution for a specific connection"""
316+
pass
317+
318+
def export_volume_latency_log(
319+
self, latency_ms, volume_operation, connection_uuid=None
320+
):
321+
"""Export latency logs for volume operation for a specific connection"""
322+
pass
323+
324+
>>>>>>> parent of 419dac3 (shifted thread pool executor to telemetry manager)
189325
@classmethod
190326
def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
191327
if cls._DRIVER_SYSTEM_CONFIGURATION is None:
@@ -207,6 +343,7 @@ def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
207343
return cls._DRIVER_SYSTEM_CONFIGURATION
208344

209345
def close(self, connection_uuid):
346+
<<<<<<< HEAD
210347
del self._clients[connection_uuid]
211348

212349
# Shutdown executor if no more clients
@@ -216,3 +353,14 @@ def close(self, connection_uuid):
216353

217354
# Create a global instance
218355
telemetry_client_factory = TelemetryClientFactory()
356+
=======
357+
"""Close telemetry client(s)"""
358+
if connection_uuid:
359+
if connection_uuid in self._clients:
360+
self._clients[connection_uuid].close()
361+
del self._clients[connection_uuid]
362+
363+
364+
# Create a global instance
365+
telemetry_client = TelemetryManager()
366+
>>>>>>> parent of 419dac3 (shifted thread pool executor to telemetry manager)

0 commit comments

Comments
 (0)