11
11
12
12
import databricks
13
13
from databricks .sql .auth .retry import CommandType
14
- from databricks .sql .backend .sea .models .base import ExternalLink
14
+ from databricks .sql .backend .sea .models .base import ExternalLink , ResultManifest
15
15
from databricks .sql .backend .sea .utils .constants import (
16
16
ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP ,
17
17
ResultFormat ,
@@ -320,6 +320,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
320
320
user_friendly_error_message = error_info .user_friendly_error_message (
321
321
no_retry_reason , attempt , elapsed
322
322
)
323
+ logger .info (f"User friendly error message: { user_friendly_error_message } " )
323
324
network_request_error = RequestError (
324
325
user_friendly_error_message , full_error_info_context , error_info .error
325
326
)
@@ -594,14 +595,6 @@ def open_session(
594
595
595
596
session_response = CreateSessionResponse .from_dict (response )
596
597
session_id = session_response .session_id
597
- if not session_id :
598
- raise ServerOperationError (
599
- "Failed to create session: No session ID returned" ,
600
- {
601
- "operation-id" : None ,
602
- "diagnostic-info" : None ,
603
- },
604
- )
605
598
606
599
return SessionId .from_sea_session_id (session_id )
607
600
except Exception as e :
@@ -733,7 +726,9 @@ def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
733
726
734
727
return link
735
728
736
- def _results_message_to_execute_response (self , sea_response , command_id ):
729
+ def _results_message_to_execute_response (
730
+ self , response : GetStatementResponse , command_id : CommandId
731
+ ) -> ExecuteResponse :
737
732
"""
738
733
Convert a SEA response to an ExecuteResponse and extract result data.
739
734
@@ -811,7 +806,7 @@ def execute_command(
811
806
lz4_compression : bool ,
812
807
cursor : "Cursor" ,
813
808
use_cloud_fetch : bool ,
814
- parameters : List [Dict [ str , Any ] ],
809
+ parameters : List [ttypes . TSparkParameter ],
815
810
async_op : bool ,
816
811
enforce_embedded_schema_correctness : bool ,
817
812
) -> Union ["ResultSet" , None ]:
@@ -845,9 +840,9 @@ def execute_command(
845
840
for param in parameters :
846
841
sea_parameters .append (
847
842
StatementParameter (
848
- name = param [ " name" ] ,
849
- value = param [ " value" ] ,
850
- type = param [ " type" ] if "type" in param else None ,
843
+ name = param . name ,
844
+ value = param . value . stringValue ,
845
+ type = param . type ,
851
846
)
852
847
)
853
848
@@ -1057,25 +1052,24 @@ def get_execution_result(
1057
1052
params = None ,
1058
1053
headers = None ,
1059
1054
)
1055
+ response = GetStatementResponse .from_dict (response_data )
1060
1056
1061
1057
# Create and return a SeaResultSet
1062
1058
from databricks .sql .result_set import SeaResultSet
1063
1059
1064
1060
# Convert the response to an ExecuteResponse and extract result data
1065
- (
1066
- execute_response ,
1067
- result_data ,
1068
- manifest ,
1069
- ) = self ._results_message_to_execute_response (response_data , command_id )
1061
+ execute_response = self ._results_message_to_execute_response (
1062
+ response , command_id
1063
+ )
1070
1064
1071
1065
return SeaResultSet (
1072
1066
connection = cursor .connection ,
1073
1067
execute_response = execute_response ,
1074
1068
sea_client = self ,
1075
1069
buffer_size_bytes = cursor .buffer_size_bytes ,
1076
1070
arraysize = cursor .arraysize ,
1077
- result_data = result_data ,
1078
- manifest = manifest ,
1071
+ result_data = response . result ,
1072
+ manifest = response . manifest ,
1079
1073
)
1080
1074
except Exception as e :
1081
1075
logger .error ("SeaDatabricksClient.get_execution_result: Exception: %s" , e )
@@ -1091,9 +1085,12 @@ def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
1091
1085
ExternalLink: External link for the chunk
1092
1086
"""
1093
1087
1094
- response_data = self .http_client . _make_request (
1095
- method = "GET" ,
1088
+ response_data = self .make_request (
1089
+ method_name = "GET" ,
1096
1090
path = self .CHUNK_PATH_WITH_ID_AND_INDEX .format (statement_id , chunk_index ),
1091
+ data = None ,
1092
+ params = None ,
1093
+ headers = None ,
1097
1094
)
1098
1095
response = GetChunksResponse .from_dict (response_data )
1099
1096
@@ -1180,9 +1177,6 @@ def get_tables(
1180
1177
table_types : Optional [List [str ]] = None ,
1181
1178
) -> "ResultSet" :
1182
1179
"""Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'."""
1183
- if not catalog_name :
1184
- raise ValueError ("Catalog name is required for get_tables" )
1185
-
1186
1180
operation = (
1187
1181
MetadataCommands .SHOW_TABLES_ALL_CATALOGS .value
1188
1182
if catalog_name in [None , "*" , "%" ]
0 commit comments