Skip to content

Commit 44183db

Browse files
use new backend structure
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 9ef5fad commit 44183db

File tree

1 file changed

+21
-32
lines changed

1 file changed

+21
-32
lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,10 @@
55
from typing import Dict, Tuple, List, Optional, Any, Union, TYPE_CHECKING, Set
66

77
from databricks.sql.backend.sea.models.base import ExternalLink
8-
from databricks.sql.backend.sea.models.responses import (
9-
parse_manifest,
10-
parse_result,
11-
parse_status,
12-
)
8+
139
from databricks.sql.backend.sea.utils.constants import (
1410
ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP,
11+
MetadataCommands,
1512
ResultFormat,
1613
ResultDisposition,
1714
ResultCompression,
@@ -359,7 +356,7 @@ def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
359356

360357
return link
361358

362-
def _results_message_to_execute_response(self, sea_response, command_id):
359+
def _results_message_to_execute_response(self, response: GetStatementResponse, command_id: CommandId):
363360
"""
364361
Convert a SEA response to an ExecuteResponse and extract result data.
365362
@@ -372,29 +369,24 @@ def _results_message_to_execute_response(self, sea_response, command_id):
372369
result data object, and manifest object
373370
"""
374371

375-
# Parse the response
376-
status = parse_status(sea_response)
377-
manifest_obj = parse_manifest(sea_response)
378-
result_data_obj = parse_result(sea_response)
379-
380372
# Extract description from manifest schema
381-
description = self._extract_description_from_manifest(manifest_obj)
373+
description = self._extract_description_from_manifest(response.manifest)
382374

383375
# Check for compression
384-
lz4_compressed = manifest_obj.result_compression == "LZ4_FRAME"
376+
lz4_compressed = response.manifest.result_compression == ResultCompression.LZ4_FRAME.value
385377

386378
execute_response = ExecuteResponse(
387379
command_id=command_id,
388-
status=status.state,
380+
status=response.status.state,
389381
description=description,
390382
has_been_closed_server_side=False,
391383
lz4_compressed=lz4_compressed,
392384
is_staging_operation=False,
393385
arrow_schema_bytes=None,
394-
result_format=manifest_obj.format,
386+
result_format=response.manifest.format,
395387
)
396388

397-
return execute_response, result_data_obj, manifest_obj
389+
return execute_response
398390

399391
def _check_command_not_in_failed_or_closed_state(
400392
self, state: CommandState, command_id: CommandId
@@ -641,25 +633,22 @@ def get_execution_result(
641633
path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id),
642634
data=request.to_dict(),
643635
)
636+
response = GetStatementResponse.from_dict(response_data)
644637

645638
# Create and return a SeaResultSet
646639
from databricks.sql.result_set import SeaResultSet
647640

648641
# Convert the response to an ExecuteResponse and extract result data
649-
(
650-
execute_response,
651-
result_data,
652-
manifest,
653-
) = self._results_message_to_execute_response(response_data, command_id)
642+
execute_response = self._results_message_to_execute_response(response, command_id)
654643

655644
return SeaResultSet(
656645
connection=cursor.connection,
657646
execute_response=execute_response,
658647
sea_client=self,
659648
buffer_size_bytes=cursor.buffer_size_bytes,
660649
arraysize=cursor.arraysize,
661-
result_data=result_data,
662-
manifest=manifest,
650+
result_data=response.result,
651+
manifest=response.manifest,
663652
)
664653

665654
# == Metadata Operations ==
@@ -673,7 +662,7 @@ def get_catalogs(
673662
) -> "ResultSet":
674663
"""Get available catalogs by executing 'SHOW CATALOGS'."""
675664
result = self.execute_command(
676-
operation="SHOW CATALOGS",
665+
operation=MetadataCommands.SHOW_CATALOGS.value,
677666
session_id=session_id,
678667
max_rows=max_rows,
679668
max_bytes=max_bytes,
@@ -700,7 +689,7 @@ def get_schemas(
700689
if not catalog_name:
701690
raise ValueError("Catalog name is required for get_schemas")
702691

703-
operation = f"SHOW SCHEMAS IN `{catalog_name}`"
692+
operation = MetadataCommands.SHOW_SCHEMAS.value.format(catalog_name)
704693

705694
if schema_name:
706695
operation += f" LIKE '{schema_name}'"
@@ -735,10 +724,10 @@ def get_tables(
735724
if not catalog_name:
736725
raise ValueError("Catalog name is required for get_tables")
737726

738-
operation = "SHOW TABLES IN " + (
739-
"ALL CATALOGS"
727+
operation = MetadataCommands.SHOW_TABLES.value.format(
728+
MetadataCommands.SHOW_TABLES_ALL_CATALOGS.value
740729
if catalog_name in [None, "*", "%"]
741-
else f"CATALOG `{catalog_name}`"
730+
else MetadataCommands.CATALOG_SPECIFIC.value.format(catalog_name)
742731
)
743732

744733
if schema_name:
@@ -783,16 +772,16 @@ def get_columns(
783772
if not catalog_name:
784773
raise ValueError("Catalog name is required for get_columns")
785774

786-
operation = f"SHOW COLUMNS IN CATALOG `{catalog_name}`"
775+
operation = MetadataCommands.SHOW_COLUMNS.value.format(catalog_name)
787776

788777
if schema_name:
789-
operation += f" SCHEMA LIKE '{schema_name}'"
778+
operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name)
790779

791780
if table_name:
792-
operation += f" TABLE LIKE '{table_name}'"
781+
operation += MetadataCommands.TABLE_LIKE_PATTERN.value.format(table_name)
793782

794783
if column_name:
795-
operation += f" LIKE '{column_name}'"
784+
operation += MetadataCommands.LIKE_PATTERN.value.format(column_name)
796785

797786
result = self.execute_command(
798787
operation=operation,

0 commit comments

Comments
 (0)