Introduce manual SEA test scripts for Exec Phase #589
Merged
+632
−56
37 commits
138c2ae
[squash from exec-sea] bring over execution phase changes
varun-edachali-dbx 3e3ab94
remove excess test
varun-edachali-dbx 4a78165
add docstring
varun-edachali-dbx 0dac4aa
remvoe exec func in sea backend
varun-edachali-dbx 1b794c7
remove excess files
varun-edachali-dbx da5a6fe
remove excess models
varun-edachali-dbx 686ade4
remove excess sea backend tests
varun-edachali-dbx 31e6c83
cleanup
varun-edachali-dbx 69ea238
re-introduce get_schema_desc
varun-edachali-dbx 66d7517
remove SeaResultSet
varun-edachali-dbx 71feef9
clean imports and attributes
varun-edachali-dbx ae9862f
pass CommandId to ExecResp
varun-edachali-dbx d8aa69e
remove changes in types
varun-edachali-dbx db139bc
add back essential types (ExecResponse, from_sea_state)
varun-edachali-dbx b977b12
fix fetch types
varun-edachali-dbx da615c0
excess imports
varun-edachali-dbx 0da04a6
reduce diff by maintaining logs
varun-edachali-dbx ea9d456
fix int test types
varun-edachali-dbx 8985c62
[squashed from exec-sea] init execution func
varun-edachali-dbx d9bcdbe
remove irrelevant changes
varun-edachali-dbx ee9fa1c
remove ResultSetFilter functionality
varun-edachali-dbx 24c6152
remove more irrelevant changes
varun-edachali-dbx 67fd101
remove more irrelevant changes
varun-edachali-dbx 271fcaf
even more irrelevant changes
varun-edachali-dbx bf26ea3
remove sea response as init option
varun-edachali-dbx ed7cf91
exec test example scripts
varun-edachali-dbx dae15e3
formatting (black)
varun-edachali-dbx 3e22c6c
change to valid table name
varun-edachali-dbx 787f1f7
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx 165c4f3
remove un-necessary changes
varun-edachali-dbx a6e40d0
simplify test module
varun-edachali-dbx 52e3088
logging -> debug level
varun-edachali-dbx 641c09b
change table name in log
varun-edachali-dbx 8509a54
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx 30f500b
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx 697c047
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx aa7b542
add basic documentation on env vars to be set
varun-edachali-dbx
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,115 @@ | ||
""" | ||
Main script to run all SEA connector tests. | ||
|
||
This script runs all the individual test modules and displays | ||
a summary of test results with visual indicators. | ||
""" | ||
import os | ||
import sys | ||
import logging | ||
from databricks.sql.client import Connection | ||
import subprocess | ||
from typing import List, Tuple | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(__name__) | ||
|
||
def test_sea_session(): | ||
""" | ||
Test opening and closing a SEA session using the connector. | ||
|
||
This function connects to a Databricks SQL endpoint using the SEA backend, | ||
opens a session, and then closes it. | ||
|
||
Required environment variables: | ||
- DATABRICKS_SERVER_HOSTNAME: Databricks server hostname | ||
- DATABRICKS_HTTP_PATH: HTTP path for the SQL endpoint | ||
- DATABRICKS_TOKEN: Personal access token for authentication | ||
""" | ||
|
||
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME") | ||
http_path = os.environ.get("DATABRICKS_HTTP_PATH") | ||
access_token = os.environ.get("DATABRICKS_TOKEN") | ||
catalog = os.environ.get("DATABRICKS_CATALOG") | ||
|
||
if not all([server_hostname, http_path, access_token]): | ||
logger.error("Missing required environment variables.") | ||
logger.error("Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.") | ||
sys.exit(1) | ||
|
||
logger.info(f"Connecting to {server_hostname}") | ||
logger.info(f"HTTP Path: {http_path}") | ||
if catalog: | ||
logger.info(f"Using catalog: {catalog}") | ||
|
||
try: | ||
logger.info("Creating connection with SEA backend...") | ||
connection = Connection( | ||
server_hostname=server_hostname, | ||
http_path=http_path, | ||
access_token=access_token, | ||
catalog=catalog, | ||
schema="default", | ||
use_sea=True, | ||
user_agent_entry="SEA-Test-Client" # add custom user agent | ||
TEST_MODULES = [ | ||
"test_sea_session", | ||
"test_sea_sync_query", | ||
"test_sea_async_query", | ||
"test_sea_metadata", | ||
] | ||
|
||
|
||
def run_test_module(module_name: str) -> bool: | ||
"""Run a test module and return success status.""" | ||
module_path = os.path.join( | ||
os.path.dirname(os.path.abspath(__file__)), "tests", f"{module_name}.py" | ||
) | ||
|
||
# Simply run the module as a script - each module handles its own test execution | ||
result = subprocess.run( | ||
[sys.executable, module_path], capture_output=True, text=True | ||
) | ||
|
||
# Log the output from the test module | ||
if result.stdout: | ||
for line in result.stdout.strip().split("\n"): | ||
logger.info(line) | ||
|
||
if result.stderr: | ||
for line in result.stderr.strip().split("\n"): | ||
logger.error(line) | ||
|
||
return result.returncode == 0 | ||
|
||
|
||
def run_tests() -> List[Tuple[str, bool]]: | ||
"""Run all tests and return results.""" | ||
results = [] | ||
|
||
for module_name in TEST_MODULES: | ||
try: | ||
logger.info(f"\n{'=' * 50}") | ||
logger.info(f"Running test: {module_name}") | ||
logger.info(f"{'-' * 50}") | ||
|
||
success = run_test_module(module_name) | ||
results.append((module_name, success)) | ||
|
||
status = "✅ PASSED" if success else "❌ FAILED" | ||
logger.info(f"Test {module_name}: {status}") | ||
|
||
except Exception as e: | ||
logger.error(f"Error loading or running test {module_name}: {str(e)}") | ||
import traceback | ||
|
||
logger.error(traceback.format_exc()) | ||
results.append((module_name, False)) | ||
|
||
return results | ||
|
||
|
||
def print_summary(results: List[Tuple[str, bool]]) -> None: | ||
"""Print a summary of test results.""" | ||
logger.info(f"\n{'=' * 50}") | ||
logger.info("TEST SUMMARY") | ||
logger.info(f"{'-' * 50}") | ||
|
||
passed = sum(1 for _, success in results if success) | ||
total = len(results) | ||
|
||
for module_name, success in results: | ||
status = "✅ PASSED" if success else "❌ FAILED" | ||
logger.info(f"{status} - {module_name}") | ||
|
||
logger.info(f"{'-' * 50}") | ||
logger.info(f"Total: {total} | Passed: {passed} | Failed: {total - passed}") | ||
logger.info(f"{'=' * 50}") | ||
|
||
|
||
if __name__ == "__main__": | ||
# Check if required environment variables are set | ||
required_vars = [ | ||
"DATABRICKS_SERVER_HOSTNAME", | ||
"DATABRICKS_HTTP_PATH", | ||
"DATABRICKS_TOKEN", | ||
] | ||
missing_vars = [var for var in required_vars if not os.environ.get(var)] | ||
|
||
if missing_vars: | ||
logger.error( | ||
f"Missing required environment variables: {', '.join(missing_vars)}" | ||
) | ||
|
||
logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}") | ||
logger.info(f"backend type: {type(connection.session.backend)}") | ||
|
||
# Close the connection | ||
logger.info("Closing the SEA session...") | ||
connection.close() | ||
logger.info("Successfully closed SEA session") | ||
|
||
except Exception as e: | ||
logger.error(f"Error testing SEA session: {str(e)}") | ||
import traceback | ||
logger.error(traceback.format_exc()) | ||
logger.error("Please set these variables before running the tests.") | ||
sys.exit(1) | ||
|
||
logger.info("SEA session test completed successfully") | ||
|
||
if __name__ == "__main__": | ||
test_sea_session() | ||
# Run all tests | ||
results = run_tests() | ||
|
||
# Print summary | ||
print_summary(results) | ||
|
||
# Exit with appropriate status code | ||
all_passed = all(success for _, success in results) | ||
sys.exit(0 if all_passed else 1) |
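
The runner above treats each test module as a standalone script and reads its exit code as the pass/fail signal. A minimal, self-contained sketch of that pattern follows. The helper names (`run_script`, `run_all`, `demo`) and the throwaway scripts are hypothetical and are not part of this PR; they only illustrate the exit-code contract.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List, Tuple


def run_script(path: Path) -> bool:
    """Run a script in a child interpreter; exit code 0 counts as success."""
    result = subprocess.run(
        [sys.executable, str(path)], capture_output=True, text=True
    )
    return result.returncode == 0


def run_all(paths: List[Path]) -> List[Tuple[str, bool]]:
    """Run every script and pair its module name with its outcome."""
    return [(path.stem, run_script(path)) for path in paths]


def demo() -> List[Tuple[str, bool]]:
    """Create two throwaway scripts (hypothetical stand-ins) and run them."""
    with tempfile.TemporaryDirectory() as tmp:
        ok = Path(tmp) / "test_ok.py"
        bad = Path(tmp) / "test_bad.py"
        ok.write_text("import sys\nsys.exit(0)\n")
        bad.write_text("import sys\nsys.exit(1)\n")
        return run_all([ok, bad])


if __name__ == "__main__":
    for name, passed in demo():
        print(f"{'PASSED' if passed else 'FAILED'} - {name}")
```

Because only the exit code is inspected, each module has to call `sys.exit` with a non-zero status on failure, which is what the `if __name__ == "__main__"` blocks in the test modules do.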
Empty file.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
""" | ||
Test for SEA asynchronous query execution functionality. | ||
""" | ||
import os | ||
import sys | ||
import logging | ||
import time | ||
from databricks.sql.client import Connection | ||
from databricks.sql.backend.types import CommandState | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def test_sea_async_query_with_cloud_fetch(): | ||
""" | ||
Test executing a query asynchronously using the SEA backend with cloud fetch enabled. | ||
|
||
This function connects to a Databricks SQL endpoint using the SEA backend, | ||
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully. | ||
""" | ||
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME") | ||
http_path = os.environ.get("DATABRICKS_HTTP_PATH") | ||
access_token = os.environ.get("DATABRICKS_TOKEN") | ||
catalog = os.environ.get("DATABRICKS_CATALOG") | ||
|
||
if not all([server_hostname, http_path, access_token]): | ||
logger.error("Missing required environment variables.") | ||
logger.error( | ||
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN." | ||
) | ||
return False | ||
|
||
try: | ||
# Create connection with cloud fetch enabled | ||
logger.info( | ||
"Creating connection for asynchronous query execution with cloud fetch enabled" | ||
) | ||
connection = Connection( | ||
server_hostname=server_hostname, | ||
http_path=http_path, | ||
access_token=access_token, | ||
catalog=catalog, | ||
schema="default", | ||
use_sea=True, | ||
user_agent_entry="SEA-Test-Client", | ||
use_cloud_fetch=True, | ||
) | ||
|
||
logger.info( | ||
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}" | ||
) | ||
|
||
# Execute a simple query asynchronously | ||
cursor = connection.cursor() | ||
logger.info( | ||
"Executing asynchronous query with cloud fetch: SELECT 1 as test_value" | ||
) | ||
cursor.execute_async("SELECT 1 as test_value") | ||
logger.info( | ||
"Asynchronous query submitted successfully with cloud fetch enabled" | ||
) | ||
|
||
# Check query state | ||
logger.info("Checking query state...") | ||
while cursor.is_query_pending(): | ||
logger.info("Query is still pending, waiting...") | ||
time.sleep(1) | ||
|
||
logger.info("Query is no longer pending, getting results...") | ||
cursor.get_async_execution_result() | ||
logger.info( | ||
"Successfully retrieved asynchronous query results with cloud fetch enabled" | ||
) | ||
|
||
# Close resources | ||
cursor.close() | ||
connection.close() | ||
logger.info("Successfully closed SEA session") | ||
|
||
return True | ||
|
||
except Exception as e: | ||
logger.error( | ||
f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}" | ||
) | ||
import traceback | ||
|
||
logger.error(traceback.format_exc()) | ||
return False | ||
|
||
|
||
def test_sea_async_query_without_cloud_fetch(): | ||
""" | ||
Test executing a query asynchronously using the SEA backend with cloud fetch disabled. | ||
|
||
This function connects to a Databricks SQL endpoint using the SEA backend, | ||
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully. | ||
""" | ||
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME") | ||
http_path = os.environ.get("DATABRICKS_HTTP_PATH") | ||
access_token = os.environ.get("DATABRICKS_TOKEN") | ||
catalog = os.environ.get("DATABRICKS_CATALOG") | ||
|
||
if not all([server_hostname, http_path, access_token]): | ||
logger.error("Missing required environment variables.") | ||
logger.error( | ||
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN." | ||
) | ||
return False | ||
|
||
try: | ||
# Create connection with cloud fetch disabled | ||
logger.info( | ||
"Creating connection for asynchronous query execution with cloud fetch disabled" | ||
) | ||
connection = Connection( | ||
server_hostname=server_hostname, | ||
http_path=http_path, | ||
access_token=access_token, | ||
catalog=catalog, | ||
schema="default", | ||
use_sea=True, | ||
user_agent_entry="SEA-Test-Client", | ||
use_cloud_fetch=False, | ||
enable_query_result_lz4_compression=False, | ||
) | ||
|
||
logger.info( | ||
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}" | ||
) | ||
|
||
# Execute a simple query asynchronously | ||
cursor = connection.cursor() | ||
logger.info( | ||
"Executing asynchronous query without cloud fetch: SELECT 1 as test_value" | ||
) | ||
cursor.execute_async("SELECT 1 as test_value") | ||
logger.info( | ||
"Asynchronous query submitted successfully with cloud fetch disabled" | ||
) | ||
|
||
# Check query state | ||
logger.info("Checking query state...") | ||
while cursor.is_query_pending(): | ||
logger.info("Query is still pending, waiting...") | ||
time.sleep(1) | ||
|
||
logger.info("Query is no longer pending, getting results...") | ||
cursor.get_async_execution_result() | ||
logger.info( | ||
"Successfully retrieved asynchronous query results with cloud fetch disabled" | ||
) | ||
|
||
# Close resources | ||
cursor.close() | ||
connection.close() | ||
logger.info("Successfully closed SEA session") | ||
|
||
return True | ||
|
||
except Exception as e: | ||
logger.error( | ||
f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}" | ||
) | ||
import traceback | ||
|
||
logger.error(traceback.format_exc()) | ||
return False | ||
|
||
|
||
def test_sea_async_query_exec(): | ||
""" | ||
Run both asynchronous query tests and return overall success. | ||
""" | ||
with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch() | ||
logger.info( | ||
f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}" | ||
) | ||
|
||
without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch() | ||
logger.info( | ||
f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}" | ||
) | ||
|
||
return with_cloud_fetch_success and without_cloud_fetch_success | ||
|
||
|
||
if __name__ == "__main__": | ||
success = test_sea_async_query_exec() | ||
sys.exit(0 if success else 1) |
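
Both async tests poll `cursor.is_query_pending()` once per second with no upper bound, so a query that never leaves the pending state would hang the script. One possible way to bound the wait is sketched below. `wait_until_done` and `FakeCursor` are hypothetical names introduced here for illustration; the sketch does not touch the connector and assumes only that the pending check is a zero-argument callable returning a bool.

```python
import time
from typing import Callable


def wait_until_done(
    is_pending: Callable[[], bool],
    timeout_s: float = 30.0,
    interval_s: float = 0.01,
) -> bool:
    """Poll until is_pending() returns False; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while is_pending():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
    return True


class FakeCursor:
    """Hypothetical stand-in: reports pending for the first N polls."""

    def __init__(self, pending_polls: int) -> None:
        self._remaining = pending_polls

    def is_query_pending(self) -> bool:
        if self._remaining > 0:
            self._remaining -= 1
            return True
        return False


if __name__ == "__main__":
    cursor = FakeCursor(pending_polls=3)
    finished = wait_until_done(cursor.is_query_pending, timeout_s=1.0)
    print("finished" if finished else "timed out")
```

In the test scripts this would be called with the real cursor's bound method, for example `wait_until_done(cursor.is_query_pending, timeout_s=60, interval_s=1)`, with a `False` result treated as a test failure.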