Generalise Backend Layer #604
Open
varun-edachali-dbx wants to merge 25 commits into main from backend-refactors
+2,120 −962
Changes shown below are from 15 of the 25 commits.

Commits (all by varun-edachali-dbx):
5e5147b  Separate Session related functionality from Connection class (#571)
57370b3  Introduce Backend Interface (DatabricksClient) (#573)
75752bf  Implement ResultSet Abstraction (backend interfaces for fetch phase) …
450b80d  remove un-necessary initialisation assertions
a926f02  remove un-necessary line break s
55ad001  more un-necessary line breaks
fa15730  constrain diff of test_closing_connection_closes_commands
019c7fb  reduce diff of test_closing_connection_closes_commands
726abe7  use pytest-like assertions for test_closing_connection_closes_commands
bf6d41c  ensure command_id is not None
5afa733  line breaks after multi-line pyfocs
e3dfd36  ensure non null operationHandle for commandId creation
63360b3  use command_id methods instead of explicit guid_to_hex_id conversion
13ffb8d  remove un-necessary artifacts in test_session, add back assertion
d759050  add from __future__ import annotations to remove string literals arou…
1e21434  move docstring of DatabricksClient within class
cd4015b  move ThriftResultSet import to top of file
ed8b610  make backend/utils __init__ file empty
94d951e  use from __future__ import annotations to remove string literals arou…
c20058e  use lazy logging
fe3acb1  replace getters with property tag
9fb6a76  Merge branch 'main' into backend-refactors
61dfc4d  set active_command_id to None, not active_op_handle
64fb9b2  align test_session with pytest instead of unittest
9b4b606  Merge branch 'main' into backend-refactors
@@ -0,0 +1,342 @@
"""
Abstract client interface for interacting with Databricks SQL services.

Implementations of this class are responsible for:
- Managing connections to Databricks SQL services
- Executing SQL queries and commands
- Retrieving query results
- Fetching metadata about catalogs, schemas, tables, and columns
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Union, TYPE_CHECKING

if TYPE_CHECKING:
    from databricks.sql.client import Cursor
    from databricks.sql.result_set import ResultSet

from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.backend.types import SessionId, CommandId, CommandState


class DatabricksClient(ABC):
    # == Connection and Session Management ==
    @abstractmethod
    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> SessionId:
        """
        Opens a new session with the Databricks SQL service.

        This method establishes a new session with the server and returns a session
        identifier that can be used for subsequent operations.

        Args:
            session_configuration: Optional dictionary of configuration parameters for the session
            catalog: Optional catalog name to use as the initial catalog for the session
            schema: Optional schema name to use as the initial schema for the session

        Returns:
            SessionId: A session identifier object that can be used for subsequent operations

        Raises:
            Error: If the session configuration is invalid
            OperationalError: If there's an error establishing the session
            InvalidServerResponseError: If the server response is invalid or unexpected
        """
        pass

    @abstractmethod
    def close_session(self, session_id: SessionId) -> None:
        """
        Closes an existing session with the Databricks SQL service.

        This method terminates the session identified by the given session ID and
        releases any resources associated with it.

        Args:
            session_id: The session identifier returned by open_session()

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error closing the session
        """
        pass

    # == Query Execution, Command Management ==
    @abstractmethod
    def execute_command(
        self,
        operation: str,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        lz4_compression: bool,
        cursor: Cursor,
        use_cloud_fetch: bool,
        parameters: List[ttypes.TSparkParameter],
        async_op: bool,
        enforce_embedded_schema_correctness: bool,
    ) -> Union[ResultSet, None]:
        """
        Executes a SQL command or query within the specified session.

        This method sends a SQL command to the server for execution and handles
        the response. It can operate in both synchronous and asynchronous modes.

        Args:
            operation: The SQL command or query to execute
            session_id: The session identifier in which to execute the command
            max_rows: Maximum number of rows to fetch in a single fetch batch
            max_bytes: Maximum number of bytes to fetch in a single fetch batch
            lz4_compression: Whether to use LZ4 compression for result data
            cursor: The cursor object that will handle the results
            use_cloud_fetch: Whether to use cloud fetch for retrieving large result sets
            parameters: List of parameters to bind to the query
            async_op: Whether to execute the command asynchronously
            enforce_embedded_schema_correctness: Whether to enforce schema correctness

        Returns:
            If async_op is False, returns a ResultSet object containing the
            query results and metadata. If async_op is True, returns None and the
            results must be fetched later using get_execution_result().

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error executing the command
            ServerOperationError: If the server encounters an error during execution
        """
        pass

    @abstractmethod
    def cancel_command(self, command_id: CommandId) -> None:
        """
        Cancels a running command or query.

        This method attempts to cancel a command that is currently being executed.
        It can be called from a different thread than the one executing the command.

        Args:
            command_id: The command identifier to cancel

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error canceling the command
        """
        pass

    @abstractmethod
    def close_command(self, command_id: CommandId) -> None:
        """
        Closes a command and releases associated resources.

        This method informs the server that the client is done with the command
        and any resources associated with it can be released.

        Args:
            command_id: The command identifier to close

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error closing the command
        """
        pass

    @abstractmethod
    def get_query_state(self, command_id: CommandId) -> CommandState:
        """
        Gets the current state of a query or command.

        This method retrieves the current execution state of a command from the server.

        Args:
            command_id: The command identifier to check

        Returns:
            CommandState: The current state of the command

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the state
            ServerOperationError: If the command is in an error state
            DatabaseError: If the command has been closed unexpectedly
        """
        pass

    @abstractmethod
    def get_execution_result(
        self,
        command_id: CommandId,
        cursor: Cursor,
    ) -> ResultSet:
        """
        Retrieves the results of a previously executed command.

        This method fetches the results of a command that was executed asynchronously
        or retrieves additional results from a command that has more rows available.

        Args:
            command_id: The command identifier for which to retrieve results
            cursor: The cursor object that will handle the results

        Returns:
            ResultSet: An object containing the query results and metadata

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the results
        """
        pass

    # == Metadata Operations ==
    @abstractmethod
    def get_catalogs(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: Cursor,
    ) -> ResultSet:
        """
        Retrieves a list of available catalogs.

        This method fetches metadata about all catalogs available in the current
        session's context.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results

        Returns:
            ResultSet: An object containing the catalog metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the catalogs
        """
        pass

    @abstractmethod
    def get_schemas(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: Cursor,
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
    ) -> ResultSet:
        """
        Retrieves a list of schemas, optionally filtered by catalog and schema name patterns.

        This method fetches metadata about schemas available in the specified catalog
        or all catalogs if no catalog is specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by

        Returns:
            ResultSet: An object containing the schema metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the schemas
        """
        pass

    @abstractmethod
    def get_tables(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: Cursor,
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        table_types: Optional[List[str]] = None,
    ) -> ResultSet:
        """
        Retrieves a list of tables, optionally filtered by catalog, schema, table name, and table types.

        This method fetches metadata about tables available in the specified catalog
        and schema, or all catalogs and schemas if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            table_types: Optional list of table types to filter by (e.g., ['TABLE', 'VIEW'])

        Returns:
            ResultSet: An object containing the table metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the tables
        """
        pass

    @abstractmethod
    def get_columns(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: Cursor,
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        column_name: Optional[str] = None,
    ) -> ResultSet:
        """
        Retrieves a list of columns, optionally filtered by catalog, schema, table, and column name patterns.

        This method fetches metadata about columns available in the specified table,
        or all tables if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            column_name: Optional column name pattern to filter by

        Returns:
            ResultSet: An object containing the column metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the columns
        """
        pass

    @property
    @abstractmethod
    def max_download_threads(self) -> int:
        """
        Gets the maximum number of download threads for cloud fetch operations.

        Returns:
            int: The maximum number of download threads
        """
        pass
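
To make the contract above concrete, here is a hedged usage sketch of how a caller might drive a DatabricksClient implementation through the full session/command lifecycle (open a session, execute asynchronously, poll, fetch, close). It is illustrative only and not part of the diff: the import paths follow the module layout introduced in this PR, while the CommandState member names, the cursor.active_command_id attribute, and ResultSet.fetchall() are assumptions about the surrounding code rather than guarantees of this file.

# Hedged sketch: driving the DatabricksClient contract for one async query.
# The `cursor` object and the terminal-state names are assumptions; only the
# abstract methods defined above are relied on as the actual contract.

import time
from typing import Any, List

from databricks.sql.backend.databricks_client import DatabricksClient
from databricks.sql.backend.types import CommandState, SessionId


def run_query(backend: DatabricksClient, cursor: Any, sql: str) -> List[Any]:
    """Open a session, run `sql` asynchronously, poll, fetch, then clean up."""
    session_id: SessionId = backend.open_session(
        session_configuration=None, catalog=None, schema=None
    )
    try:
        # async_op=True: execute_command returns None and results are
        # retrieved later via get_execution_result().
        backend.execute_command(
            operation=sql,
            session_id=session_id,
            max_rows=10_000,
            max_bytes=10 * 1024 * 1024,
            lz4_compression=False,
            cursor=cursor,
            use_cloud_fetch=False,
            parameters=[],
            async_op=True,
            enforce_embedded_schema_correctness=False,
        )
        # Assumed: the backend records the command on the cursor (see the
        # "set active_command_id to None, not active_op_handle" commit).
        command_id = cursor.active_command_id
        # Assumed terminal states; adjust to the actual CommandState members.
        terminal = (CommandState.SUCCEEDED, CommandState.FAILED, CommandState.CANCELLED)
        while backend.get_query_state(command_id) not in terminal:
            time.sleep(0.5)
        result_set = backend.get_execution_result(command_id, cursor)
        rows = result_set.fetchall()  # assumed ResultSet fetch method
        backend.close_command(command_id)
        return rows
    finally:
        backend.close_session(session_id)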