-
Notifications
You must be signed in to change notification settings - Fork 114
Implement ResultSet Abstraction (backend interfaces for fetch phase) #574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 250 commits
8ea5cf4
3eba92a
6735fd5
12388e8
853e6c3
4f8b54c
12ee56f
a5152d0
2c2984e
4979422
6361e85
904c304
e5ac8c6
e06fd2f
7126f97
b1245da
b54b04a
e1d7f71
1c721e0
f1fa67a
512d37c
ff0ec64
c5900c9
c86e99b
bcd6f01
ca51d1d
d403fcd
1b4154c
eac0433
1b0fc9b
b45871e
561c351
a3188ad
ff7abf6
e7e2333
fd4fae6
677e66a
4ec8703
6ecb6bf
fe2ce17
568b1f4
ebbd150
aa8af45
e7be76b
7461715
b295acd
8afc5d5
b8f9146
7c733ee
0917ea1
3fd2a46
03d3ae7
fb0fa46
e3770cd
3b8002f
f1a350a
98b0dc7
afee423
2d24fdd
a5561e8
0d890a5
c2aa762
fe642da
394333c
ef07acd
1ef46cf
76ca997
afc6f8f
f6660ba
9871a93
595d795
10ee940
3d75d6c
b8e1bbd
dac08f2
7b0cbed
9267ef9
5f00532
59ed5ce
c95951d
335d918
ee4f94c
1c8bb11
9de280e
04b626a
2a01173
b1faa09
270edcf
8523fd3
763f070
1e0d9d5
890cdd7
9bdee1d
cdd7a19
fcc2da9
d354309
d63544e
5bbf223
9c62b21
7bb7ca6
438a080
eb50411
2a5b9c7
d31aa59
3e62c90
f9a6b13
a941575
032c276
d36889d
22e5ce4
7772403
8b27150
398db45
c962b63
c246872
326f338
37e73a9
3d7123c
132e1b7
46090c0
28249c0
5ab0a2c
6528cd1
8f7754b
f7d3865
61cc398
554d011
6f28297
983ec03
6f3b5b7
29a2840
0d28b69
8cb8cdd
4495f9b
c744117
ef5a06b
abbaaa5
33765cb
788d8c7
4debbd3
0e6e215
925394c
4ad6c8d
51369c8
9541464
cbdd3d7
ca38e95
b40c0fd
35ed462
37f3af1
09c5e2f
4ce6aab
307f447
802d8dc
944d446
6338083
3658a91
8ef6ed6
61300b2
44e7d17
d2035ea
8b4451b
d21d2c3
21068a3
476e763
1e1cf1e
b408c2c
73649f2
a61df99
e1a2c0e
71ba9d5
160ba9f
6b3436f
30849dc
4d455bb
6fc0834
d254e48
370627d
ca1b57d
6c120c0
cdf6865
12ce717
1215fd8
dd083f6
8d30436
066aef9
1ed3514
ca80f94
6027fb1
7a2f9b5
39294e9
709e910
1ad0ace
913da63
d8159e7
0b91183
ff78b5f
c1d53d2
a5a8e51
f7be10c
a888dd6
29a2985
9b9735e
0a8226c
42263c4
ac984e4
8da84e8
f4f27e3
4e3ccce
04eb8c1
ca425eb
7a47dd0
00d9aeb
eecc67d
9800636
e07f56c
73fb141
d838653
6654f06
89425f9
93e55e8
7689d75
1ec8c45
80b7bc3
904efe7
c91bc37
7f6073d
1dada97
2603f37
ad97cc4
d4ce10f
6297f7b
536387b
7a6237e
9b6afd2
d80a3e2
88996e0
1bec8e9
af166b9
ac34732
7e730db
dfb4cc3
6a9f0e2
0b2e83f
1f0c81f
aa7207e
fde4634
54f2b93
db73ecf
cf8a629
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
|
||
from databricks.sql.thrift_api.TCLIService.ttypes import TOperationState | ||
from databricks.sql.backend.types import ( | ||
CommandState, | ||
SessionId, | ||
CommandId, | ||
BackendType, | ||
Comment on lines
+16
to
19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: alphabetic ordered imports; do we have that setup in the linter? |
||
|
@@ -51,6 +52,7 @@ | |
) | ||
from databricks.sql.types import SSLOptions | ||
from databricks.sql.backend.databricks_client import DatabricksClient | ||
from databricks.sql.result_set import ResultSet, ThriftResultSet | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -808,7 +810,7 @@ def _results_message_to_execute_response(self, resp, operation_state): | |
|
||
def get_execution_result( | ||
self, command_id: CommandId, cursor: "Cursor" | ||
) -> ExecuteResponse: | ||
) -> "ResultSet": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for a follow-up PR to this, since the actual implementations of databricks client interface might be different for SEA and thrift, consider adding specific pydoc to different implementations |
||
thrift_handle = command_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift command ID") | ||
|
@@ -857,7 +859,7 @@ def get_execution_result( | |
ssl_options=self._ssl_options, | ||
) | ||
|
||
return ExecuteResponse( | ||
execute_response = ExecuteResponse( | ||
arrow_queue=queue, | ||
status=resp.status, | ||
has_been_closed_server_side=False, | ||
|
@@ -869,6 +871,15 @@ def get_execution_result( | |
arrow_schema_bytes=schema_bytes, | ||
) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the need for connection in result set? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the current implementation, it is used for:
Passing the property is not an essential use-case, because we can just pass it as a parameter to the ResultSet if required, but I don't think we can circumvent the first use case. |
||
execute_response=execute_response, | ||
thrift_client=self, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please be mindful of introducing circular dependencies or references. thrift_backend outputs thrift_result_set which depends on thrift_backend There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case it feels essential to me, since the The only way I can think of to avoid this anti-pattern is to create separate fetch phase and exec phase clients, but that would be a large deviation from the existing connectors. Any other ideas? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we already pass connection which has session which in turn has the backend? can we look into normalising this init params (in a separate PR) |
||
buffer_size_bytes=cursor.buffer_size_bytes, | ||
arraysize=cursor.arraysize, | ||
use_cloud_fetch=cursor.connection.use_cloud_fetch, | ||
) | ||
|
||
def _wait_until_command_done(self, op_handle, initial_operation_status_resp): | ||
if initial_operation_status_resp: | ||
self._check_command_not_in_error_or_closed_state( | ||
|
@@ -887,15 +898,15 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): | |
self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) | ||
return operation_state | ||
|
||
def get_query_state(self, command_id: CommandId) -> "TOperationState": | ||
def get_query_state(self, command_id: CommandId) -> CommandState: | ||
thrift_handle = command_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift command ID") | ||
|
||
poll_resp = self._poll_for_status(thrift_handle) | ||
operation_state = poll_resp.operationState | ||
self._check_command_not_in_error_or_closed_state(thrift_handle, poll_resp) | ||
return operation_state | ||
return CommandState.from_thrift_state(operation_state) | ||
|
||
@staticmethod | ||
def _check_direct_results_for_error(t_spark_direct_results): | ||
|
@@ -929,7 +940,7 @@ def execute_command( | |
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) -> Optional[ExecuteResponse]: | ||
) -> Union["ResultSet", None]: | ||
thrift_handle = session_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift session ID") | ||
|
@@ -976,15 +987,24 @@ def execute_command( | |
self._handle_execute_response_async(resp, cursor) | ||
return None | ||
else: | ||
return self._handle_execute_response(resp, cursor) | ||
execute_response = self._handle_execute_response(resp, cursor) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
execute_response=execute_response, | ||
thrift_client=self, | ||
buffer_size_bytes=max_bytes, | ||
arraysize=max_rows, | ||
use_cloud_fetch=use_cloud_fetch, | ||
) | ||
|
||
def get_catalogs( | ||
self, | ||
session_id: SessionId, | ||
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
) -> ExecuteResponse: | ||
) -> "ResultSet": | ||
thrift_handle = session_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift session ID") | ||
|
@@ -996,7 +1016,17 @@ def get_catalogs( | |
), | ||
) | ||
resp = self.make_request(self._client.GetCatalogs, req) | ||
return self._handle_execute_response(resp, cursor) | ||
|
||
execute_response = self._handle_execute_response(resp, cursor) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
execute_response=execute_response, | ||
thrift_client=self, | ||
buffer_size_bytes=max_bytes, | ||
arraysize=max_rows, | ||
use_cloud_fetch=cursor.connection.use_cloud_fetch, | ||
) | ||
|
||
def get_schemas( | ||
self, | ||
|
@@ -1006,7 +1036,7 @@ def get_schemas( | |
cursor: "Cursor", | ||
catalog_name=None, | ||
schema_name=None, | ||
) -> ExecuteResponse: | ||
) -> "ResultSet": | ||
thrift_handle = session_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift session ID") | ||
|
@@ -1020,7 +1050,17 @@ def get_schemas( | |
schemaName=schema_name, | ||
) | ||
resp = self.make_request(self._client.GetSchemas, req) | ||
return self._handle_execute_response(resp, cursor) | ||
|
||
execute_response = self._handle_execute_response(resp, cursor) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
execute_response=execute_response, | ||
thrift_client=self, | ||
buffer_size_bytes=max_bytes, | ||
arraysize=max_rows, | ||
use_cloud_fetch=cursor.connection.use_cloud_fetch, | ||
) | ||
|
||
def get_tables( | ||
self, | ||
|
@@ -1032,7 +1072,7 @@ def get_tables( | |
schema_name=None, | ||
table_name=None, | ||
table_types=None, | ||
) -> ExecuteResponse: | ||
) -> "ResultSet": | ||
thrift_handle = session_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift session ID") | ||
|
@@ -1048,7 +1088,17 @@ def get_tables( | |
tableTypes=table_types, | ||
) | ||
resp = self.make_request(self._client.GetTables, req) | ||
return self._handle_execute_response(resp, cursor) | ||
|
||
execute_response = self._handle_execute_response(resp, cursor) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
execute_response=execute_response, | ||
thrift_client=self, | ||
buffer_size_bytes=max_bytes, | ||
arraysize=max_rows, | ||
use_cloud_fetch=cursor.connection.use_cloud_fetch, | ||
) | ||
|
||
def get_columns( | ||
self, | ||
|
@@ -1060,7 +1110,7 @@ def get_columns( | |
schema_name=None, | ||
table_name=None, | ||
column_name=None, | ||
) -> ExecuteResponse: | ||
) -> "ResultSet": | ||
thrift_handle = session_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift session ID") | ||
|
@@ -1076,7 +1126,17 @@ def get_columns( | |
columnName=column_name, | ||
) | ||
resp = self.make_request(self._client.GetColumns, req) | ||
return self._handle_execute_response(resp, cursor) | ||
|
||
execute_response = self._handle_execute_response(resp, cursor) | ||
|
||
return ThriftResultSet( | ||
connection=cursor.connection, | ||
execute_response=execute_response, | ||
thrift_client=self, | ||
buffer_size_bytes=max_bytes, | ||
arraysize=max_rows, | ||
use_cloud_fetch=cursor.connection.use_cloud_fetch, | ||
) | ||
|
||
def _handle_execute_response(self, resp, cursor): | ||
command_id = CommandId.from_thrift_handle(resp.operationHandle) | ||
|
@@ -1154,7 +1214,7 @@ def cancel_command(self, command_id: CommandId) -> None: | |
req = ttypes.TCancelOperationReq(thrift_handle) | ||
self.make_request(self._client.CancelOperation, req) | ||
|
||
def close_command(self, command_id: CommandId): | ||
def close_command(self, command_id: CommandId) -> None: | ||
thrift_handle = command_id.to_thrift_handle() | ||
if not thrift_handle: | ||
raise ValueError("Not a valid Thrift command ID") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,46 @@ | ||
from enum import Enum | ||
from typing import Dict, Optional, Any, Union | ||
from typing import Dict, Optional, Any | ||
import logging | ||
|
||
from databricks.sql.backend.utils import guid_to_hex_id | ||
from databricks.sql.thrift_api.TCLIService import ttypes | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CommandState(Enum): | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
PENDING = "PENDING" | ||
RUNNING = "RUNNING" | ||
SUCCEEDED = "SUCCEEDED" | ||
FAILED = "FAILED" | ||
CLOSED = "CLOSED" | ||
CANCELLED = "CANCELLED" | ||
|
||
@classmethod | ||
def from_thrift_state(cls, state: ttypes.TOperationState) -> "CommandState": | ||
if state in ( | ||
ttypes.TOperationState.INITIALIZED_STATE, | ||
ttypes.TOperationState.PENDING_STATE, | ||
): | ||
return cls.PENDING | ||
elif state == ttypes.TOperationState.RUNNING_STATE: | ||
return cls.RUNNING | ||
elif state == ttypes.TOperationState.FINISHED_STATE: | ||
return cls.SUCCEEDED | ||
elif state in ( | ||
ttypes.TOperationState.ERROR_STATE, | ||
ttypes.TOperationState.TIMEDOUT_STATE, | ||
ttypes.TOperationState.UKNOWN_STATE, | ||
): | ||
return cls.FAILED | ||
elif state == ttypes.TOperationState.CLOSED_STATE: | ||
return cls.CLOSED | ||
elif state == ttypes.TOperationState.CANCELED_STATE: | ||
return cls.CANCELLED | ||
else: | ||
raise ValueError(f"Unknown command state: {state}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is this categorisation coming from? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It comes from the JDBC implementation. |
||
|
||
|
||
class BackendType(Enum): | ||
""" | ||
Enum representing the type of backend | ||
|
@@ -211,6 +245,7 @@ def __str__(self) -> str: | |
Returns: | ||
A string representation of the command ID | ||
""" | ||
|
||
if self.backend_type == BackendType.SEA: | ||
return str(self.guid) | ||
elif self.backend_type == BackendType.THRIFT: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why changed to void/none? if you choose to return void, make sure the implementations throw appropriate errors for the calling code
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type is not used by any of the references, so I chose to convert it to a
None
return type.Any errors in the call will be raised by
make_request
. This structure is identical to the existingcancel_command
implementation (which returnsNone
as well).