Skip to content

Commit 3944e39

Browse files
Merge branch 'fetch-json-inline' into ext-links-sea
2 parents b99d0c4 + bb015e6 commit 3944e39

15 files changed

+857
-688
lines changed

examples/experimental/sea_connector_test.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33
44
This script runs all the individual test modules and displays
55
a summary of test results with visual indicators.
6+
7+
In order to run the script, the following environment variables need to be set:
8+
- DATABRICKS_SERVER_HOSTNAME: The hostname of the Databricks server
9+
- DATABRICKS_HTTP_PATH: The HTTP path of the Databricks server
10+
- DATABRICKS_TOKEN: The token to use for authentication
611
"""
12+
713
import os
814
import sys
915
import logging

examples/experimental/tests/test_sea_async_query.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def test_sea_async_query_with_cloud_fetch():
1717
Test executing a query asynchronously using the SEA backend with cloud fetch enabled.
1818
1919
This function connects to a Databricks SQL endpoint using the SEA backend,
20-
executes a query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
20+
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
2121
"""
2222
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
2323
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
@@ -82,9 +82,6 @@ def test_sea_async_query_with_cloud_fetch():
8282
results.extend(cursor.fetchmany(10))
8383
results.extend(cursor.fetchall())
8484
actual_row_count = len(results)
85-
logger.info(
86-
f"{actual_row_count} rows retrieved against {requested_row_count} requested"
87-
)
8885

8986
logger.info(
9087
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
@@ -123,7 +120,7 @@ def test_sea_async_query_without_cloud_fetch():
123120
Test executing a query asynchronously using the SEA backend with cloud fetch disabled.
124121
125122
This function connects to a Databricks SQL endpoint using the SEA backend,
126-
executes a query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
123+
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
127124
"""
128125
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
129126
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
@@ -188,9 +185,6 @@ def test_sea_async_query_without_cloud_fetch():
188185
results.extend(cursor.fetchmany(10))
189186
results.extend(cursor.fetchall())
190187
actual_row_count = len(results)
191-
logger.info(
192-
f"{actual_row_count} rows retrieved against {requested_row_count} requested"
193-
)
194188

195189
logger.info(
196190
f"Requested {requested_row_count} rows, received {actual_row_count} rows"

examples/experimental/tests/test_sea_sync_query.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
from databricks.sql.client import Connection
88

9-
logging.basicConfig(level=logging.DEBUG)
9+
logging.basicConfig(level=logging.INFO)
1010
logger = logging.getLogger(__name__)
1111

1212

@@ -15,7 +15,7 @@ def test_sea_sync_query_with_cloud_fetch():
1515
Test executing a query synchronously using the SEA backend with cloud fetch enabled.
1616
1717
This function connects to a Databricks SQL endpoint using the SEA backend,
18-
executes a query with cloud fetch enabled, and verifies that execution completes successfully.
18+
executes a simple query with cloud fetch enabled, and verifies that execution completes successfully.
1919
"""
2020
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
2121
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
@@ -93,7 +93,7 @@ def test_sea_sync_query_without_cloud_fetch():
9393
Test executing a query synchronously using the SEA backend with cloud fetch disabled.
9494
9595
This function connects to a Databricks SQL endpoint using the SEA backend,
96-
executes a query with cloud fetch disabled, and verifies that execution completes successfully.
96+
executes a simple query with cloud fetch disabled, and verifies that execution completes successfully.
9797
"""
9898
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
9999
http_path = os.environ.get("DATABRICKS_HTTP_PATH")

src/databricks/sql/backend/sea/backend.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
MetadataCommands,
1616
)
1717

18-
from databricks.sql.thrift_api.TCLIService import ttypes
19-
2018
if TYPE_CHECKING:
2119
from databricks.sql.client import Cursor
2220
from databricks.sql.result_set import SeaResultSet
@@ -409,7 +407,7 @@ def execute_command(
409407
lz4_compression: bool,
410408
cursor: Cursor,
411409
use_cloud_fetch: bool,
412-
parameters: List[ttypes.TSparkParameter],
410+
parameters: List[Dict[str, Any]],
413411
async_op: bool,
414412
enforce_embedded_schema_correctness: bool,
415413
) -> Union[SeaResultSet, None]:
@@ -443,9 +441,9 @@ def execute_command(
443441
for param in parameters:
444442
sea_parameters.append(
445443
StatementParameter(
446-
name=param.name,
447-
value=param.value.stringValue,
448-
type=param.type,
444+
name=param["name"],
445+
value=param["value"],
446+
type=param["type"] if "type" in param else None,
449447
)
450448
)
451449

@@ -620,10 +618,10 @@ def get_execution_result(
620618
connection=cursor.connection,
621619
execute_response=execute_response,
622620
sea_client=self,
623-
buffer_size_bytes=cursor.buffer_size_bytes,
624-
arraysize=cursor.arraysize,
625621
result_data=response.result,
626622
manifest=response.manifest,
623+
buffer_size_bytes=cursor.buffer_size_bytes,
624+
arraysize=cursor.arraysize,
627625
)
628626

629627
def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
"""
2+
Type conversion utilities for the Databricks SQL Connector.
3+
4+
This module provides functionality to convert string values from SEA Inline results
5+
to appropriate Python types based on column metadata.
6+
"""
7+
8+
import datetime
9+
import decimal
10+
import logging
11+
from dateutil import parser
12+
from typing import Any, Callable, Dict, Optional, Union
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class SqlType:
    """
    Names of the SQL column types, as lower-case string constants.

    Each attribute holds the type name that the converter mapping in this
    module is keyed on. The full list of types is documented in the SEA
    REST API Reference:
    https://docs.databricks.com/api/workspace/statementexecution/executestatement
    """

    # --- numeric ---
    BYTE = "byte"
    SHORT = "short"
    INT = "int"
    LONG = "long"
    FLOAT = "float"
    DOUBLE = "double"
    DECIMAL = "decimal"

    # --- boolean ---
    BOOLEAN = "boolean"

    # --- date / time ---
    DATE = "date"
    TIMESTAMP = "timestamp"
    INTERVAL = "interval"

    # --- character data ---
    CHAR = "char"
    STRING = "string"

    # --- raw bytes ---
    BINARY = "binary"

    # --- nested / complex ---
    ARRAY = "array"
    MAP = "map"
    STRUCT = "struct"

    # --- everything else ---
    NULL = "null"
    USER_DEFINED_TYPE = "user_defined_type"
58+
59+
class SqlTypeConverter:
    """
    Utility class for converting SQL types to Python types.
    Based on the types supported by the Databricks SDK.

    ``TYPE_MAPPING`` associates a lower-case SQL type name with a callable
    that parses the string form of a value of that type. Type names without
    an entry (currently including array, map and struct) are passed through
    unchanged by ``convert_value``.
    """

    # SQL type to conversion function mapping
    # TODO: complex types
    TYPE_MAPPING: Dict[str, Callable] = {
        # Numeric types
        SqlType.BYTE: lambda v: int(v),
        SqlType.SHORT: lambda v: int(v),
        SqlType.INT: lambda v: int(v),
        SqlType.LONG: lambda v: int(v),
        SqlType.FLOAT: lambda v: float(v),
        SqlType.DOUBLE: lambda v: float(v),
        # Decimal: when both precision and scale are known, round to exactly
        # `s` fractional digits under a context limited to `p` digits;
        # otherwise parse the literal as-is.
        SqlType.DECIMAL: lambda v, p=None, s=None: (
            decimal.Decimal(v).quantize(
                decimal.Decimal(f'0.{"0" * s}'), context=decimal.Context(prec=p)
            )
            if p is not None and s is not None
            else decimal.Decimal(v)
        ),
        # Boolean type: any string outside this set converts to False
        SqlType.BOOLEAN: lambda v: v.lower() in ("true", "t", "1", "yes", "y"),
        # Date/Time types
        SqlType.DATE: lambda v: datetime.date.fromisoformat(v),
        SqlType.TIMESTAMP: lambda v: parser.parse(v),
        SqlType.INTERVAL: lambda v: v,  # Keep as string for now
        # String types - no conversion needed
        SqlType.CHAR: lambda v: v,
        SqlType.STRING: lambda v: v,
        # Binary type: value is parsed as a hexadecimal string
        SqlType.BINARY: lambda v: bytes.fromhex(v),
        # Other types
        SqlType.NULL: lambda v: None,
        # User-defined types are returned as-is
        SqlType.USER_DEFINED_TYPE: lambda v: v,
    }

    @staticmethod
    def convert_value(
        value: Any,
        sql_type: str,
        precision: Optional[int] = None,
        scale: Optional[int] = None,
    ) -> Any:
        """
        Convert a string value to the appropriate Python type based on SQL type.

        Conversion is best-effort: if the converter for ``sql_type`` fails, a
        warning is logged and the original ``value`` is returned unchanged.

        Args:
            value: The string value to convert
            sql_type: The SQL type (e.g., 'int', 'decimal'); matched
                case-insensitively and with surrounding whitespace ignored
            precision: Optional precision for decimal types
            scale: Optional scale for decimal types

        Returns:
            ``None`` when ``value`` is ``None``; ``value`` unchanged when
            ``sql_type`` has no entry in ``TYPE_MAPPING`` or conversion fails;
            otherwise the converted value in the appropriate Python type
        """

        if value is None:
            return None

        sql_type = sql_type.lower().strip()

        if sql_type not in SqlTypeConverter.TYPE_MAPPING:
            return value

        converter_func = SqlTypeConverter.TYPE_MAPPING[sql_type]
        try:
            if sql_type == SqlType.DECIMAL:
                return converter_func(value, precision, scale)
            else:
                return converter_func(value)
        except (
            ValueError,
            TypeError,
            decimal.InvalidOperation,
            # Non-string input to a string-based converter (e.g. `v.lower()`
            # on a value that is already a bool or int).
            AttributeError,
            # Timestamp parsing can overflow on out-of-range dates.
            OverflowError,
        ) as e:
            # Lazy %-formatting: the message is only built if the record is emitted.
            logger.warning("Error converting value '%s' to %s: %s", value, sql_type, e)
            return value

src/databricks/sql/backend/sea/utils/filters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ def _filter_sea_result_set(
7777
connection=result_set.connection,
7878
execute_response=execute_response,
7979
sea_client=cast(SeaDatabricksClient, result_set.backend),
80+
result_data=result_data,
8081
buffer_size_bytes=result_set.buffer_size_bytes,
8182
arraysize=result_set.arraysize,
82-
result_data=result_data,
8383
)
8484

8585
return filtered_result_set

0 commit comments

Comments
 (0)