Skip to content

Commit 0615c40

Browse files
Merge branch 'fetch-json-inline' into cloudfetchq-sea
2 parents f90b4d4 + 715cc13 commit 0615c40

File tree

4 files changed: +473 −354 lines changed

examples/experimental/tests/test_sea_async_query.py

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -77,33 +77,12 @@ def test_sea_async_query_with_cloud_fetch():
7777

7878
logger.info("Query is no longer pending, getting results...")
7979
cursor.get_async_execution_result()
80-
81-
# Use a mix of fetch methods to retrieve all rows
82-
logger.info("Retrieving data using a mix of fetch methods")
83-
84-
# First, get one row with fetchone
85-
first_row = cursor.fetchone()
86-
if not first_row:
87-
logger.error("FAIL: fetchone returned None, expected a row")
88-
return False
89-
90-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
91-
retrieved_rows = [first_row]
92-
93-
# Then, get a batch of rows with fetchmany
94-
batch_size = 100
95-
batch_rows = cursor.fetchmany(batch_size)
96-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
97-
retrieved_rows.extend(batch_rows)
98-
99-
# Finally, get all remaining rows with fetchall
100-
remaining_rows = cursor.fetchall()
101-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
102-
retrieved_rows.extend(remaining_rows)
103-
104-
# Calculate total row count
105-
actual_row_count = len(retrieved_rows)
10680

81+
results = [cursor.fetchone()]
82+
results.extend(cursor.fetchmany(10))
83+
results.extend(cursor.fetchall())
84+
logger.info(f"{len(results)} rows retrieved against 100 requested")
85+
10786
logger.info(
10887
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
10988
)
@@ -200,33 +179,11 @@ def test_sea_async_query_without_cloud_fetch():
200179

201180
logger.info("Query is no longer pending, getting results...")
202181
cursor.get_async_execution_result()
182+
results = [cursor.fetchone()]
183+
results.extend(cursor.fetchmany(10))
184+
results.extend(cursor.fetchall())
185+
logger.info(f"{len(results)} rows retrieved against 100 requested")
203186

204-
# Use a mix of fetch methods to retrieve all rows
205-
logger.info("Retrieving data using a mix of fetch methods")
206-
207-
# First, get one row with fetchone
208-
first_row = cursor.fetchone()
209-
if not first_row:
210-
logger.error("FAIL: fetchone returned None, expected a row")
211-
return False
212-
213-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
214-
retrieved_rows = [first_row]
215-
216-
# Then, get a batch of rows with fetchmany
217-
batch_size = 10 # Smaller batch size for non-cloud fetch
218-
batch_rows = cursor.fetchmany(batch_size)
219-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
220-
retrieved_rows.extend(batch_rows)
221-
222-
# Finally, get all remaining rows with fetchall
223-
remaining_rows = cursor.fetchall()
224-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
225-
retrieved_rows.extend(remaining_rows)
226-
227-
# Calculate total row count
228-
actual_row_count = len(retrieved_rows)
229-
230187
logger.info(
231188
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
232189
)

examples/experimental/tests/test_sea_sync_query.py

Lines changed: 12 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -62,46 +62,10 @@ def test_sea_sync_query_with_cloud_fetch():
6262
logger.info(
6363
f"Executing synchronous query with cloud fetch to generate {requested_row_count} rows"
6464
)
65-
cursor.execute(query)
66-
67-
# Use a mix of fetch methods to retrieve all rows
68-
logger.info("Retrieving data using a mix of fetch methods")
69-
70-
# First, get one row with fetchone
71-
first_row = cursor.fetchone()
72-
if not first_row:
73-
logger.error("FAIL: fetchone returned None, expected a row")
74-
return False
75-
76-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
77-
retrieved_rows = [first_row]
78-
79-
# Then, get a batch of rows with fetchmany
80-
batch_size = 100
81-
batch_rows = cursor.fetchmany(batch_size)
82-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
83-
retrieved_rows.extend(batch_rows)
84-
85-
# Finally, get all remaining rows with fetchall
86-
remaining_rows = cursor.fetchall()
87-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
88-
retrieved_rows.extend(remaining_rows)
89-
90-
# Calculate total row count
91-
actual_row_count = len(retrieved_rows)
92-
93-
logger.info(
94-
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
95-
)
96-
97-
# Verify total row count
98-
if actual_row_count != requested_row_count:
99-
logger.error(
100-
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
101-
)
102-
return False
103-
104-
logger.info("PASS: Received correct number of rows with cloud fetch and all fetch methods work correctly")
65+
results = [cursor.fetchone()]
66+
results.extend(cursor.fetchmany(10))
67+
results.extend(cursor.fetchall())
68+
logger.info(f"{len(results)} rows retrieved against 100 requested")
10569

10670
# Close resources
10771
cursor.close()
@@ -163,56 +127,15 @@ def test_sea_sync_query_without_cloud_fetch():
163127
# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
164128
requested_row_count = 100
165129
cursor = connection.cursor()
166-
query = f"""
167-
SELECT
168-
id,
169-
concat('value_', repeat('a', 100)) as test_value
170-
FROM range(1, {requested_row_count} + 1) AS t(id)
171-
"""
172-
173-
logger.info(
174-
f"Executing synchronous query without cloud fetch to generate {requested_row_count} rows"
130+
logger.info("Executing synchronous query without cloud fetch: SELECT 100 rows")
131+
cursor.execute(
132+
"SELECT id, 'test_value_' || CAST(id as STRING) as test_value FROM range(1, 101)"
175133
)
176-
cursor.execute(query)
177-
178-
# Use a mix of fetch methods to retrieve all rows
179-
logger.info("Retrieving data using a mix of fetch methods")
180-
181-
# First, get one row with fetchone
182-
first_row = cursor.fetchone()
183-
if not first_row:
184-
logger.error("FAIL: fetchone returned None, expected a row")
185-
return False
186-
187-
logger.info(f"Successfully retrieved first row with ID: {first_row[0]}")
188-
retrieved_rows = [first_row]
189-
190-
# Then, get a batch of rows with fetchmany
191-
batch_size = 10 # Smaller batch size for non-cloud fetch
192-
batch_rows = cursor.fetchmany(batch_size)
193-
logger.info(f"Successfully retrieved {len(batch_rows)} rows with fetchmany")
194-
retrieved_rows.extend(batch_rows)
195-
196-
# Finally, get all remaining rows with fetchall
197-
remaining_rows = cursor.fetchall()
198-
logger.info(f"Successfully retrieved {len(remaining_rows)} rows with fetchall")
199-
retrieved_rows.extend(remaining_rows)
200-
201-
# Calculate total row count
202-
actual_row_count = len(retrieved_rows)
203-
204-
logger.info(
205-
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
206-
)
207-
208-
# Verify total row count
209-
if actual_row_count != requested_row_count:
210-
logger.error(
211-
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
212-
)
213-
return False
214-
215-
logger.info("PASS: Received correct number of rows without cloud fetch and all fetch methods work correctly")
134+
135+
results = [cursor.fetchone()]
136+
results.extend(cursor.fetchmany(10))
137+
results.extend(cursor.fetchall())
138+
logger.info(f"{len(results)} rows retrieved against 100 requested")
216139

217140
# Close resources
218141
cursor.close()

src/databricks/sql/result_set.py

Lines changed: 9 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,6 @@ def fetchall(self) -> List[Row]:
155155
"""Fetch all remaining rows of a query result."""
156156
pass
157157

158-
@abstractmethod
159-
def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
160-
"""Fetch the next set of rows as an Arrow table."""
161-
pass
162-
163-
@abstractmethod
164-
def fetchall_arrow(self) -> "pyarrow.Table":
165-
"""Fetch all remaining rows as an Arrow table."""
166-
pass
167-
168158
def close(self) -> None:
169159
"""
170160
Close the result set.
@@ -466,19 +456,13 @@ def __init__(
466456
# Build the results queue
467457
results_queue = None
468458

459+
results_queue = None
469460
if result_data:
470-
from typing import cast, List
471-
472-
# Convert description to the expected format
473-
desc = None
474-
if execute_response.description:
475-
desc = cast(List[Tuple[Any, ...]], execute_response.description)
476-
477461
results_queue = SeaResultSetQueueFactory.build_queue(
478462
result_data,
479463
manifest,
480-
str(self.statement_id),
481-
description=desc,
464+
str(execute_response.command_id.to_sea_statement_id()),
465+
description=execute_response.description,
482466
max_download_threads=sea_client.max_download_threads,
483467
ssl_options=sea_client.ssl_options,
484468
sea_client=sea_client,
@@ -524,38 +508,6 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
524508
n_remaining_rows = size - results.num_rows
525509
self._next_row_index += results.num_rows
526510

527-
while n_remaining_rows > 0:
528-
partial_results = self.results.next_n_rows(n_remaining_rows)
529-
results = pyarrow.concat_tables([results, partial_results])
530-
n_remaining_rows = n_remaining_rows - partial_results.num_rows
531-
self._next_row_index += partial_results.num_rows
532-
533-
return results
534-
535-
def fetchall_arrow(self) -> "pyarrow.Table":
536-
"""
537-
Fetch all remaining rows as an Arrow table.
538-
539-
Returns:
540-
PyArrow Table containing all remaining rows
541-
542-
Raises:
543-
ImportError: If PyArrow is not installed
544-
"""
545-
results = self.results.remaining_rows()
546-
self._next_row_index += results.num_rows
547-
548-
# If PyArrow is installed and we have a ColumnTable result, convert it to PyArrow Table
549-
# Valid only for metadata commands result set
550-
if isinstance(results, ColumnTable) and pyarrow:
551-
data = {
552-
name: col
553-
for name, col in zip(results.column_names, results.column_table)
554-
}
555-
return pyarrow.Table.from_pydict(data)
556-
557-
return results
558-
559511
def fetchmany_json(self, size: int):
560512
"""
561513
Fetch the next set of rows as a columnar table.
@@ -573,15 +525,8 @@ def fetchmany_json(self, size: int):
573525
raise ValueError(f"size argument for fetchmany is {size} but must be >= 0")
574526

575527
results = self.results.next_n_rows(size)
576-
n_remaining_rows = size - len(results)
577528
self._next_row_index += len(results)
578529

579-
while n_remaining_rows > 0:
580-
partial_results = self.results.next_n_rows(n_remaining_rows)
581-
results = results + partial_results
582-
n_remaining_rows = n_remaining_rows - len(partial_results)
583-
self._next_row_index += len(partial_results)
584-
585530
return results
586531

587532
def fetchall_json(self):
@@ -605,9 +550,9 @@ def fetchone(self) -> Optional[Row]:
605550
A single Row object or None if no more rows are available
606551
"""
607552
if isinstance(self.results, JsonQueue):
608-
res = self.fetchmany_json(1)
553+
res = self._convert_json_table(self.fetchmany_json(1))
609554
else:
610-
res = self._convert_arrow_table(self.fetchmany_arrow(1))
555+
raise NotImplementedError("fetchone only supported for JSON data")
611556

612557
return res[0] if res else None
613558

@@ -625,9 +570,9 @@ def fetchmany(self, size: int) -> List[Row]:
625570
ValueError: If size is negative
626571
"""
627572
if isinstance(self.results, JsonQueue):
628-
return self.fetchmany_json(size)
573+
return self._convert_json_table(self.fetchmany_json(size))
629574
else:
630-
return self._convert_arrow_table(self.fetchmany_arrow(size))
575+
raise NotImplementedError("fetchmany only supported for JSON data")
631576

632577
def fetchall(self) -> List[Row]:
633578
"""
@@ -637,6 +582,6 @@ def fetchall(self) -> List[Row]:
637582
List of Row objects containing all remaining rows
638583
"""
639584
if isinstance(self.results, JsonQueue):
640-
return self.fetchall_json()
585+
return self._convert_json_table(self.fetchall_json())
641586
else:
642-
return self._convert_arrow_table(self.fetchall_arrow())
587+
raise NotImplementedError("fetchall only supported for JSON data")

0 commit comments

Comments (0)