-
Notifications
You must be signed in to change notification settings - Fork 113
SeaDatabricksClient: Add Metadata Commands #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 116 commits
138c2ae
3e3ab94
4a78165
0dac4aa
1b794c7
da5a6fe
686ade4
31e6c83
69ea238
66d7517
71feef9
ae9862f
d8aa69e
db139bc
b977b12
da615c0
0da04a6
ea9d456
8985c62
d9bcdbe
ee9fa1c
24c6152
67fd101
271fcaf
bf26ea3
ed7cf91
dae15e3
db5bbea
d5d3699
6137a3d
75b0773
4494dcd
4d0aeca
7cece5e
8977c06
0216d7a
4cb15fd
dee47f7
e385d5b
484064e
030edf8
30f8266
033ae73
33821f4
3e22c6c
787f1f7
165c4f3
a6e40d0
52e3088
641c09b
8bd12d8
ffded6e
227f6b3
68657a3
3940eec
37813ba
267c9f4
2967119
47fd60d
982fdf2
9e14d48
be1997e
e8e8ee7
05ee4e7
3ffa898
2952d8d
89e2aa0
cbace3f
c075b07
c62f76d
199402e
8ac574b
398ca70
b1acc5b
ef2a7ee
699942d
af8f74e
5540c5c
efe3881
36ab59b
1d57c99
df6dac2
ad0e527
ed446a0
38e4b5c
94879c0
1809956
da5260c
0385ffb
349c021
6229848
fd52356
64e58b0
0a2cdfd
90bb09c
cd22389
82e0f8b
e64b81b
5ab9bbe
1ab6e87
f469c24
68ec65f
ffd478e
f6d873d
28675f5
3578659
8713023
22dc252
390f592
35f1ef0
a515d26
59b1330
293e356
dd40beb
14057ac
a4d5bdb
e9b1314
8ede414
09a1b11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
ResultDisposition, | ||
ResultCompression, | ||
WaitTimeout, | ||
MetadataCommands, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -627,9 +628,22 @@ def get_catalogs( | |
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_catalogs is not yet implemented for SEA backend") | ||
) -> "ResultSet": | ||
"""Get available catalogs by executing 'SHOW CATALOGS'.""" | ||
result = self.execute_command( | ||
operation=MetadataCommands.SHOW_CATALOGS.value, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a thrift-specific param? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but it is a param passed to |
||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result | ||
|
||
def get_schemas( | ||
self, | ||
|
@@ -639,9 +653,30 @@ def get_schemas( | |
cursor: "Cursor", | ||
catalog_name: Optional[str] = None, | ||
schema_name: Optional[str] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_schemas is not yet implemented for SEA backend") | ||
) -> "ResultSet": | ||
"""Get schemas by executing 'SHOW SCHEMAS IN catalog [LIKE pattern]'.""" | ||
if not catalog_name: | ||
raise ValueError("Catalog name is required for get_schemas") | ||
|
||
operation = MetadataCommands.SHOW_SCHEMAS.value.format(catalog_name) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(schema_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result | ||
|
||
def get_tables( | ||
self, | ||
|
@@ -653,9 +688,48 @@ def get_tables( | |
schema_name: Optional[str] = None, | ||
table_name: Optional[str] = None, | ||
table_types: Optional[List[str]] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_tables is not yet implemented for SEA backend") | ||
) -> "ResultSet": | ||
"""Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'.""" | ||
operation = ( | ||
MetadataCommands.SHOW_TABLES_ALL_CATALOGS.value | ||
if catalog_name in [None, "*", "%"] | ||
else MetadataCommands.SHOW_TABLES.value.format( | ||
MetadataCommands.CATALOG_SPECIFIC.value.format(catalog_name) | ||
) | ||
) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
|
||
if table_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(table_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
|
||
from databricks.sql.result_set import SeaResultSet | ||
|
||
assert isinstance( | ||
result, SeaResultSet | ||
), "execute_command returned a non-SeaResultSet" | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Apply client-side filtering by table_types | ||
from databricks.sql.backend.sea.utils.filters import ResultSetFilter | ||
|
||
result = ResultSetFilter.filter_tables_by_type(result, table_types) | ||
|
||
return result | ||
|
||
def get_columns( | ||
self, | ||
|
@@ -667,6 +741,33 @@ def get_columns( | |
schema_name: Optional[str] = None, | ||
table_name: Optional[str] = None, | ||
column_name: Optional[str] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_columns is not yet implemented for SEA backend") | ||
) -> "ResultSet": | ||
"""Get columns by executing 'SHOW COLUMNS IN CATALOG catalog [SCHEMA LIKE pattern] [TABLE LIKE pattern] [LIKE pattern]'.""" | ||
if not catalog_name: | ||
raise ValueError("Catalog name is required for get_columns") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so for the caller (client) code, it will appear as a ValueError. Is that okay? Should we throw something else per the spec? How does the other backend throw errors in first-class APIs like these? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the catch, I changed it to raise |
||
|
||
operation = MetadataCommands.SHOW_COLUMNS.value.format(catalog_name) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
|
||
if table_name: | ||
operation += MetadataCommands.TABLE_LIKE_PATTERN.value.format(table_name) | ||
|
||
if column_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(column_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
""" | ||
Client-side filtering utilities for Databricks SQL connector. | ||
|
||
This module provides filtering capabilities for result sets returned by different backends. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
from typing import ( | ||
List, | ||
Optional, | ||
Any, | ||
Callable, | ||
cast, | ||
TYPE_CHECKING, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
from databricks.sql.result_set import SeaResultSet | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
from databricks.sql.backend.types import ExecuteResponse | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ResultSetFilter: | ||
""" | ||
A general-purpose filter for result sets. | ||
""" | ||
|
||
@staticmethod | ||
def _filter_sea_result_set( | ||
result_set: SeaResultSet, filter_func: Callable[[List[Any]], bool] | ||
) -> SeaResultSet: | ||
""" | ||
Filter a SEA result set using the provided filter function. | ||
|
||
Args: | ||
result_set: The SEA result set to filter | ||
filter_func: Function that takes a row and returns True if the row should be included | ||
|
||
Returns: | ||
A filtered SEA result set | ||
""" | ||
|
||
# Get all remaining rows | ||
all_rows = result_set.results.remaining_rows() | ||
|
||
# Filter rows | ||
filtered_rows = [row for row in all_rows if filter_func(row)] | ||
|
||
# Reuse the command_id from the original result set | ||
command_id = result_set.command_id | ||
|
||
# Create an ExecuteResponse with the filtered data | ||
execute_response = ExecuteResponse( | ||
command_id=command_id, | ||
status=result_set.status, | ||
description=result_set.description, | ||
has_been_closed_server_side=result_set.has_been_closed_server_side, | ||
lz4_compressed=result_set.lz4_compressed, | ||
arrow_schema_bytes=result_set._arrow_schema_bytes, | ||
is_staging_operation=False, | ||
) | ||
|
||
# Create a new ResultData object with filtered data | ||
from databricks.sql.backend.sea.models.base import ResultData | ||
|
||
result_data = ResultData(data=filtered_rows, external_links=None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so when you manually set the result data in result set, what will happen in the fetch phase? will it use the execute-response to re-fetch the data? are there any other examples in the codebase, where we manually set the result data or is this the first instance? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we construct the We also set it during the instantiation of the |
||
|
||
from databricks.sql.backend.sea.backend import SeaDatabricksClient | ||
from databricks.sql.result_set import SeaResultSet | ||
|
||
# Create a new SeaResultSet with the filtered data | ||
filtered_result_set = SeaResultSet( | ||
connection=result_set.connection, | ||
execute_response=execute_response, | ||
sea_client=cast(SeaDatabricksClient, result_set.backend), | ||
buffer_size_bytes=result_set.buffer_size_bytes, | ||
arraysize=result_set.arraysize, | ||
result_data=result_data, | ||
) | ||
|
||
return filtered_result_set | ||
|
||
@staticmethod | ||
def filter_by_column_values( | ||
result_set: SeaResultSet, | ||
column_index: int, | ||
allowed_values: List[str], | ||
case_sensitive: bool = False, | ||
) -> SeaResultSet: | ||
""" | ||
Filter a result set by values in a specific column. | ||
|
||
Args: | ||
result_set: The result set to filter | ||
column_index: The index of the column to filter on | ||
allowed_values: List of allowed values for the column | ||
case_sensitive: Whether to perform case-sensitive comparison | ||
|
||
Returns: | ||
A filtered result set | ||
""" | ||
|
||
# Convert to uppercase for case-insensitive comparison if needed | ||
if not case_sensitive: | ||
allowed_values = [v.upper() for v in allowed_values] | ||
|
||
return ResultSetFilter._filter_sea_result_set( | ||
result_set, | ||
lambda row: ( | ||
len(row) > column_index | ||
and isinstance(row[column_index], str) | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
and ( | ||
row[column_index].upper() | ||
if not case_sensitive | ||
else row[column_index] | ||
) | ||
in allowed_values | ||
), | ||
) | ||
|
||
@staticmethod | ||
def filter_tables_by_type( | ||
result_set: SeaResultSet, table_types: Optional[List[str]] = None | ||
) -> SeaResultSet: | ||
""" | ||
Filter a result set of tables by the specified table types. | ||
|
||
This is a client-side filter that processes the result set after it has been | ||
retrieved from the server. It filters out tables whose type does not match | ||
any of the types in the table_types list. | ||
|
||
Args: | ||
result_set: The original result set containing tables | ||
table_types: List of table types to include (e.g., ["TABLE", "VIEW"]) | ||
|
||
Returns: | ||
A filtered result set containing only tables of the specified types | ||
""" | ||
|
||
# Default table types if none specified | ||
DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"] | ||
valid_types = ( | ||
table_types if table_types and len(table_types) > 0 else DEFAULT_TABLE_TYPES | ||
) | ||
|
||
# Table type is the 6th column (index 5) | ||
return ResultSetFilter.filter_by_column_values( | ||
result_set, 5, valid_types, case_sensitive=True | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not using compression for metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a side effect of setting
use_cloud_fetch=False
: compression is not supported for INLINE + JSON in SEA.