from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.types import Row
from databricks.sql.exc import Error, RequestError, CursorAlreadyClosedError
- from databricks.sql.utils import ColumnTable, ColumnQueue, JsonQueue, SeaResultSetQueueFactory
+ from databricks.sql.utils import (
+     ColumnTable,
+     ColumnQueue,
+     JsonQueue,
+     SeaResultSetQueueFactory,
+ )

from databricks.sql.backend.types import CommandId, CommandState, ExecuteResponse

logger = logging.getLogger(__name__)
@@ -469,8 +474,8 @@ def __init__(
        sea_client: "SeaDatabricksClient",
        buffer_size_bytes: int = 104857600,
        arraysize: int = 10000,
-         result_data=None,
-         manifest=None,
+         result_data: Optional[ResultData] = None,
+         manifest: Optional[ResultManifest] = None,
    ):
        """
        Initialize a SeaResultSet with the response from a SEA query execution.
@@ -485,13 +490,17 @@ def __init__(
            manifest: Manifest from SEA response (optional)
        """

-         queue = SeaResultSetQueueFactory.build_queue(
-             sea_result_data=execute_response.results_data,
-             manifest=execute_response.results_manifest,
-             statement_id=execute_response.command_id.to_sea_statement_id(),
-             description=execute_response.description,
-             schema_bytes=execute_response.arrow_schema_bytes,
-         )
+         if result_data:
+             queue = SeaResultSetQueueFactory.build_queue(
+                 sea_result_data=result_data,
+                 manifest=manifest,
+                 statement_id=execute_response.command_id.to_sea_statement_id(),
+                 description=execute_response.description,
+                 schema_bytes=execute_response.arrow_schema_bytes,
+             )
+         else:
+             logger.warning("No result data provided for SEA result set")
+             queue = JsonQueue([])

        super().__init__(
            connection=connection,
@@ -501,12 +510,13 @@ def __init__(
            command_id=execute_response.command_id,
            status=execute_response.status,
            has_been_closed_server_side=execute_response.has_been_closed_server_side,
+             results_queue=queue,
            description=execute_response.description,
            is_staging_operation=execute_response.is_staging_operation,
            lz4_compressed=execute_response.lz4_compressed,
            arrow_schema_bytes=execute_response.arrow_schema_bytes,
        )
-
+
    def _convert_to_row_objects(self, rows):
        """
        Convert raw data rows to Row objects with named columns based on description.
@@ -526,9 +536,7 @@ def _convert_to_row_objects(self, rows):

    def _fill_results_buffer(self):
        """Fill the results buffer from the backend."""
-         raise NotImplementedError(
-             "_fill_results_buffer is not implemented for SEA backend"
-         )
+         return None

    def fetchone(self) -> Optional[Row]:
        """
@@ -572,8 +580,15 @@ def fetchall(self) -> List[Row]:
        """
        Fetch all (remaining) rows of a query result, returning them as a list of rows.
        """
+         # Note: We check for the specific queue type to maintain consistency with ThriftResultSet
+         if isinstance(self.results, JsonQueue):
+             rows = self.results.remaining_rows()
+             self._next_row_index += len(rows)

-         raise NotImplementedError("fetchall is not implemented for SEA backend")
+             # Convert to Row objects
+             return self._convert_to_row_objects(rows)
+         else:
+             raise NotImplementedError("Unsupported queue type")

    def fetchmany_arrow(self, size: int) -> Any:
        """Fetch the next set of rows as an Arrow table."""
@@ -606,4 +621,3 @@ def fetchall_arrow(self) -> Any:
            return self._convert_rows_to_arrow_table(rows)
        else:
            raise NotImplementedError("Unsupported queue type")
-
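
For reference, here is a minimal, self-contained sketch of the JSON result path this change enables: rows held in an in-memory queue are drained by fetchall() and zipped with the cursor description into named records. FakeJsonQueue, fetchall_like, and the sample data are illustrative stand-ins, not connector API; only remaining_rows() and the JsonQueue([]) fallback mirror what the diff shows.

from typing import Any, List, Tuple

class FakeJsonQueue:
    """Stand-in for JsonQueue: rows already materialized in memory."""

    def __init__(self, rows: List[Tuple[Any, ...]]):
        self._rows = list(rows)
        self._pos = 0

    def remaining_rows(self) -> List[Tuple[Any, ...]]:
        # Hand back everything not yet consumed and mark it as read.
        rows = self._rows[self._pos:]
        self._pos = len(self._rows)
        return rows

def fetchall_like(queue: FakeJsonQueue, description) -> List[dict]:
    # Mirrors the shape of the new fetchall(): drain the queue, then attach
    # column names from the description (plain dicts here instead of Row objects).
    rows = queue.remaining_rows()
    names = [col[0] for col in description]
    return [dict(zip(names, row)) for row in rows]

description = [("id", "int"), ("name", "string")]
print(fetchall_like(FakeJsonQueue([(1, "alice"), (2, "bob")]), description))
# -> [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
print(fetchall_like(FakeJsonQueue([]), description))
# -> [], matching the JsonQueue([]) fallback when no result data is returned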