1
+ from __future__ import annotations
2
+
1
3
import logging
2
4
import time
3
5
import re
10
12
ResultDisposition ,
11
13
ResultCompression ,
12
14
WaitTimeout ,
15
+ MetadataCommands ,
13
16
)
14
17
15
18
if TYPE_CHECKING :
16
19
from databricks .sql .client import Cursor
17
- from databricks .sql .result_set import ResultSet
20
+ from databricks .sql .result_set import SeaResultSet
18
21
19
22
from databricks .sql .backend .databricks_client import DatabricksClient
20
23
from databricks .sql .backend .types import (
24
27
BackendType ,
25
28
ExecuteResponse ,
26
29
)
27
- from databricks .sql .exc import DatabaseError , ServerOperationError
30
+ from databricks .sql .exc import DatabaseError , ProgrammingError , ServerOperationError
28
31
from databricks .sql .backend .sea .utils .http_client import SeaHttpClient
29
32
from databricks .sql .types import SSLOptions
30
33
@@ -169,7 +172,7 @@ def _extract_warehouse_id(self, http_path: str) -> str:
169
172
f"Note: SEA only works for warehouses."
170
173
)
171
174
logger .error (error_message )
172
- raise ValueError (error_message )
175
+ raise ProgrammingError (error_message )
173
176
174
177
@property
175
178
def max_download_threads (self ) -> int :
@@ -241,14 +244,14 @@ def close_session(self, session_id: SessionId) -> None:
241
244
session_id: The session identifier returned by open_session()
242
245
243
246
Raises:
244
- ValueError : If the session ID is invalid
247
+ ProgrammingError : If the session ID is invalid
245
248
OperationalError: If there's an error closing the session
246
249
"""
247
250
248
251
logger .debug ("SeaDatabricksClient.close_session(session_id=%s)" , session_id )
249
252
250
253
if session_id .backend_type != BackendType .SEA :
251
- raise ValueError ("Not a valid SEA session ID" )
254
+ raise ProgrammingError ("Not a valid SEA session ID" )
252
255
sea_session_id = session_id .to_sea_session_id ()
253
256
254
257
request_data = DeleteSessionRequest (
@@ -400,12 +403,12 @@ def execute_command(
400
403
max_rows : int ,
401
404
max_bytes : int ,
402
405
lz4_compression : bool ,
403
- cursor : " Cursor" ,
406
+ cursor : Cursor ,
404
407
use_cloud_fetch : bool ,
405
408
parameters : List [Dict [str , Any ]],
406
409
async_op : bool ,
407
410
enforce_embedded_schema_correctness : bool ,
408
- ) -> Union ["ResultSet" , None ]:
411
+ ) -> Union [SeaResultSet , None ]:
409
412
"""
410
413
Execute a SQL command using the SEA backend.
411
414
@@ -426,7 +429,7 @@ def execute_command(
426
429
"""
427
430
428
431
if session_id .backend_type != BackendType .SEA :
429
- raise ValueError ("Not a valid SEA session ID" )
432
+ raise ProgrammingError ("Not a valid SEA session ID" )
430
433
431
434
sea_session_id = session_id .to_sea_session_id ()
432
435
@@ -501,11 +504,11 @@ def cancel_command(self, command_id: CommandId) -> None:
501
504
command_id: Command identifier to cancel
502
505
503
506
Raises:
504
- ValueError : If the command ID is invalid
507
+ ProgrammingError : If the command ID is invalid
505
508
"""
506
509
507
510
if command_id .backend_type != BackendType .SEA :
508
- raise ValueError ("Not a valid SEA command ID" )
511
+ raise ProgrammingError ("Not a valid SEA command ID" )
509
512
510
513
sea_statement_id = command_id .to_sea_statement_id ()
511
514
@@ -524,11 +527,11 @@ def close_command(self, command_id: CommandId) -> None:
524
527
command_id: Command identifier to close
525
528
526
529
Raises:
527
- ValueError : If the command ID is invalid
530
+ ProgrammingError : If the command ID is invalid
528
531
"""
529
532
530
533
if command_id .backend_type != BackendType .SEA :
531
- raise ValueError ("Not a valid SEA command ID" )
534
+ raise ProgrammingError ("Not a valid SEA command ID" )
532
535
533
536
sea_statement_id = command_id .to_sea_statement_id ()
534
537
@@ -550,7 +553,7 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
550
553
CommandState: The current state of the command
551
554
552
555
Raises:
553
- ValueError : If the command ID is invalid
556
+ ProgrammingError : If the command ID is invalid
554
557
"""
555
558
556
559
if command_id .backend_type != BackendType .SEA :
@@ -572,8 +575,8 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
572
575
def get_execution_result (
573
576
self ,
574
577
command_id : CommandId ,
575
- cursor : " Cursor" ,
576
- ) -> "ResultSet" :
578
+ cursor : Cursor ,
579
+ ) -> SeaResultSet :
577
580
"""
578
581
Get the result of a command execution.
579
582
@@ -582,14 +585,14 @@ def get_execution_result(
582
585
cursor: Cursor executing the command
583
586
584
587
Returns:
585
- ResultSet : A SeaResultSet instance with the execution results
588
+ SeaResultSet : A SeaResultSet instance with the execution results
586
589
587
590
Raises:
588
591
ValueError: If the command ID is invalid
589
592
"""
590
593
591
594
if command_id .backend_type != BackendType .SEA :
592
- raise ValueError ("Not a valid SEA command ID" )
595
+ raise ProgrammingError ("Not a valid SEA command ID" )
593
596
594
597
sea_statement_id = command_id .to_sea_statement_id ()
595
598
@@ -626,47 +629,141 @@ def get_catalogs(
626
629
session_id : SessionId ,
627
630
max_rows : int ,
628
631
max_bytes : int ,
629
- cursor : "Cursor" ,
630
- ):
631
- """Not implemented yet."""
632
- raise NotImplementedError ("get_catalogs is not yet implemented for SEA backend" )
632
+ cursor : Cursor ,
633
+ ) -> SeaResultSet :
634
+ """Get available catalogs by executing 'SHOW CATALOGS'."""
635
+ result = self .execute_command (
636
+ operation = MetadataCommands .SHOW_CATALOGS .value ,
637
+ session_id = session_id ,
638
+ max_rows = max_rows ,
639
+ max_bytes = max_bytes ,
640
+ lz4_compression = False ,
641
+ cursor = cursor ,
642
+ use_cloud_fetch = False ,
643
+ parameters = [],
644
+ async_op = False ,
645
+ enforce_embedded_schema_correctness = False ,
646
+ )
647
+ assert result is not None , "execute_command returned None in synchronous mode"
648
+ return result
633
649
634
650
def get_schemas (
635
651
self ,
636
652
session_id : SessionId ,
637
653
max_rows : int ,
638
654
max_bytes : int ,
639
- cursor : " Cursor" ,
655
+ cursor : Cursor ,
640
656
catalog_name : Optional [str ] = None ,
641
657
schema_name : Optional [str ] = None ,
642
- ):
643
- """Not implemented yet."""
644
- raise NotImplementedError ("get_schemas is not yet implemented for SEA backend" )
658
+ ) -> SeaResultSet :
659
+ """Get schemas by executing 'SHOW SCHEMAS IN catalog [LIKE pattern]'."""
660
+ if not catalog_name :
661
+ raise DatabaseError ("Catalog name is required for get_schemas" )
662
+
663
+ operation = MetadataCommands .SHOW_SCHEMAS .value .format (catalog_name )
664
+
665
+ if schema_name :
666
+ operation += MetadataCommands .LIKE_PATTERN .value .format (schema_name )
667
+
668
+ result = self .execute_command (
669
+ operation = operation ,
670
+ session_id = session_id ,
671
+ max_rows = max_rows ,
672
+ max_bytes = max_bytes ,
673
+ lz4_compression = False ,
674
+ cursor = cursor ,
675
+ use_cloud_fetch = False ,
676
+ parameters = [],
677
+ async_op = False ,
678
+ enforce_embedded_schema_correctness = False ,
679
+ )
680
+ assert result is not None , "execute_command returned None in synchronous mode"
681
+ return result
645
682
646
683
def get_tables (
647
684
self ,
648
685
session_id : SessionId ,
649
686
max_rows : int ,
650
687
max_bytes : int ,
651
- cursor : " Cursor" ,
688
+ cursor : Cursor ,
652
689
catalog_name : Optional [str ] = None ,
653
690
schema_name : Optional [str ] = None ,
654
691
table_name : Optional [str ] = None ,
655
692
table_types : Optional [List [str ]] = None ,
656
- ):
657
- """Not implemented yet."""
658
- raise NotImplementedError ("get_tables is not yet implemented for SEA backend" )
693
+ ) -> SeaResultSet :
694
+ """Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'."""
695
+ operation = (
696
+ MetadataCommands .SHOW_TABLES_ALL_CATALOGS .value
697
+ if catalog_name in [None , "*" , "%" ]
698
+ else MetadataCommands .SHOW_TABLES .value .format (
699
+ MetadataCommands .CATALOG_SPECIFIC .value .format (catalog_name )
700
+ )
701
+ )
702
+
703
+ if schema_name :
704
+ operation += MetadataCommands .SCHEMA_LIKE_PATTERN .value .format (schema_name )
705
+
706
+ if table_name :
707
+ operation += MetadataCommands .LIKE_PATTERN .value .format (table_name )
708
+
709
+ result = self .execute_command (
710
+ operation = operation ,
711
+ session_id = session_id ,
712
+ max_rows = max_rows ,
713
+ max_bytes = max_bytes ,
714
+ lz4_compression = False ,
715
+ cursor = cursor ,
716
+ use_cloud_fetch = False ,
717
+ parameters = [],
718
+ async_op = False ,
719
+ enforce_embedded_schema_correctness = False ,
720
+ )
721
+ assert result is not None , "execute_command returned None in synchronous mode"
722
+
723
+ # Apply client-side filtering by table_types
724
+ from databricks .sql .backend .sea .utils .filters import ResultSetFilter
725
+
726
+ result = ResultSetFilter .filter_tables_by_type (result , table_types )
727
+
728
+ return result
659
729
660
730
def get_columns (
661
731
self ,
662
732
session_id : SessionId ,
663
733
max_rows : int ,
664
734
max_bytes : int ,
665
- cursor : " Cursor" ,
735
+ cursor : Cursor ,
666
736
catalog_name : Optional [str ] = None ,
667
737
schema_name : Optional [str ] = None ,
668
738
table_name : Optional [str ] = None ,
669
739
column_name : Optional [str ] = None ,
670
- ):
671
- """Not implemented yet."""
672
- raise NotImplementedError ("get_columns is not yet implemented for SEA backend" )
740
+ ) -> SeaResultSet :
741
+ """Get columns by executing 'SHOW COLUMNS IN CATALOG catalog [SCHEMA LIKE pattern] [TABLE LIKE pattern] [LIKE pattern]'."""
742
+ if not catalog_name :
743
+ raise DatabaseError ("Catalog name is required for get_columns" )
744
+
745
+ operation = MetadataCommands .SHOW_COLUMNS .value .format (catalog_name )
746
+
747
+ if schema_name :
748
+ operation += MetadataCommands .SCHEMA_LIKE_PATTERN .value .format (schema_name )
749
+
750
+ if table_name :
751
+ operation += MetadataCommands .TABLE_LIKE_PATTERN .value .format (table_name )
752
+
753
+ if column_name :
754
+ operation += MetadataCommands .LIKE_PATTERN .value .format (column_name )
755
+
756
+ result = self .execute_command (
757
+ operation = operation ,
758
+ session_id = session_id ,
759
+ max_rows = max_rows ,
760
+ max_bytes = max_bytes ,
761
+ lz4_compression = False ,
762
+ cursor = cursor ,
763
+ use_cloud_fetch = False ,
764
+ parameters = [],
765
+ async_op = False ,
766
+ enforce_embedded_schema_correctness = False ,
767
+ )
768
+ assert result is not None , "execute_command returned None in synchronous mode"
769
+ return result
0 commit comments