Skip to content

Commit be17812

Browse files
Merge branch 'fetch-json-inline' into ext-links-sea
2 parents d68e4ea + 715cc13 commit be17812

File tree

4 files changed

+467
-348
lines changed

4 files changed

+467
-348
lines changed

examples/experimental/tests/test_sea_async_query.py

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -77,33 +77,12 @@ def test_sea_async_query_with_cloud_fetch():
7777

7878
logger.info("Query is no longer pending, getting results...")
7979
cursor.get_async_execution_result()
80-
81-
# Use a mix of fetch methods to retrieve all rows
82-
logger.info("Retrieving data using a mix of fetch methods")
83-
84-
# First, get one row with fetchone
85-
first_row = cursor.fetchone()
86-
if not first_row:
87-
logger.error("FAIL: fetchone returned None, expected a row")
88-
return False
89-
90-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
91-
retrieved_rows = [first_row]
92-
93-
# Then, get a batch of rows with fetchmany
94-
batch_size = 100
95-
batch_rows = cursor.fetchmany(batch_size)
96-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
97-
retrieved_rows.extend(batch_rows)
98-
99-
# Finally, get all remaining rows with fetchall
100-
remaining_rows = cursor.fetchall()
101-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
102-
retrieved_rows.extend(remaining_rows)
103-
104-
# Calculate total row count
105-
actual_row_count = len(retrieved_rows)
10680

81+
results = [cursor.fetchone()]
82+
results.extend(cursor.fetchmany(10))
83+
results.extend(cursor.fetchall())
84+
logger.info(f"{len(results)} rows retrieved against 100 requested")
85+
10786
logger.info(
10887
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
10988
)
@@ -200,33 +179,11 @@ def test_sea_async_query_without_cloud_fetch():
200179

201180
logger.info("Query is no longer pending, getting results...")
202181
cursor.get_async_execution_result()
182+
results = [cursor.fetchone()]
183+
results.extend(cursor.fetchmany(10))
184+
results.extend(cursor.fetchall())
185+
logger.info(f"{len(results)} rows retrieved against 100 requested")
203186

204-
# Use a mix of fetch methods to retrieve all rows
205-
logger.info("Retrieving data using a mix of fetch methods")
206-
207-
# First, get one row with fetchone
208-
first_row = cursor.fetchone()
209-
if not first_row:
210-
logger.error("FAIL: fetchone returned None, expected a row")
211-
return False
212-
213-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
214-
retrieved_rows = [first_row]
215-
216-
# Then, get a batch of rows with fetchmany
217-
batch_size = 10 # Smaller batch size for non-cloud fetch
218-
batch_rows = cursor.fetchmany(batch_size)
219-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
220-
retrieved_rows.extend(batch_rows)
221-
222-
# Finally, get all remaining rows with fetchall
223-
remaining_rows = cursor.fetchall()
224-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
225-
retrieved_rows.extend(remaining_rows)
226-
227-
# Calculate total row count
228-
actual_row_count = len(retrieved_rows)
229-
230187
logger.info(
231188
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
232189
)

examples/experimental/tests/test_sea_sync_query.py

Lines changed: 12 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -62,46 +62,10 @@ def test_sea_sync_query_with_cloud_fetch():
6262
logger.info(
6363
f"Executing synchronous query with cloud fetch to generate {requested_row_count} rows"
6464
)
65-
cursor.execute(query)
66-
67-
# Use a mix of fetch methods to retrieve all rows
68-
logger.info("Retrieving data using a mix of fetch methods")
69-
70-
# First, get one row with fetchone
71-
first_row = cursor.fetchone()
72-
if not first_row:
73-
logger.error("FAIL: fetchone returned None, expected a row")
74-
return False
75-
76-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
77-
retrieved_rows = [first_row]
78-
79-
# Then, get a batch of rows with fetchmany
80-
batch_size = 100
81-
batch_rows = cursor.fetchmany(batch_size)
82-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
83-
retrieved_rows.extend(batch_rows)
84-
85-
# Finally, get all remaining rows with fetchall
86-
remaining_rows = cursor.fetchall()
87-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
88-
retrieved_rows.extend(remaining_rows)
89-
90-
# Calculate total row count
91-
actual_row_count = len(retrieved_rows)
92-
93-
logger.info(
94-
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
95-
)
96-
97-
# Verify total row count
98-
if actual_row_count != requested_row_count:
99-
logger.error(
100-
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
101-
)
102-
return False
103-
104-
logger.info("PASS: Received correct number of rows with cloud fetch and all fetch methods work correctly")
65+
results = [cursor.fetchone()]
66+
results.extend(cursor.fetchmany(10))
67+
results.extend(cursor.fetchall())
68+
logger.info(f"{len(results)} rows retrieved against 100 requested")
10569

10670
# Close resources
10771
cursor.close()
@@ -163,56 +127,15 @@ def test_sea_sync_query_without_cloud_fetch():
163127
# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
164128
requested_row_count = 100
165129
cursor = connection.cursor()
166-
query = f"""
167-
SELECT
168-
id,
169-
concat('value_', repeat('a', 100)) as test_value
170-
FROM range(1, {requested_row_count} + 1) AS t(id)
171-
"""
172-
173-
logger.info(
174-
f"Executing synchronous query without cloud fetch to generate {requested_row_count} rows"
130+
logger.info("Executing synchronous query without cloud fetch: SELECT 100 rows")
131+
cursor.execute(
132+
"SELECT id, 'test_value_' || CAST(id as STRING) as test_value FROM range(1, 101)"
175133
)
176-
cursor.execute(query)
177-
178-
# Use a mix of fetch methods to retrieve all rows
179-
logger.info("Retrieving data using a mix of fetch methods")
180-
181-
# First, get one row with fetchone
182-
first_row = cursor.fetchone()
183-
if not first_row:
184-
logger.error("FAIL: fetchone returned None, expected a row")
185-
return False
186-
187-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
188-
retrieved_rows = [first_row]
189-
190-
# Then, get a batch of rows with fetchmany
191-
batch_size = 10 # Smaller batch size for non-cloud fetch
192-
batch_rows = cursor.fetchmany(batch_size)
193-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
194-
retrieved_rows.extend(batch_rows)
195-
196-
# Finally, get all remaining rows with fetchall
197-
remaining_rows = cursor.fetchall()
198-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
199-
retrieved_rows.extend(remaining_rows)
200-
201-
# Calculate total row count
202-
actual_row_count = len(retrieved_rows)
203-
204-
logger.info(
205-
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
206-
)
207-
208-
# Verify total row count
209-
if actual_row_count != requested_row_count:
210-
logger.error(
211-
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
212-
)
213-
return False
214-
215-
logger.info("PASS: Received correct number of rows without cloud fetch and all fetch methods work correctly")
134+
135+
results = [cursor.fetchone()]
136+
results.extend(cursor.fetchmany(10))
137+
results.extend(cursor.fetchall())
138+
logger.info(f"{len(results)} rows retrieved against 100 requested")
216139

217140
# Close resources
218141
cursor.close()

src/databricks/sql/result_set.py

Lines changed: 3 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,6 @@ def fetchall(self) -> List[Row]:
155155
"""Fetch all remaining rows of a query result."""
156156
pass
157157

158-
@abstractmethod
159-
def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
160-
"""Fetch the next set of rows as an Arrow table."""
161-
pass
162-
163-
@abstractmethod
164-
def fetchall_arrow(self) -> "pyarrow.Table":
165-
"""Fetch all remaining rows as an Arrow table."""
166-
pass
167-
168158
def close(self) -> None:
169159
"""
170160
Close the result set.
@@ -478,19 +468,13 @@ def __init__(
478468
# Build the results queue
479469
results_queue = None
480470

471+
results_queue = None
481472
if result_data:
482-
from typing import cast, List
483-
484-
# Convert description to the expected format
485-
desc = None
486-
if execute_response.description:
487-
desc = cast(List[Tuple[Any, ...]], execute_response.description)
488-
489473
results_queue = SeaResultSetQueueFactory.build_queue(
490474
result_data,
491475
manifest,
492-
str(self.statement_id),
493-
description=desc,
476+
str(execute_response.command_id.to_sea_statement_id()),
477+
description=execute_response.description,
494478
max_download_threads=sea_client.max_download_threads,
495479
ssl_options=sea_client.ssl_options,
496480
sea_client=sea_client,
@@ -536,38 +520,6 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
536520
n_remaining_rows = size - results.num_rows
537521
self._next_row_index += results.num_rows
538522

539-
while n_remaining_rows > 0:
540-
partial_results = self.results.next_n_rows(n_remaining_rows)
541-
results = pyarrow.concat_tables([results, partial_results])
542-
n_remaining_rows = n_remaining_rows - partial_results.num_rows
543-
self._next_row_index += partial_results.num_rows
544-
545-
return results
546-
547-
def fetchall_arrow(self) -> "pyarrow.Table":
548-
"""
549-
Fetch all remaining rows as an Arrow table.
550-
551-
Returns:
552-
PyArrow Table containing all remaining rows
553-
554-
Raises:
555-
ImportError: If PyArrow is not installed
556-
"""
557-
results = self.results.remaining_rows()
558-
self._next_row_index += results.num_rows
559-
560-
# If PyArrow is installed and we have a ColumnTable result, convert it to PyArrow Table
561-
# Valid only for metadata commands result set
562-
if isinstance(results, ColumnTable) and pyarrow:
563-
data = {
564-
name: col
565-
for name, col in zip(results.column_names, results.column_table)
566-
}
567-
return pyarrow.Table.from_pydict(data)
568-
569-
return results
570-
571523
def fetchmany_json(self, size: int):
572524
"""
573525
Fetch the next set of rows as a columnar table.
@@ -585,15 +537,8 @@ def fetchmany_json(self, size: int):
585537
raise ValueError(f"size argument for fetchmany is {size} but must be >= 0")
586538

587539
results = self.results.next_n_rows(size)
588-
n_remaining_rows = size - len(results)
589540
self._next_row_index += len(results)
590541

591-
while n_remaining_rows > 0:
592-
partial_results = self.results.next_n_rows(n_remaining_rows)
593-
results = results + partial_results
594-
n_remaining_rows = n_remaining_rows - len(partial_results)
595-
self._next_row_index += len(partial_results)
596-
597542
return results
598543

599544
def fetchall_json(self):

0 commit comments

Comments (0)