Commit cf89ce3
Added functionality for export of failure logs (#591)
* added functionality for export of failure logs
* changed logger.error to logger.debug in exc.py
* Fix telemetry loss during Python shutdown
* unit tests for export_failure_log
* try-catch blocks to make telemetry failures non-blocking for connector operations
* removed redundant try/catch blocks, added try/catch block to initialize and get telemetry client
* skip null fields in telemetry request
* removed duplicate import, renamed func, changed filter_null_values to a lambda
* removed unnecessary class variable and a redundant try/except block
* public functions defined at interface level
* changed export_event and flush to private functions
* formatting
* changed connection_uuid to thread local in thrift backend
* made errors more specific
* revert change to connection_uuid
* reverting change in close in telemetry client
* JsonSerializableMixin
* isdataclass check in JsonSerializableMixin
* convert TelemetryClientFactory to module-level functions, replace NoopTelemetryClient class with NOOP_TELEMETRY_CLIENT singleton, updated tests accordingly
* renamed connection_uuid as session_id_hex
* added NotImplementedError to abstract class, added unit tests
* formatting
* added PEP-249 link, changed NoopTelemetryClient implementation
* removed unused import
* made telemetry client close a module-level function
* unit tests verbose
* debug logs in unit tests
* debug logs in unit tests
* removed ABC from mixin, added try/catch block around executor shutdown
* checking stuff
* finding out
* finding out more
* more more finding out more nice
* locks are useless anyways
* haha
* normal
* := looks like walrus horizontally
* one more
* walrus again
* old stuff without walrus seems to fail
* manually do the walrussing
* change 3.13t, v2
* formatting, added walrus
* formatting
* removed walrus, removed test before stalling test
* changed order of stalling test
* removed debugging, added TelemetryClientFactory
* remove more debugging

Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 99ec875 commit cf89ce3
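Several bullets above (skip null fields in telemetry request, the filter_null_values lambda, JsonSerializableMixin with an isdataclass check) describe one serialization pattern. A minimal sketch of that pattern follows; the mixin body and the `DriverErrorInfo` payload class are illustrative assumptions, not the PR's actual code:

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Optional


class JsonSerializableMixin:
    """Serialize a dataclass to JSON, dropping null fields (sketch)."""

    def to_json(self) -> str:
        # isdataclass-style guard described in the commit message
        if not is_dataclass(self):
            raise TypeError(
                f"{type(self).__name__} must be a dataclass to use JsonSerializableMixin"
            )
        # dict_factory filters out None values, mirroring the
        # "skip null fields" / filter_null_values lambda bullets
        return json.dumps(
            asdict(
                self,
                dict_factory=lambda kv: {k: v for k, v in kv if v is not None},
            )
        )


@dataclass
class DriverErrorInfo(JsonSerializableMixin):  # hypothetical event payload
    error_name: str
    stack_trace: Optional[str] = None


print(DriverErrorInfo("InterfaceError").to_json())  # stack_trace omitted
```

Keeping the null-filtering in one mixin means every telemetry event dataclass gets the same compact wire format for free.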

File tree

8 files changed: +643 −305 lines changed

src/databricks/sql/client.py

Lines changed: 72 additions & 29 deletions
@@ -18,6 +18,9 @@
     OperationalError,
     SessionAlreadyClosedError,
     CursorAlreadyClosedError,
+    InterfaceError,
+    NotSupportedError,
+    ProgrammingError,
 )
 from databricks.sql.thrift_api.TCLIService import ttypes
 from databricks.sql.thrift_backend import ThriftBackend
@@ -50,8 +53,8 @@
     TOperationState,
 )
 from databricks.sql.telemetry.telemetry_client import (
-    TelemetryClientFactory,
     TelemetryHelper,
+    TelemetryClientFactory,
 )
 from databricks.sql.telemetry.models.enums import DatabricksClientType
 from databricks.sql.telemetry.models.event import (
@@ -305,13 +308,13 @@ def read(self) -> Optional[OAuthToken]:

         TelemetryClientFactory.initialize_telemetry_client(
             telemetry_enabled=self.telemetry_enabled,
-            connection_uuid=self.get_session_id_hex(),
+            session_id_hex=self.get_session_id_hex(),
             auth_provider=auth_provider,
             host_url=self.host,
         )

         self._telemetry_client = TelemetryClientFactory.get_telemetry_client(
-            connection_uuid=self.get_session_id_hex()
+            session_id_hex=self.get_session_id_hex()
         )

         driver_connection_params = DriverConnectionParameters(
@@ -421,7 +424,10 @@ def cursor(
         Will throw an Error if the connection has been closed.
         """
         if not self.open:
-            raise Error("Cannot create cursor from closed connection")
+            raise InterfaceError(
+                "Cannot create cursor from closed connection",
+                session_id_hex=self.get_session_id_hex(),
+            )

         cursor = Cursor(
             self,
@@ -464,14 +470,17 @@ def _close(self, close_cursors=True) -> None:

         self.open = False

-        self._telemetry_client.close()
+        TelemetryClientFactory.close(self.get_session_id_hex())

     def commit(self):
         """No-op because Databricks does not support transactions"""
         pass

     def rollback(self):
-        raise NotSupportedError("Transactions are not supported on Databricks")
+        raise NotSupportedError(
+            "Transactions are not supported on Databricks",
+            session_id_hex=self.get_session_id_hex(),
+        )


 class Cursor:
@@ -523,7 +532,10 @@ def __iter__(self):
             for row in self.active_result_set:
                 yield row
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def _determine_parameter_approach(
         self, params: Optional[TParameterCollection]
@@ -660,7 +672,10 @@ def _close_and_clear_active_result_set(self):

     def _check_not_closed(self):
         if not self.open:
-            raise Error("Attempting operation on closed cursor")
+            raise InterfaceError(
+                "Attempting operation on closed cursor",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def _handle_staging_operation(
         self, staging_allowed_local_path: Union[None, str, List[str]]
@@ -677,8 +692,9 @@ def _handle_staging_operation(
         elif isinstance(staging_allowed_local_path, type(list())):
             _staging_allowed_local_paths = staging_allowed_local_path
         else:
-            raise Error(
-                "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands"
+            raise ProgrammingError(
+                "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

         abs_staging_allowed_local_paths = [
@@ -706,8 +722,9 @@ def _handle_staging_operation(
             else:
                 continue
         if not allow_operation:
-            raise Error(
-                "Local file operations are restricted to paths within the configured staging_allowed_local_path"
+            raise ProgrammingError(
+                "Local file operations are restricted to paths within the configured staging_allowed_local_path",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

         # May be real headers, or could be json string
@@ -735,9 +752,10 @@ def _handle_staging_operation(
             handler_args.pop("local_file")
             return self._handle_staging_remove(**handler_args)
         else:
-            raise Error(
+            raise ProgrammingError(
                 f"Operation {row.operation} is not supported. "
-                + "Supported operations are GET, PUT, and REMOVE"
+                + "Supported operations are GET, PUT, and REMOVE",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

     def _handle_staging_put(
@@ -749,7 +767,10 @@ def _handle_staging_put(
         """

         if local_file is None:
-            raise Error("Cannot perform PUT without specifying a local_file")
+            raise ProgrammingError(
+                "Cannot perform PUT without specifying a local_file",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

         with open(local_file, "rb") as fh:
             r = requests.put(url=presigned_url, data=fh, headers=headers)
@@ -765,8 +786,9 @@ def _handle_staging_put(
         # fmt: on

         if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
-            raise Error(
-                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
+            raise OperationalError(
+                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

         if r.status_code == ACCEPTED:
@@ -784,15 +806,19 @@ def _handle_staging_get(
         """

         if local_file is None:
-            raise Error("Cannot perform GET without specifying a local_file")
+            raise ProgrammingError(
+                "Cannot perform GET without specifying a local_file",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

         r = requests.get(url=presigned_url, headers=headers)

         # response.ok verifies the status code is not between 400-600.
         # Any 2xx or 3xx will evaluate r.ok == True
         if not r.ok:
-            raise Error(
-                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
+            raise OperationalError(
+                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

         with open(local_file, "wb") as fp:
@@ -806,8 +832,9 @@ def _handle_staging_remove(
         r = requests.delete(url=presigned_url, headers=headers)

         if not r.ok:
-            raise Error(
-                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
+            raise OperationalError(
+                f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

     def execute(
@@ -1005,8 +1032,9 @@ def get_async_execution_result(self):

             return self
         else:
-            raise Error(
-                f"get_execution_result failed with Operation status {operation_state}"
+            raise OperationalError(
+                f"get_execution_result failed with Operation status {operation_state}",
+                session_id_hex=self.connection.get_session_id_hex(),
             )

     def executemany(self, operation, seq_of_parameters):
@@ -1156,7 +1184,10 @@ def fetchall(self) -> List[Row]:
         if self.active_result_set:
             return self.active_result_set.fetchall()
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def fetchone(self) -> Optional[Row]:
         """
@@ -1170,7 +1201,10 @@ def fetchone(self) -> Optional[Row]:
         if self.active_result_set:
             return self.active_result_set.fetchone()
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def fetchmany(self, size: int) -> List[Row]:
         """
@@ -1192,21 +1226,30 @@ def fetchmany(self, size: int) -> List[Row]:
         if self.active_result_set:
             return self.active_result_set.fetchmany(size)
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def fetchall_arrow(self) -> "pyarrow.Table":
         self._check_not_closed()
         if self.active_result_set:
             return self.active_result_set.fetchall_arrow()
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def fetchmany_arrow(self, size) -> "pyarrow.Table":
         self._check_not_closed()
         if self.active_result_set:
             return self.active_result_set.fetchmany_arrow(size)
         else:
-            raise Error("There is no active result set")
+            raise ProgrammingError(
+                "There is no active result set",
+                session_id_hex=self.connection.get_session_id_hex(),
+            )

     def cancel(self) -> None:
         """
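The client.py diff replaces generic `Error` raises with the specific PEP-249 exception classes. A compressed sketch of that hierarchy and the guard pattern it enables follows; class bodies are simplified and the session id is a placeholder, so this is not the connector's actual exc.py:

```python
# Simplified PEP-249 exception hierarchy (https://peps.python.org/pep-0249/#exceptions)
class Error(Exception):
    def __init__(self, message=None, context=None, session_id_hex=None):
        super().__init__(message)
        self.message = message
        self.context = context or {}


class InterfaceError(Error):
    pass


class DatabaseError(Error):
    pass


class OperationalError(DatabaseError):
    pass


class ProgrammingError(DatabaseError):
    pass


class NotSupportedError(DatabaseError):
    pass


def fetchall(cursor_open, active_result_set):
    # Mirrors the guard pattern the diff applies to every fetch method;
    # "deadbeef" is a placeholder session id, not a real value.
    if not cursor_open:
        raise InterfaceError(
            "Attempting operation on closed cursor", session_id_hex="deadbeef"
        )
    if active_result_set is None:
        raise ProgrammingError(
            "There is no active result set", session_id_hex="deadbeef"
        )
    return active_result_set


try:
    fetchall(cursor_open=True, active_result_set=None)
except Error as e:
    # Every subclass still lands in the DB-API base class, so existing
    # `except Error` handlers keep working after the change.
    print(type(e).__name__, "-", e.message)
```

Because each subclass inherits from `Error`, the change is backward compatible for callers while letting new code distinguish programming mistakes from operational failures.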

src/databricks/sql/exc.py

Lines changed: 13 additions & 2 deletions
@@ -1,21 +1,32 @@
11
import json
22
import logging
33

4-
logger = logging.getLogger(__name__)
4+
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
55

6+
logger = logging.getLogger(__name__)
67

78
### PEP-249 Mandated ###
9+
# https://peps.python.org/pep-0249/#exceptions
810
class Error(Exception):
911
"""Base class for DB-API2.0 exceptions.
1012
`message`: An optional user-friendly error message. It should be short, actionable and stable
1113
`context`: Optional extra context about the error. MUST be JSON serializable
1214
"""
1315

14-
def __init__(self, message=None, context=None, *args, **kwargs):
16+
def __init__(
17+
self, message=None, context=None, session_id_hex=None, *args, **kwargs
18+
):
1519
super().__init__(message, *args, **kwargs)
1620
self.message = message
1721
self.context = context or {}
1822

23+
error_name = self.__class__.__name__
24+
if session_id_hex:
25+
telemetry_client = TelemetryClientFactory.get_telemetry_client(
26+
session_id_hex
27+
)
28+
telemetry_client.export_failure_log(error_name, self.message)
29+
1930
def __str__(self):
2031
return self.message
2132
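The hook added to `Error.__init__` above can be exercised in isolation with a stub telemetry client. In this sketch `TelemetryClientFactory` and `_StubTelemetryClient` are hypothetical stand-ins for the real factory, used only to show when `export_failure_log` fires:

```python
exported = []


class _StubTelemetryClient:
    def export_failure_log(self, error_name, message):
        exported.append((error_name, message))


class TelemetryClientFactory:  # hypothetical stand-in for the real factory
    @staticmethod
    def get_telemetry_client(session_id_hex):
        return _StubTelemetryClient()


class Error(Exception):
    # Mirrors the __init__ added in the diff above.
    def __init__(self, message=None, context=None, session_id_hex=None, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.message = message
        self.context = context or {}
        error_name = self.__class__.__name__
        if session_id_hex:
            client = TelemetryClientFactory.get_telemetry_client(session_id_hex)
            client.export_failure_log(error_name, self.message)


class OperationalError(Error):
    pass


OperationalError("query failed", session_id_hex="ab12cd34")  # exports a log
Error("no session")  # no session id, so nothing is exported
print(exported)
```

Note that the export happens at construction time, so every raise of a subclass carrying a `session_id_hex` produces exactly one failure-log record, keyed by the exception class name.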
