23
23
from abc import ABC , abstractmethod
24
24
25
25
26
- < << << << HEAD
27
26
class BaseTelemetryClient (ABC ):
28
27
@abstractmethod
29
28
def export_initial_telemetry_log (self , ** kwargs ):
@@ -53,29 +52,13 @@ class TelemetryClient(BaseTelemetryClient):
53
52
def __init__ (self , telemetry_enabled , batch_size , connection_uuid , ** kwargs ):
54
53
self .telemetry_enabled = telemetry_enabled
55
54
self .batch_size = batch_size
56
- == == == =
57
- class TelemetryClient :
58
- def __init__ (
59
- self ,
60
- host ,
61
- connection_uuid ,
62
- batch_size ,
63
- auth_provider = None ,
64
- is_authenticated = False ,
65
- user_agent = None ,
66
- ):
67
- self .host_url = host
68
- > >> >> >> parent of 419 dac3 (shifted thread pool executor to telemetry manager )
69
55
self .connection_uuid = connection_uuid
70
- self .auth_provider = auth_provider
71
- self .is_authenticated = is_authenticated
72
- self .batch_size = batch_size
73
- self .user_agent = user_agent
56
+ self .host_url = kwargs . get ( "host_url" , None )
57
+ self .auth_provider = kwargs . get ( "auth_provider" , None )
58
+ self .is_authenticated = kwargs . get ( "is_authenticated" , False )
59
+ self .user_agent = kwargs . get ( " user_agent" , None )
74
60
self .events_batch = []
75
61
self .lock = threading .Lock ()
76
- self .executor = ThreadPoolExecutor (
77
- max_workers = 10 # TODO: Decide on max workers
78
- ) # Thread pool for async operations
79
62
self .DriverConnectionParameters = None
80
63
81
64
def export_event (self , event ):
@@ -92,67 +75,18 @@ def flush(self):
92
75
self .events_batch = []
93
76
94
77
if events_to_flush :
95
- < << << << HEAD
96
78
telemetry_client_factory ._send_telemetry (
97
79
events_to_flush ,
98
80
self .host_url ,
99
81
self .is_authenticated ,
100
82
self .auth_provider ,
101
83
)
102
- == == == =
103
- self .executor .submit (self ._send_telemetry , events_to_flush )
104
-
105
- def _send_telemetry (self , events ):
106
- """Send telemetry events to the server"""
107
- request = {
108
- "uploadTime" : int (time .time () * 1000 ),
109
- "items" : [],
110
- "protoLogs" : [event .to_json () for event in events ],
111
- }
112
-
113
- path = "/telemetry-ext" if self .is_authenticated else "/telemetry-unauth"
114
- url = f"https://{ self .host_url } { path } "
115
-
116
- headers = {"Accept" : "application/json" , "Content-Type" : "application/json" }
117
84
118
- if self .is_authenticated and self .auth_provider :
119
- self .auth_provider .add_headers (headers )
120
-
121
- # print("\n=== Request Details ===", flush=True)
122
- # print(f"URL: {url}", flush=True)
123
- # print("\nHeaders:", flush=True)
124
- # for key, value in headers.items():
125
- # print(f" {key}: {value}", flush=True)
126
-
127
- # print("\nRequest Body:", flush=True)
128
- # print(json.dumps(request, indent=2), flush=True)
129
- # sys.stdout.flush()
130
-
131
- response = requests .post (
132
- url , data = json .dumps (request ), headers = headers , timeout = 10
133
- )
134
-
135
- # print("\n=== Response Details ===", flush=True)
136
- # print(f"Status Code: {response.status_code}", flush=True)
137
- # print("\nResponse Headers:", flush=True)
138
- # for key, value in response.headers.items():
139
- # print(f" {key}: {value}", flush=True)
140
-
141
- # print("\nResponse Body:", flush=True)
142
- # try:
143
- # response_json = response.json()
144
- # print(json.dumps(response_json, indent=2), flush=True)
145
- # except json.JSONDecodeError:
146
- # print(response.text, flush=True)
147
- # sys.stdout.flush()
148
-
149
- def close (self ):
150
- """Flush remaining events and shut down executor"""
151
- self .flush ()
152
- self .executor .shutdown (wait = True )
153
- >> >> >> > parent of 419 dac3 (shifted thread pool executor to telemetry manager )
85
+ def export_initial_telemetry_log (self , ** kwargs ):
86
+ http_path = kwargs .get ("http_path" , None )
87
+ port = kwargs .get ("port" , None )
88
+ socket_timeout = kwargs .get ("socket_timeout" , None )
154
89
155
- def export_initial_telemetry_log (self , http_path , port , socket_timeout ):
156
90
discovery_url = None
157
91
if hasattr (self .auth_provider , "oauth_manager" ) and hasattr (
158
92
self .auth_provider .oauth_manager , "idp_endpoint"
@@ -189,26 +123,10 @@ def export_initial_telemetry_log(self, http_path, port, socket_timeout):
189
123
190
124
self .export_event (telemetry_frontend_log )
191
125
192
- < << << << HEAD
193
126
def close (self ):
194
127
"""Flush remaining events before closing"""
195
128
self .flush ()
196
129
telemetry_client_factory .close (self .connection_uuid )
197
- == == == =
198
- def export_failure_log (self , errorName , errorMessage ):
199
- pass
200
-
201
- def export_sql_latency_log (
202
- self , latency_ms , sql_execution_event , sql_statement_id = None
203
- ):
204
- """Export telemetry for sql execution"""
205
- pass
206
-
207
- def export_volume_latency_log (self , latency_ms , volume_operation ):
208
- """Export telemetry for volume operation"""
209
- pass
210
-
211
- >> >> >> > parent of 419 dac3 (shifted thread pool executor to telemetry manager )
212
130
213
131
214
132
class TelemetryClientFactory :
@@ -227,7 +145,6 @@ def __init__(self):
227
145
return
228
146
229
147
self ._clients = {} # Map of connection_uuid -> TelemetryClient
230
- < << << << HEAD
231
148
self .executor = ThreadPoolExecutor (
232
149
max_workers = 10
233
150
) # Thread pool for async operations TODO: Decide on max workers
@@ -269,59 +186,6 @@ def _send_telemetry(self, events, host_url, is_authenticated, auth_provider):
269
186
requests .post , url , data = json .dumps (request ), headers = headers , timeout = 10
270
187
)
271
188
272
- == == == =
273
- self ._initialized = True
274
-
275
- def initialize (
276
- self ,
277
- host ,
278
- connection_uuid ,
279
- batch_size ,
280
- auth_provider = None ,
281
- is_authenticated = False ,
282
- user_agent = None ,
283
- ):
284
- """Initialize a telemetry client for a specific connection"""
285
- if connection_uuid not in self ._clients :
286
- self ._clients [connection_uuid ] = TelemetryClient (
287
- host = host ,
288
- connection_uuid = connection_uuid ,
289
- batch_size = batch_size ,
290
- auth_provider = auth_provider ,
291
- is_authenticated = is_authenticated ,
292
- user_agent = user_agent ,
293
- )
294
-
295
- def export_failure_log (self , error_name , error_message , connection_uuid ):
296
- """Export error logs for a specific connection or all connections if connection_uuid is None"""
297
- pass
298
-
299
- def export_initial_telemetry_log (
300
- self , http_path , port , socket_timeout , connection_uuid
301
- ):
302
- """Export initial telemetry for a specific connection"""
303
- if connection_uuid in self ._clients :
304
- self ._clients [connection_uuid ].export_initial_telemetry_log (
305
- http_path , port , socket_timeout
306
- )
307
-
308
- def export_sql_latency_log (
309
- self ,
310
- latency_ms ,
311
- sql_execution_event ,
312
- sql_statement_id = None ,
313
- connection_uuid = None ,
314
- ):
315
- """Export latency logs for sql execution for a specific connection"""
316
- pass
317
-
318
- def export_volume_latency_log (
319
- self , latency_ms , volume_operation , connection_uuid = None
320
- ):
321
- """Export latency logs for volume operation for a specific connection"""
322
- pass
323
-
324
- >> >> >> > parent of 419 dac3 (shifted thread pool executor to telemetry manager )
325
189
@classmethod
326
190
def getDriverSystemConfiguration (cls ) -> DriverSystemConfiguration :
327
191
if cls ._DRIVER_SYSTEM_CONFIGURATION is None :
@@ -343,7 +207,6 @@ def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
343
207
return cls ._DRIVER_SYSTEM_CONFIGURATION
344
208
345
209
def close (self , connection_uuid ):
346
- < << << << HEAD
347
210
del self ._clients [connection_uuid ]
348
211
349
212
# Shutdown executor if no more clients
@@ -353,14 +216,3 @@ def close(self, connection_uuid):
353
216
354
217
# Create a global instance
355
218
telemetry_client_factory = TelemetryClientFactory ()
356
- == == == =
357
- """Close telemetry client(s)"""
358
- if connection_uuid :
359
- if connection_uuid in self ._clients :
360
- self ._clients [connection_uuid ].close ()
361
- del self ._clients [connection_uuid ]
362
-
363
-
364
- # Create a global instance
365
- telemetry_client = TelemetryManager ()
366
- > >> >> >> parent of 419 dac3 (shifted thread pool executor to telemetry manager )
0 commit comments