
Commit bb3f15a

Introduce preliminary SEA Result Set (#588)
* [squash from exec-sea] bring over execution phase changes
* remove excess test
* add docstring
* remove exec func in sea backend
* remove excess files
* remove excess models
* remove excess sea backend tests
* cleanup
* re-introduce get_schema_desc
* remove SeaResultSet
* clean imports and attributes
* pass CommandId to ExecResp
* remove changes in types
* add back essential types (ExecResponse, from_sea_state)
* fix fetch types
* excess imports
* reduce diff by maintaining logs
* fix int test types
* [squashed from exec-sea] init execution func
* remove irrelevant changes
* remove ResultSetFilter functionality
* remove more irrelevant changes
* remove more irrelevant changes
* even more irrelevant changes
* remove sea response as init option
* move guid_to_hex_id import to utils
* reduce diff in guid utils import
* move arrow_schema_bytes back into ExecuteResult
* maintain log
* remove unnecessary assignment
* remove unnecessary tuple response
* remove unnecessary verbose mocking
* move Queue construction to ResultSet
* move description to List[Tuple]
* formatting (black)
* reduce diff (remove explicit tuple conversion)
* remove has_more_rows from ExecuteResponse
* remove unnecessary has_more_rows calc
* default has_more_rows to True
* return has_more_rows from ExecResponse conversion during GetRespMetadata
* remove unnecessary replacement
* better mocked backend naming
* remove has_more_rows test in ExecuteResponse
* introduce replacement of original has_more_rows read test
* call correct method in test_use_arrow_schema
* call correct method in test_fall_back_to_hive_schema
* re-introduce result response read test
* simplify test
* remove excess fetch_results mocks
* more minimal changes to thrift_backend tests
* move back to old table types
* remove outdated arrow_schema_bytes return
* align SeaResultSet with new structure
* correct sea res set tests
* remove duplicate import
* rephrase model docstrings to explicitly denote that they are representations and not used over the wire
* has_more_rows -> is_direct_results
* switch docstring format to align with Connection class
* has_more_rows -> is_direct_results
* fix type errors with arrow_schema_bytes
* spaces after multi line pydocs
* remove duplicate queue init (merge artifact)
* reduce diff (remove newlines)
* remove un-necessary changes covered by #588 anyway
* Revert "remove un-necessary changes" (this reverts commit a70a6ce)
* b"" -> None

Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
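The shape of this change is easy to see in isolation. The sketch below is not the library's code: `Row` and the class bodies are simplified stand-ins, kept only to show how a preliminary result set that raises `NotImplementedError` still plugs into the base class's `fetchone()`-driven iteration protocol.

```python
from typing import Iterator, Optional, Tuple

Row = Tuple  # stand-in for the library's Row type

class ResultSet:
    """Base class: iteration is driven entirely by fetchone()."""

    def __iter__(self) -> Iterator[Row]:
        while True:
            row = self.fetchone()
            if row is None:
                break
            yield row

    def fetchone(self) -> Optional[Row]:
        raise NotImplementedError

class SeaResultSet(ResultSet):
    """Preliminary SEA result set: interface only, fetch logic comes later."""

    def fetchone(self) -> Optional[Row]:
        raise NotImplementedError("fetchone is not implemented for SEA backend")

# Iterating delegates to fetchone(), so even `for row in rs` fails loudly
# instead of silently yielding no rows.
try:
    next(iter(SeaResultSet()))
except NotImplementedError as exc:
    print(exc)  # prints: fetchone is not implemented for SEA backend
```

Raising explicitly, rather than returning empty results, is what lets the unit tests below pin down the unfinished surface with `pytest.raises(..., match=...)`.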
1 parent ba8d9fd commit bb3f15a

File tree

2 files changed: +288 -2 lines changed


src/databricks/sql/result_set.py

Lines changed: 87 additions & 2 deletions
@@ -45,6 +45,8 @@ def __init__(
         results_queue=None,
         description=None,
         is_staging_operation: bool = False,
+        lz4_compressed: bool = False,
+        arrow_schema_bytes: Optional[bytes] = None,
     ):
         """
         A ResultSet manages the results of a single command.
@@ -75,6 +77,8 @@ def __init__(
         self.is_direct_results = is_direct_results
         self.results = results_queue
         self._is_staging_operation = is_staging_operation
+        self.lz4_compressed = lz4_compressed
+        self._arrow_schema_bytes = arrow_schema_bytes

     def __iter__(self):
         while True:
@@ -177,10 +181,10 @@ def __init__(
         :param ssl_options: SSL options for cloud fetch
         :param is_direct_results: Whether there are more rows to fetch
         """
+
         # Initialize ThriftResultSet-specific attributes
-        self._arrow_schema_bytes = execute_response.arrow_schema_bytes
         self._use_cloud_fetch = use_cloud_fetch
-        self.lz4_compressed = execute_response.lz4_compressed
+        self.is_direct_results = is_direct_results

         # Build the results queue if t_row_set is provided
         results_queue = None
@@ -211,6 +215,8 @@ def __init__(
             results_queue=results_queue,
             description=execute_response.description,
             is_staging_operation=execute_response.is_staging_operation,
+            lz4_compressed=execute_response.lz4_compressed,
+            arrow_schema_bytes=execute_response.arrow_schema_bytes,
         )

         # Initialize results queue if not provided
@@ -438,3 +444,82 @@ def map_col_type(type_):
         (column.name, map_col_type(column.datatype), None, None, None, None, None)
         for column in table_schema_message.columns
     ]
+
+
+class SeaResultSet(ResultSet):
+    """ResultSet implementation for SEA backend."""
+
+    def __init__(
+        self,
+        connection: "Connection",
+        execute_response: "ExecuteResponse",
+        sea_client: "SeaDatabricksClient",
+        buffer_size_bytes: int = 104857600,
+        arraysize: int = 10000,
+        result_data=None,
+        manifest=None,
+    ):
+        """
+        Initialize a SeaResultSet with the response from a SEA query execution.
+
+        Args:
+            connection: The parent connection
+            execute_response: Response from the execute command
+            sea_client: The SeaDatabricksClient instance for direct access
+            buffer_size_bytes: Buffer size for fetching results
+            arraysize: Default number of rows to fetch
+            result_data: Result data from SEA response (optional)
+            manifest: Manifest from SEA response (optional)
+        """
+
+        super().__init__(
+            connection=connection,
+            backend=sea_client,
+            arraysize=arraysize,
+            buffer_size_bytes=buffer_size_bytes,
+            command_id=execute_response.command_id,
+            status=execute_response.status,
+            has_been_closed_server_side=execute_response.has_been_closed_server_side,
+            description=execute_response.description,
+            is_staging_operation=execute_response.is_staging_operation,
+            lz4_compressed=execute_response.lz4_compressed,
+            arrow_schema_bytes=execute_response.arrow_schema_bytes,
+        )
+
+    def _fill_results_buffer(self):
+        """Fill the results buffer from the backend."""
+        raise NotImplementedError(
+            "_fill_results_buffer is not implemented for SEA backend"
+        )
+
+    def fetchone(self) -> Optional[Row]:
+        """
+        Fetch the next row of a query result set, returning a single sequence,
+        or None when no more data is available.
+        """
+
+        raise NotImplementedError("fetchone is not implemented for SEA backend")
+
+    def fetchmany(self, size: Optional[int] = None) -> List[Row]:
+        """
+        Fetch the next set of rows of a query result, returning a list of rows.
+
+        An empty sequence is returned when no more rows are available.
+        """
+
+        raise NotImplementedError("fetchmany is not implemented for SEA backend")
+
+    def fetchall(self) -> List[Row]:
+        """
+        Fetch all (remaining) rows of a query result, returning them as a list of rows.
+        """
+
+        raise NotImplementedError("fetchall is not implemented for SEA backend")
+
+    def fetchmany_arrow(self, size: int) -> Any:
+        """Fetch the next set of rows as an Arrow table."""
+        raise NotImplementedError("fetchmany_arrow is not implemented for SEA backend")
+
+    def fetchall_arrow(self) -> Any:
+        """Fetch all remaining rows as an Arrow table."""
+        raise NotImplementedError("fetchall_arrow is not implemented for SEA backend")
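A minimal sketch of the refactor in this file, using hypothetical stand-in classes rather than the library's real signatures: `lz4_compressed` and `arrow_schema_bytes` move out of `ThriftResultSet` and into the shared `ResultSet` constructor, so each backend forwards them through `super().__init__()` instead of assigning them itself.

```python
from typing import Optional

class ResultSet:
    def __init__(
        self,
        description=None,
        is_staging_operation: bool = False,
        lz4_compressed: bool = False,                # hoisted into the base class
        arrow_schema_bytes: Optional[bytes] = None,  # hoisted into the base class
    ):
        self.description = description
        self._is_staging_operation = is_staging_operation
        self.lz4_compressed = lz4_compressed
        self._arrow_schema_bytes = arrow_schema_bytes

class ThriftResultSet(ResultSet):
    def __init__(self, execute_response, use_cloud_fetch: bool = False):
        # Backend-specific state stays here; shared state goes to the base.
        self._use_cloud_fetch = use_cloud_fetch
        super().__init__(
            description=execute_response.description,
            is_staging_operation=execute_response.is_staging_operation,
            lz4_compressed=execute_response.lz4_compressed,
            arrow_schema_bytes=execute_response.arrow_schema_bytes,
        )

class FakeResponse:  # hypothetical stand-in for ExecuteResponse
    description = [("c1", "INT", None, None, None, None, None)]
    is_staging_operation = False
    lz4_compressed = True
    arrow_schema_bytes = b"\x00schema"

rs = ThriftResultSet(FakeResponse())
print(rs.lz4_compressed, rs._arrow_schema_bytes)
```

With the fields on the base class, a new backend such as the SEA result set gets them for free by forwarding its own response attributes, which is exactly what `SeaResultSet.__init__` does above.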

tests/unit/test_sea_result_set.py

Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
+"""
+Tests for the SeaResultSet class.
+
+This module contains tests for the SeaResultSet class, which implements
+the result set functionality for the SEA (Statement Execution API) backend.
+"""
+
+import pytest
+from unittest.mock import patch, MagicMock, Mock
+
+from databricks.sql.result_set import SeaResultSet
+from databricks.sql.backend.types import CommandId, CommandState, BackendType
+
+
+class TestSeaResultSet:
+    """Test suite for the SeaResultSet class."""
+
+    @pytest.fixture
+    def mock_connection(self):
+        """Create a mock connection."""
+        connection = Mock()
+        connection.open = True
+        return connection
+
+    @pytest.fixture
+    def mock_sea_client(self):
+        """Create a mock SEA client."""
+        return Mock()
+
+    @pytest.fixture
+    def execute_response(self):
+        """Create a sample execute response."""
+        mock_response = Mock()
+        mock_response.command_id = CommandId.from_sea_statement_id("test-statement-123")
+        mock_response.status = CommandState.SUCCEEDED
+        mock_response.has_been_closed_server_side = False
+        mock_response.is_direct_results = False
+        mock_response.results_queue = None
+        mock_response.description = [
+            ("test_value", "INT", None, None, None, None, None)
+        ]
+        mock_response.is_staging_operation = False
+        return mock_response
+
+    def test_init_with_execute_response(
+        self, mock_connection, mock_sea_client, execute_response
+    ):
+        """Test initializing SeaResultSet with an execute response."""
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+
+        # Verify basic properties
+        assert result_set.command_id == execute_response.command_id
+        assert result_set.status == CommandState.SUCCEEDED
+        assert result_set.connection == mock_connection
+        assert result_set.backend == mock_sea_client
+        assert result_set.buffer_size_bytes == 1000
+        assert result_set.arraysize == 100
+        assert result_set.description == execute_response.description
+
+    def test_close(self, mock_connection, mock_sea_client, execute_response):
+        """Test closing a result set."""
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+
+        # Close the result set
+        result_set.close()
+
+        # Verify the backend's close_command was called
+        mock_sea_client.close_command.assert_called_once_with(result_set.command_id)
+        assert result_set.has_been_closed_server_side is True
+        assert result_set.status == CommandState.CLOSED
+
+    def test_close_when_already_closed_server_side(
+        self, mock_connection, mock_sea_client, execute_response
+    ):
+        """Test closing a result set that has already been closed server-side."""
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+        result_set.has_been_closed_server_side = True
+
+        # Close the result set
+        result_set.close()
+
+        # Verify the backend's close_command was NOT called
+        mock_sea_client.close_command.assert_not_called()
+        assert result_set.has_been_closed_server_side is True
+        assert result_set.status == CommandState.CLOSED
+
+    def test_close_when_connection_closed(
+        self, mock_connection, mock_sea_client, execute_response
+    ):
+        """Test closing a result set when the connection is closed."""
+        mock_connection.open = False
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+
+        # Close the result set
+        result_set.close()
+
+        # Verify the backend's close_command was NOT called
+        mock_sea_client.close_command.assert_not_called()
+        assert result_set.has_been_closed_server_side is True
+        assert result_set.status == CommandState.CLOSED
+
+    def test_unimplemented_methods(
+        self, mock_connection, mock_sea_client, execute_response
+    ):
+        """Test that unimplemented methods raise NotImplementedError."""
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+
+        # Test each unimplemented method individually with specific error messages
+        with pytest.raises(
+            NotImplementedError, match="fetchone is not implemented for SEA backend"
+        ):
+            result_set.fetchone()
+
+        with pytest.raises(
+            NotImplementedError, match="fetchmany is not implemented for SEA backend"
+        ):
+            result_set.fetchmany(10)
+
+        with pytest.raises(
+            NotImplementedError, match="fetchmany is not implemented for SEA backend"
+        ):
+            # Test with default parameter value
+            result_set.fetchmany()
+
+        with pytest.raises(
+            NotImplementedError, match="fetchall is not implemented for SEA backend"
+        ):
+            result_set.fetchall()
+
+        with pytest.raises(
+            NotImplementedError,
+            match="fetchmany_arrow is not implemented for SEA backend",
+        ):
+            result_set.fetchmany_arrow(10)
+
+        with pytest.raises(
+            NotImplementedError,
+            match="fetchall_arrow is not implemented for SEA backend",
+        ):
+            result_set.fetchall_arrow()
+
+        with pytest.raises(
+            NotImplementedError, match="fetchone is not implemented for SEA backend"
+        ):
+            # Test iteration protocol (calls fetchone internally)
+            next(iter(result_set))
+
+        with pytest.raises(
+            NotImplementedError, match="fetchone is not implemented for SEA backend"
+        ):
+            # Test using the result set in a for loop
+            for row in result_set:
+                pass
+
+    def test_fill_results_buffer_not_implemented(
+        self, mock_connection, mock_sea_client, execute_response
+    ):
+        """Test that _fill_results_buffer raises NotImplementedError."""
+        result_set = SeaResultSet(
+            connection=mock_connection,
+            execute_response=execute_response,
+            sea_client=mock_sea_client,
+            buffer_size_bytes=1000,
+            arraysize=100,
+        )
+
+        with pytest.raises(
+            NotImplementedError,
+            match="_fill_results_buffer is not implemented for SEA backend",
+        ):
+            result_set._fill_results_buffer()
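The `close()` semantics these tests pin down can be sketched standalone. `SeaResultSetSketch` and its `close()` body below are illustrative stand-ins (including the `CommandState` enum and the `"cmd-id"` identifier), not the library's implementation; the point is the guard: the backend's `close_command` runs exactly once, and is skipped when the result set was already closed server-side or the parent connection is closed.

```python
from enum import Enum
from unittest.mock import Mock

class CommandState(Enum):  # stand-in for databricks.sql.backend.types.CommandState
    SUCCEEDED = "succeeded"
    CLOSED = "closed"

class SeaResultSetSketch:
    def __init__(self, connection, backend):
        self.connection = connection
        self.backend = backend
        self.status = CommandState.SUCCEEDED
        self.has_been_closed_server_side = False

    def close(self):
        # Only ask the server to close if it hasn't already been closed
        # server-side and the connection is still open.
        if not self.has_been_closed_server_side and self.connection.open:
            self.backend.close_command("cmd-id")
        self.has_been_closed_server_side = True
        self.status = CommandState.CLOSED

# Open connection: close_command is issued once.
conn, client = Mock(open=True), Mock()
rs = SeaResultSetSketch(conn, client)
rs.close()
client.close_command.assert_called_once_with("cmd-id")

# Closed connection: close_command is skipped, local state still updates.
conn2, client2 = Mock(open=False), Mock()
rs2 = SeaResultSetSketch(conn2, client2)
rs2.close()
client2.close_command.assert_not_called()
```

Either way the local object ends up in `CLOSED` state, which is what all three `test_close*` cases above assert.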

0 commit comments