Skip to content

Commit f9fcb79

Browse files
introduce databricksClient interface and thrift backend implementation
WARNING: incomplete - result set still aligned heavily with thrift and the necessity of some defined functions is to be validated Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 6253d4f commit f9fcb79

File tree

4 files changed

+211
-46
lines changed

4 files changed

+211
-46
lines changed

src/databricks/sql/client.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
CursorAlreadyClosedError,
2222
)
2323
from databricks.sql.thrift_api.TCLIService import ttypes
24-
from databricks.sql.thrift_backend import ThriftBackend
24+
from databricks.sql.thrift_backend import ThriftDatabricksClient
25+
from databricks.sql.db_client_interface import DatabricksClient
2526
from databricks.sql.utils import (
2627
ExecuteResponse,
2728
ParamEscaper,
@@ -336,7 +337,7 @@ def cursor(
336337

337338
cursor = Cursor(
338339
self,
339-
self.session.thrift_backend,
340+
self.session.backend,
340341
arraysize=arraysize,
341342
result_buffer_size_bytes=buffer_size_bytes,
342343
)
@@ -369,7 +370,7 @@ class Cursor:
369370
def __init__(
370371
self,
371372
connection: Connection,
372-
thrift_backend: ThriftBackend,
373+
backend: DatabricksClient,
373374
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
374375
arraysize: int = DEFAULT_ARRAY_SIZE,
375376
) -> None:
@@ -388,7 +389,7 @@ def __init__(
388389
# Note that Cursor closed => active result set closed, but not vice versa
389390
self.open = True
390391
self.executing_command_id = None
391-
self.thrift_backend = thrift_backend
392+
self.backend = backend
392393
self.active_op_handle = None
393394
self.escaper = ParamEscaper()
394395
self.lastrowid = None
@@ -753,7 +754,8 @@ def execute(
753754

754755
self._check_not_closed()
755756
self._close_and_clear_active_result_set()
756-
execute_response = self.thrift_backend.execute_command(
757+
print("here")
758+
execute_response = self.backend.execute_command(
757759
operation=prepared_operation,
758760
session_handle=self.connection.session._session_handle,
759761
max_rows=self.arraysize,
@@ -768,15 +770,15 @@ def execute(
768770
self.active_result_set = ResultSet(
769771
self.connection,
770772
execute_response,
771-
self.thrift_backend,
773+
self.backend,
772774
self.buffer_size_bytes,
773775
self.arraysize,
774776
self.connection.use_cloud_fetch,
775777
)
776778

777779
if execute_response.is_staging_operation:
778780
self._handle_staging_operation(
779-
staging_allowed_local_path=self.thrift_backend.staging_allowed_local_path
781+
staging_allowed_local_path=self.backend.staging_allowed_local_path
780782
)
781783

782784
return self
@@ -816,7 +818,7 @@ def execute_async(
816818

817819
self._check_not_closed()
818820
self._close_and_clear_active_result_set()
819-
self.thrift_backend.execute_command(
821+
self.backend.execute_command(
820822
operation=prepared_operation,
821823
session_handle=self.connection.session._session_handle,
822824
max_rows=self.arraysize,
@@ -838,7 +840,7 @@ def get_query_state(self) -> "TOperationState":
838840
:return:
839841
"""
840842
self._check_not_closed()
841-
return self.thrift_backend.get_query_state(self.active_op_handle)
843+
return self.backend.get_query_state(self.active_op_handle)
842844

843845
def is_query_pending(self):
844846
"""
@@ -868,20 +870,20 @@ def get_async_execution_result(self):
868870

869871
operation_state = self.get_query_state()
870872
if operation_state == ttypes.TOperationState.FINISHED_STATE:
871-
execute_response = self.thrift_backend.get_execution_result(
873+
execute_response = self.backend.get_execution_result(
872874
self.active_op_handle, self
873875
)
874876
self.active_result_set = ResultSet(
875877
self.connection,
876878
execute_response,
877-
self.thrift_backend,
879+
self.backend,
878880
self.buffer_size_bytes,
879881
self.arraysize,
880882
)
881883

882884
if execute_response.is_staging_operation:
883885
self._handle_staging_operation(
884-
staging_allowed_local_path=self.thrift_backend.staging_allowed_local_path
886+
staging_allowed_local_path=self.backend.staging_allowed_local_path
885887
)
886888

887889
return self
@@ -913,7 +915,7 @@ def catalogs(self) -> "Cursor":
913915
"""
914916
self._check_not_closed()
915917
self._close_and_clear_active_result_set()
916-
execute_response = self.thrift_backend.get_catalogs(
918+
execute_response = self.backend.get_catalogs(
917919
session_handle=self.connection.session._session_handle,
918920
max_rows=self.arraysize,
919921
max_bytes=self.buffer_size_bytes,
@@ -922,9 +924,10 @@ def catalogs(self) -> "Cursor":
922924
self.active_result_set = ResultSet(
923925
self.connection,
924926
execute_response,
925-
self.thrift_backend,
927+
self.backend,
926928
self.buffer_size_bytes,
927929
self.arraysize,
930+
self.connection.use_cloud_fetch,
928931
)
929932
return self
930933

@@ -939,7 +942,7 @@ def schemas(
939942
"""
940943
self._check_not_closed()
941944
self._close_and_clear_active_result_set()
942-
execute_response = self.thrift_backend.get_schemas(
945+
execute_response = self.backend.get_schemas(
943946
session_handle=self.connection.session._session_handle,
944947
max_rows=self.arraysize,
945948
max_bytes=self.buffer_size_bytes,
@@ -950,9 +953,10 @@ def schemas(
950953
self.active_result_set = ResultSet(
951954
self.connection,
952955
execute_response,
953-
self.thrift_backend,
956+
self.backend,
954957
self.buffer_size_bytes,
955958
self.arraysize,
959+
self.connection.use_cloud_fetch,
956960
)
957961
return self
958962

@@ -972,7 +976,7 @@ def tables(
972976
self._check_not_closed()
973977
self._close_and_clear_active_result_set()
974978

975-
execute_response = self.thrift_backend.get_tables(
979+
execute_response = self.backend.get_tables(
976980
session_handle=self.connection.session._session_handle,
977981
max_rows=self.arraysize,
978982
max_bytes=self.buffer_size_bytes,
@@ -985,9 +989,10 @@ def tables(
985989
self.active_result_set = ResultSet(
986990
self.connection,
987991
execute_response,
988-
self.thrift_backend,
992+
self.backend,
989993
self.buffer_size_bytes,
990994
self.arraysize,
995+
self.connection.use_cloud_fetch,
991996
)
992997
return self
993998

@@ -1007,7 +1012,7 @@ def columns(
10071012
self._check_not_closed()
10081013
self._close_and_clear_active_result_set()
10091014

1010-
execute_response = self.thrift_backend.get_columns(
1015+
execute_response = self.backend.get_columns(
10111016
session_handle=self.connection.session._session_handle,
10121017
max_rows=self.arraysize,
10131018
max_bytes=self.buffer_size_bytes,
@@ -1020,9 +1025,10 @@ def columns(
10201025
self.active_result_set = ResultSet(
10211026
self.connection,
10221027
execute_response,
1023-
self.thrift_backend,
1028+
self.backend,
10241029
self.buffer_size_bytes,
10251030
self.arraysize,
1031+
self.connection.use_cloud_fetch,
10261032
)
10271033
return self
10281034

@@ -1097,7 +1103,7 @@ def cancel(self) -> None:
10971103
This method can be called from another thread.
10981104
"""
10991105
if self.active_op_handle is not None:
1100-
self.thrift_backend.cancel_command(self.active_op_handle)
1106+
self.backend.cancel_command(self.active_op_handle)
11011107
else:
11021108
logger.warning(
11031109
"Attempting to cancel a command, but there is no "
@@ -1172,7 +1178,7 @@ def __init__(
11721178
self,
11731179
connection: Connection,
11741180
execute_response: ExecuteResponse,
1175-
thrift_backend: ThriftBackend,
1181+
backend: DatabricksClient,
11761182
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
11771183
arraysize: int = 10000,
11781184
use_cloud_fetch: bool = True,
@@ -1182,8 +1188,10 @@ def __init__(
11821188
11831189
:param connection: The parent connection that was used to execute this command
11841190
:param execute_response: A `ExecuteResponse` class returned by a command execution
1185-
:param result_buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch
1186-
amount :param arraysize: The max number of rows to fetch at a time (PEP-249)
1191+
:param backend: The DatabricksClient instance to use for fetching results
1192+
:param result_buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch amount
1193+
:param arraysize: The max number of rows to fetch at a time (PEP-249)
1194+
:param use_cloud_fetch: Whether to use cloud fetch for retrieving results
11871195
"""
11881196
self.connection = connection
11891197
self.command_id = execute_response.command_handle
@@ -1193,7 +1201,7 @@ def __init__(
11931201
self.buffer_size_bytes = result_buffer_size_bytes
11941202
self.lz4_compressed = execute_response.lz4_compressed
11951203
self.arraysize = arraysize
1196-
self.thrift_backend = thrift_backend
1204+
self.backend = backend
11971205
self.description = execute_response.description
11981206
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
11991207
self._next_row_index = 0
@@ -1216,8 +1224,15 @@ def __iter__(self):
12161224
break
12171225

12181226
def _fill_results_buffer(self):
1219-
# At initialization or if the server does not have cloud fetch result links available
1220-
results, has_more_rows = self.thrift_backend.fetch_results(
1227+
if not isinstance(self.backend, ThriftDatabricksClient):
1228+
# This specific logic is for Thrift. SEA will have its own way.
1229+
raise NotImplementedError(
1230+
"Fetching further result batches is currently only implemented for the Thrift backend."
1231+
)
1232+
1233+
# Now we know self.backend is ThriftDatabricksClient, so it has fetch_results
1234+
thrift_backend_instance = self.backend # type: ThriftDatabricksClient
1235+
results, has_more_rows = thrift_backend_instance.fetch_results(
12211236
op_handle=self.command_id,
12221237
max_rows=self.arraysize,
12231238
max_bytes=self.buffer_size_bytes,
@@ -1433,19 +1448,20 @@ def close(self) -> None:
14331448
If the connection has not been closed, and the cursor has not already
14341449
been closed on the server for some other reason, issue a request to the server to close it.
14351450
"""
1451+
# TODO: the state is still thrift specific, define some ENUM for status that each service has to map to
14361452
try:
14371453
if (
1438-
self.op_state != self.thrift_backend.CLOSED_OP_STATE
1454+
self.op_state != ttypes.TOperationState.CLOSED_STATE
14391455
and not self.has_been_closed_server_side
14401456
and self.connection.open
14411457
):
1442-
self.thrift_backend.close_command(self.command_id)
1458+
self.backend.close_command(self.command_id)
14431459
except RequestError as e:
14441460
if isinstance(e.args[1], CursorAlreadyClosedError):
14451461
logger.info("Operation was canceled by a prior request")
14461462
finally:
14471463
self.has_been_closed_server_side = True
1448-
self.op_state = self.thrift_backend.CLOSED_OP_STATE
1464+
self.op_state = ttypes.TOperationState.CLOSED_STATE
14491465

14501466
@staticmethod
14511467
def _get_schema_description(table_schema_message):

0 commit comments

Comments
 (0)