diff --git a/.gitignore b/.gitignore
index 466c618f0..12de8f342 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,3 +22,4 @@ remorph_transpile/
 /linter/src/main/antlr4/library/gen/
 .databricks-login.json
 .mypy_cache
+.env
diff --git a/labs.yml b/labs.yml
index 0522a38c1..455fcd224 100644
--- a/labs.yml
+++ b/labs.yml
@@ -46,6 +46,19 @@ commands:
       {{range .}}{{.total_files_processed}}\t{{.total_queries_processed}}\t{{.analysis_error_count}}\t{{.parsing_error_count}}\t{{.validation_error_count}}\t{{.generation_error_count}}\t{{.error_log_file}}
       {{end}}
+  - name: llm-transpile
+    description: Transpile source code to Databricks using LLM Transpiler (Switch)
+    flags:
+      - name: input-source
+        description: Input Script Folder or File (local path)
+        default: null
+      - name: output-ws-folder
+        description: Output folder path (Databricks Workspace path starting with /Workspace/)
+        default: null
+      - name: source-dialect
+        description: Source dialect name (e.g., 'snowflake', 'teradata')
+        default: null
+
   - name: reconcile
     description: Reconcile source and target data residing on Databricks
diff --git a/pyproject.toml b/pyproject.toml
index 525ab4d38..7a102a012 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -449,7 +449,7 @@ bad-functions = ["map", "input"]
 # ignored-parents =
 
 # Maximum number of arguments for function / method.
-max-args = 12
+max-args = 13
 
 # Maximum number of attributes for a class (see R0902).
 max-attributes = 13
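With the manifest entry above, the new command can be invoked as, for example, `databricks labs lakebridge llm-transpile --input-source /path/to/sql --output-ws-folder /Workspace/Users/<user>/transpiled --source-dialect snowflake` (paths illustrative); any flag omitted on the command line is resolved from `config.yml` or prompted for interactively, as the CLI wiring below implements.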
diff --git a/src/databricks/labs/lakebridge/cli.py b/src/databricks/labs/lakebridge/cli.py
index 30318393d..54d3bc536 100644
--- a/src/databricks/labs/lakebridge/cli.py
+++ b/src/databricks/labs/lakebridge/cli.py
@@ -36,9 +36,11 @@
 from databricks.labs.lakebridge.transpiler.lsp.lsp_engine import LSPEngine
 from databricks.labs.lakebridge.transpiler.repository import TranspilerRepository
 from databricks.labs.lakebridge.transpiler.sqlglot.sqlglot_engine import SqlglotEngine
+from databricks.labs.lakebridge.transpiler.switch_runner import SwitchRunner
 from databricks.labs.lakebridge.transpiler.transpile_engine import TranspileEngine
 from databricks.labs.lakebridge.transpiler.transpile_status import ErrorSeverity
 
+from databricks.labs.switch.lsp import get_switch_dialects
 
 # Subclass to allow controlled access to protected methods.
@@ -92,7 +94,7 @@ def _remove_warehouse(ws: WorkspaceClient, warehouse_id: str):
 
 
 @lakebridge.command
-def transpile(  # pylint: disable=too-many-arguments
+def transpile(
     *,
     w: WorkspaceClient,
     transpiler_config_path: str | None = None,
@@ -240,6 +242,14 @@ def use_transpiler_config_path(self, transpiler_config_path: str | None) -> None
             )
         self._config = dataclasses.replace(self._config, transpiler_config_path=transpiler_config_path)
 
+        # Switch is installed under "/Users/<>/.lakebridge/transpilers/Switch/lsp/config.yml".
+        if (
+            self._config.transpiler_config_path is not None
+            and Path(self._config.transpiler_config_path).parent.parent.name == "Switch"
+        ):
+            msg = "The Switch transpiler is not supported through `transpile`; run `llm-transpile` instead."
+            raise RuntimeError(msg)
+
     def use_source_dialect(self, source_dialect: str | None) -> None:
         if source_dialect is not None:
             # Defer validation: depends on the transpiler config path, we'll deal with this later.
@@ -637,6 +647,131 @@ def _override_workspace_client_config(ctx: ApplicationContext, overrides: dict[s
         ctx.connect_config.cluster_id = cluster_id
 
 
+def _validate_llm_transpile_args(
+    input_source: str | None,
+    output_ws_folder: str | None,
+    source_dialect: str | None,
+    transpile_config: TranspileConfig | None,
+    prompts: Prompts,
+) -> tuple[str, str, str]:
+
+    if input_source is None:
+        input_source = transpile_config.input_source if transpile_config else None
+    if output_ws_folder is None:
+        output_ws_folder = transpile_config.output_folder if transpile_config else None
+    if source_dialect is None:
+        source_dialect = transpile_config.source_dialect if transpile_config else None
+
+    _switch_dialects = get_switch_dialects()
+
+    # Prompt for any value still missing after falling back to the config
+    if not input_source:
+        input_source = prompts.question("Enter input SQL path")
+    if not output_ws_folder:
+        output_ws_folder = prompts.question("Enter the output workspace folder starting with /Workspace/")
+    if not source_dialect:
+        source_dialect = prompts.choice("Select the source dialect", sorted(_switch_dialects))
+
+    # Validate that the input source exists as a local path
+    if not Path(input_source).exists():
+        raise_validation_exception(f"Invalid path for '--input-source': Path '{input_source}' does not exist.")
+
+    # Validate that the output folder is a workspace path
+    if not str(output_ws_folder).startswith("/Workspace/"):
+        raise_validation_exception(
+            f"Invalid value for '--output-ws-folder': workspace output path must start with /Workspace/. Got: {output_ws_folder!r}"
+        )
+
+    if source_dialect not in _switch_dialects:
+        raise_validation_exception(
+            f"Invalid value for '--source-dialect': {source_dialect!r} must be one of: {', '.join(sorted(_switch_dialects))}"
+        )
+
+    return input_source, output_ws_folder, source_dialect
+
+
+@lakebridge.command
+def llm_transpile(
+    *,
+    w: WorkspaceClient,
+    input_source: str | None = None,
+    output_ws_folder: str | None = None,
+    source_dialect: str | None = None,
+    ctx: ApplicationContext | None = None,
+) -> None:
+    """Transpile source code to Databricks using LLM Transpiler (Switch)"""
+    if ctx is None:
+        ctx = ApplicationContext(w)
+    del w
+    ctx.add_user_agent_extra("cmd", "transpile-switch")
+    user = ctx.current_user
+    logger.debug(f"User: {user}")
+
+    prompts = ctx.prompts
+    transpile_config = ctx.transpile_config
+    # If CLI args are missing, fall back to values from config.yml
+    input_source, output_ws_folder, source_dialect = _validate_llm_transpile_args(
+        input_source,
+        output_ws_folder,
+        source_dialect,
+        transpile_config,
+        prompts,
+    )
+
+    job_list = ctx.install_state.jobs
+    if "Switch" not in job_list:
+        raise RuntimeError(
+            "Switch Job ID not found. "
+            "Please run 'databricks labs lakebridge install-transpile --include-llm-transpiler true' first."
+        )
+    logger.debug("Switch job ID found in InstallState")
+    job_id = int(job_list["Switch"])
+
+    # Resolve the catalog, schema and volume used for the Volume upload
+    if transpile_config is None or transpile_config.transpiler_options is None:
+        raise RuntimeError(
+            "Transpile configuration config.yml not found in workspace. "
+            "Please run 'databricks labs lakebridge install-transpile --include-llm-transpiler true' first."
+        )
+
+    transpile_options = transpile_config.transpiler_options
+    logger.debug(f"Transpiler options: {transpile_options}")
+    if not isinstance(transpile_options, Mapping):
+        raise RuntimeError(
+            "Invalid transpile configuration: transpiler_options must be a mapping. "
+            "Please run 'databricks labs lakebridge install-transpile --include-llm-transpiler true' first."
+        )
+    catalog = transpile_options.get("catalog", None)
+    schema = transpile_options.get("schema", None)
+    volume = transpile_options.get("volume", None)
+
+    if catalog is None or schema is None or volume is None:
+        raise RuntimeError(
+            "Invalid transpile configuration: catalog, schema or volume is missing. "
+            "Please run 'databricks labs lakebridge install-transpile --include-llm-transpiler true' first."
+        )
+    assert isinstance(catalog, str)
+    assert isinstance(schema, str)
+    assert isinstance(volume, str)
+
+    try:
+        job_runner = SwitchRunner(ctx.workspace_client, ctx.installation)
+        volume_input_path = job_runner.upload_to_volume(
+            local_path=Path(input_source), catalog=catalog, schema=schema, volume=volume
+        )
+
+        response = job_runner.run(
+            volume_input_path=volume_input_path,
+            output_ws_folder=output_ws_folder,
+            source_tech=source_dialect,
+            job_id=job_id,
+        )
+        json.dump(response, sys.stdout, indent=2)
+    except Exception as ex:
+        raise RuntimeError(ex) from ex
+
+
 @lakebridge.command
 def reconcile(*, w: WorkspaceClient) -> None:
     """[EXPERIMENTAL] Reconciles source to Databricks datasets"""
diff --git a/src/databricks/labs/lakebridge/config.py b/src/databricks/labs/lakebridge/config.py
index 346b3f745..a27e7b13d 100644
--- a/src/databricks/labs/lakebridge/config.py
+++ b/src/databricks/labs/lakebridge/config.py
@@ -166,6 +166,7 @@ class TranspileConfig:
     error_file_path: str | None = None
     sdk_config: dict[str, str] | None = None
     skip_validation: bool = False
+    include_llm: bool = False
     catalog_name: str = "remorph"
     schema_name: str = "transpiler"
     transpiler_options: JsonValue = None
diff --git a/src/databricks/labs/lakebridge/install.py b/src/databricks/labs/lakebridge/install.py
index 2ed10759e..cb47a8bf4 100644
--- a/src/databricks/labs/lakebridge/install.py
+++ b/src/databricks/labs/lakebridge/install.py
@@ -40,7 +40,7 @@ class WorkspaceInstaller:
 
     # TODO: Temporary suppression, is_interactive is pending removal.
-    def __init__(  # pylint: disable=too-many-arguments
+    def __init__(
         self,
         ws: WorkspaceClient,
         prompts: Prompts,
diff --git a/src/databricks/labs/lakebridge/transpiler/switch_runner.py b/src/databricks/labs/lakebridge/transpiler/switch_runner.py
new file mode 100644
index 000000000..1fbdbf3c2
--- /dev/null
+++ b/src/databricks/labs/lakebridge/transpiler/switch_runner.py
@@ -0,0 +1,146 @@
+import io
+import logging
+import os
+import random
+import string
+from datetime import datetime, timezone
+from pathlib import Path
+
+from databricks.labs.blueprint.installation import Installation, RootJsonValue
+from databricks.sdk import WorkspaceClient
+
+logger = logging.getLogger(__name__)
+
+
+class SwitchRunner:
+    """Runner for Switch LLM transpilation jobs."""
+
+    def __init__(
+        self,
+        ws: WorkspaceClient,
+        installation: Installation,
+    ):
+        self._ws = ws
+        self._installation = installation
+
+    def run(
+        self,
+        volume_input_path: str,
+        output_ws_folder: str,
+        source_tech: str,
+        job_id: int,
+        wait_for_completion: bool = False,
+    ) -> RootJsonValue:
+        """Trigger the Switch job for an input path already uploaded to a Volume."""
+        job_params = self._build_job_parameters(
+            input_dir=volume_input_path,
+            output_dir=output_ws_folder,
+            source_tech=source_tech,
+        )
+        logger.info(f"Triggering Switch job with job_id: {job_id}")
+
+        return self._run_job(job_id, job_params, wait_for_completion)
+
+    def upload_to_volume(
+        self,
+        local_path: Path,
+        catalog: str,
+        schema: str,
+        volume: str,
+    ) -> str:
+        """Upload local files to a UC Volume under a unique timestamped path."""
+        now = datetime.now(timezone.utc)
+        time_part = now.strftime("%Y%m%d%H%M%S")
+        random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
+        timestamp_suffix = f"{time_part}_{random_part}"
+        volume_base_path = f"/Volumes/{catalog}/{schema}/{volume}"
+        volume_input_path = f"{volume_base_path}/input_{timestamp_suffix}"
+
+        logger.info(f"Uploading {local_path} to {volume_input_path}...")
+
+        # Single-file upload
+        if local_path.is_file():
+            volume_file_path = f"{volume_input_path}/{local_path.name}"
+            with open(local_path, 'rb') as f:
+                content = f.read()
+            self._ws.files.upload(file_path=volume_file_path, contents=io.BytesIO(content), overwrite=True)
+            logger.debug(f"Uploaded: {local_path} -> {volume_file_path}")
+
+        # Directory upload
+        else:
+            for root, _, files in os.walk(local_path):
+                for file in files:
+                    local_file = Path(root) / file
+                    relative_path = local_file.relative_to(local_path)
+                    volume_file_path = f"{volume_input_path}/{relative_path}"
+
+                    with open(local_file, 'rb') as f:
+                        content = f.read()
+
+                    self._ws.files.upload(file_path=volume_file_path, contents=io.BytesIO(content), overwrite=True)
+                    logger.debug(f"Uploaded: {local_file} -> {volume_file_path}")
+
+        logger.info(f"Upload complete: {volume_input_path}")
+        return volume_input_path
+
+    def _build_job_parameters(
+        self,
+        input_dir: str,
+        output_dir: str,
+        source_tech: str,
+        switch_options: dict | None = None,
+    ) -> dict[str, str]:
+        """Build Switch job parameters."""
+        if switch_options is None:
+            switch_options = {}
+        return {
+            "input_dir": input_dir,
+            "output_dir": output_dir,
+            "source_tech": source_tech,
+            **switch_options,
+        }
+
+    def _run_job(
+        self,
+        job_id: int,
+        job_params: dict[str, str],
+        wait_for_completion: bool,
+    ) -> RootJsonValue:
+        """Execute the Switch job and return run information."""
+        if wait_for_completion:
+            run = self._ws.jobs.run_now_and_wait(job_id, job_parameters=job_params)
+
+            if not run.run_id:
+                raise SystemExit(f"Job {job_id} execution failed.")
+
+            job_run_url = f"{self._ws.config.host}/jobs/{job_id}/runs/{run.run_id}"
+            logger.info(f"Switch LLM transpilation job completed: {job_run_url}")
+
+            return [
+                {
+                    "job_id": job_id,
+                    "run_id": run.run_id,
+                    "run_url": job_run_url,
+                    "state": (
+                        run.state.life_cycle_state.value if run.state and run.state.life_cycle_state else "UNKNOWN"
+                    ),
+                    "result_state": run.state.result_state.value if run.state and run.state.result_state else None,
+                }
+            ]
+
+        wait = self._ws.jobs.run_now(job_id, job_parameters=job_params)
+
+        if not wait.run_id:
+            raise SystemExit(f"Job {job_id} execution failed.")
+
+        job_run_url = f"{self._ws.config.host}/jobs/{job_id}/runs/{wait.run_id}"
+        logger.info(f"Switch LLM transpilation job started: {job_run_url}")
+
+        return [
+            {
+                "job_id": job_id,
+                "run_id": wait.run_id,
+                "run_url": job_run_url,
+            }
+        ]
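For orientation, here is a minimal usage sketch of the new runner. The Volume coordinates mirror the unit-test fixture below; the product name passed to `Installation.current` and all paths and IDs are illustrative assumptions, not values fixed by this diff:

```python
from pathlib import Path

from databricks.labs.blueprint.installation import Installation
from databricks.sdk import WorkspaceClient

from databricks.labs.lakebridge.transpiler.switch_runner import SwitchRunner

ws = WorkspaceClient()
# Assumption: resolve the existing installation; the product name is illustrative.
runner = SwitchRunner(ws, Installation.current(ws, "lakebridge"))

# The catalog/schema/volume normally come from transpiler_options in config.yml.
volume_input_path = runner.upload_to_volume(
    local_path=Path("./sql_sources"),
    catalog="test_catalog",
    schema="test_schema",
    volume="test_volume",
)

# Fire-and-forget by default; pass wait_for_completion=True to block on the run.
run_info = runner.run(
    volume_input_path=volume_input_path,
    output_ws_folder="/Workspace/Users/someone@example.com/transpiled",
    source_tech="snowflake",
    job_id=1234567890,
)
print(run_info)  # [{"job_id": ..., "run_id": ..., "run_url": ...}]
```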
diff --git a/tests/conftest.py b/tests/conftest.py
index a6a3f114c..2b2419231 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -312,6 +312,21 @@ def morpheus_artifact() -> Path:
     return artifact
 
 
+@pytest.fixture
+def switch_artifact() -> Path:
+    """Get Switch wheel for testing."""
+    artifact = (
+        Path(__file__).parent
+        / "resources"
+        / "transpiler_configs"
+        / "switch"
+        / "wheel"
+        / "databricks_switch_plugin-0.1.2-py3-none-any.whl"
+    )
+    assert artifact.exists(), f"Switch artifact not found: {artifact}"
+    return artifact
+
+
 class FakeDataSource(DataSource):
 
     def __init__(self, start_delimiter: str, end_delimiter: str):
diff --git a/tests/unit/test_cli_llm_transpile.py b/tests/unit/test_cli_llm_transpile.py
new file mode 100644
index 000000000..4e3ee97cf
--- /dev/null
+++ b/tests/unit/test_cli_llm_transpile.py
@@ -0,0 +1,193 @@
+import json
+from pathlib import Path
+from unittest.mock import create_autospec
+from typing import cast
+
+import pytest
+
+from databricks.labs.blueprint.installation import MockInstallation, RootJsonValue
+from databricks.labs.blueprint.tui import MockPrompts
+from databricks.labs.switch.lsp import get_switch_dialects
+
+from databricks.labs.lakebridge import cli
+from databricks.labs.lakebridge.contexts.application import ApplicationContext
+from databricks.sdk import WorkspaceClient
+
+_JOB_ID = 1234567890
+_RUN_ID = 123456789
+_switch_dialects = get_switch_dialects()
+
+
+def make_mock_prompts(input_path: str, output_folder: str, source_dialect: str = "mssql") -> MockPrompts:
+    return MockPrompts(
+        {
+            r"Enter input SQL path": str(input_path),
+            r"Enter the output workspace folder starting with /Workspace/": output_folder,
+            r"Select the source dialect": str(sorted(_switch_dialects).index(source_dialect)),
+        }
+    )
+
+
+def create_switch_workspace_client_mock() -> WorkspaceClient:
+    ws = create_autospec(spec=WorkspaceClient, instance=True)
+
+    ws.config.host = 'https://workspace.databricks.com'
+    ws.files.upload.return_value = None
+    ws.jobs.run_now.return_value.run_id = _RUN_ID
+    ws.jobs.run_now_and_wait.return_value.run_id = _RUN_ID
+
+    return ws
+
+
+@pytest.fixture
+def mock_installation_with_switch() -> MockInstallation:
+    """MockInstallation with Switch configuration state."""
+    state: dict[str, RootJsonValue] = {
+        "config.yml": {
+            "version": 3,
+            "transpiler_config_path": str(Path.home() / ".lakebridge" / "Switch" / "lsp" / "config.yml"),
+            "transpiler_options": {
+                "catalog": "test_catalog",
+                "schema": "test_schema",
+                "volume": "test_volume",
+                "foundation_model": "databricks-claude-sonnet-4-5",
+                "transpiler_name": "Switch",
+            },
+            "source_dialect": None,
+            "input_source": None,
+            "output_folder": None,
+            "sdk_config": None,
+            "skip_validation": False,
+            "catalog_name": "catalog",
+            "schema_name": "schema",
+        },
+        "state.json": {"resources": {"jobs": {"Switch": f"{_JOB_ID}"}}, "version": 1},
+    }
+    return MockInstallation(cast(dict[str, RootJsonValue], state))
+
+
+def test_llm_transpile_success(
+    mock_installation_with_switch: MockInstallation,
+    tmp_path: Path,
+    capsys,
+) -> None:
+    """Test successful LLM transpile execution."""
+    input_source = tmp_path / "input.sql"
+    input_source.write_text("SELECT * FROM table1;")
+    output_folder = "/Workspace/Users/test/output"
+
+    # Use a dedicated WorkspaceClient mock tailored for SwitchRunner
+    mock_ws = create_switch_workspace_client_mock()
+
+    ctx = ApplicationContext(mock_ws)
+    ctx.replace(installation=mock_installation_with_switch)
+    ctx.replace(add_user_agent_extra=lambda w, *args, **kwargs: w)
+
+    cli.llm_transpile(
+        w=mock_ws,
+        input_source=str(input_source),
+        output_ws_folder=output_folder,
+        source_dialect="mssql",
+        ctx=ctx,
+    )
+
+    (out, _) = capsys.readouterr()
+    result = json.loads(out)
+    assert [
+        {
+            "job_id": _JOB_ID,
+            "run_id": _RUN_ID,
+            "run_url": f"https://workspace.databricks.com/jobs/{_JOB_ID}/runs/{_RUN_ID}",
+        }
+    ] == result
+
+
+def test_llm_transpile_without_params(
+    mock_installation_with_switch: MockInstallation,
+    tmp_path: Path,
+    capsys,
+) -> None:
+    """Test LLM transpile execution without CLI arguments, falling back to config.yml and prompts."""
+    input_source = tmp_path / "input.sql"
+    input_source.write_text("SELECT * FROM table1;")
+    output_folder = "/Workspace/Users/test/output"
+
+    mock_prompts = make_mock_prompts(str(input_source), output_folder, "mssql")
+
+    # Use a dedicated WorkspaceClient mock tailored for SwitchRunner
+    mock_ws = create_switch_workspace_client_mock()
+
+    ctx = ApplicationContext(mock_ws)
+
+    ctx.replace(
+        installation=mock_installation_with_switch,
+        add_user_agent_extra=lambda w, *args, **kwargs: w,
+        prompts=mock_prompts,
+    )
+
+    cli.llm_transpile(w=mock_ws, ctx=ctx)
+
+    (out, _) = capsys.readouterr()
+    result = json.loads(out)
+    assert [
+        {
+            "job_id": _JOB_ID,
+            "run_id": _RUN_ID,
+            "run_url": f"https://workspace.databricks.com/jobs/{_JOB_ID}/runs/{_RUN_ID}",
+        }
+    ] == result
+
+
+def test_llm_transpile_with_incorrect_output_params(
+    mock_installation_with_switch: MockInstallation,
+    tmp_path: Path,
+) -> None:
+
+    input_source = tmp_path / "input.sql"
+    input_source.write_text("SELECT * FROM table1;")
+    output_folder = "/Users/test/output"
+
+    mock_prompts = make_mock_prompts(str(input_source), output_folder, "mssql")
+
+    # Use a dedicated WorkspaceClient mock tailored for SwitchRunner
+    mock_ws = create_switch_workspace_client_mock()
+
+    ctx = ApplicationContext(mock_ws)
+
+    ctx.replace(
+        installation=mock_installation_with_switch,
+        add_user_agent_extra=lambda w, *args, **kwargs: w,
+        prompts=mock_prompts,
+    )
+
+    error_msg = "Invalid value for '--output-ws-folder': workspace output path must start with /Workspace/. Got: '/Users/test/output'"
+    with pytest.raises(ValueError, match=rf"{error_msg}"):
+        cli.llm_transpile(w=mock_ws, output_ws_folder=output_folder, ctx=ctx)
+
+
+def test_llm_transpile_with_incorrect_dialect(
+    mock_installation_with_switch: MockInstallation,
+    tmp_path: Path,
+) -> None:
+
+    input_source = tmp_path / "input.sql"
+    input_source.write_text("SELECT * FROM table1;")
+    output_folder = "/Workspace/Users/test/output"
+
+    # Prompts are valid; the explicit source_dialect argument below overrides them
+    # to exercise the invalid-dialect path.
+    mock_prompts = make_mock_prompts(str(input_source), output_folder, "mssql")
+
+    # Use a dedicated WorkspaceClient mock tailored for SwitchRunner
+    mock_ws = create_switch_workspace_client_mock()
+
+    ctx = ApplicationContext(mock_ws)
+
+    ctx.replace(
+        installation=mock_installation_with_switch,
+        add_user_agent_extra=lambda w, *args, **kwargs: w,
+        prompts=mock_prompts,
+    )
+
+    error_msg = "Invalid value for '--source-dialect': 'agent_sql' must be one of: airflow, mssql, mysql, netezza, oracle, postgresql, redshift, snowflake, synapse, teradata"
+    with pytest.raises(ValueError, match=rf"{error_msg}"):
+        cli.llm_transpile(w=mock_ws, source_dialect="agent_sql", ctx=ctx)
diff --git a/tests/unit/test_install.py b/tests/unit/test_install.py
index 1b0e05ea1..e389e7c68 100644
--- a/tests/unit/test_install.py
+++ b/tests/unit/test_install.py
@@ -24,7 +24,9 @@
 from databricks.labs.lakebridge.deployment.installation import WorkspaceInstallation
 from databricks.labs.lakebridge.install import WorkspaceInstaller
 from databricks.labs.lakebridge.reconcile.constants import ReconSourceType, ReconReportType
-from databricks.labs.lakebridge.transpiler.installers import TranspilerInstaller
+from databricks.labs.lakebridge.transpiler.installers import (
+    TranspilerInstaller,
+)
 from databricks.labs.lakebridge.transpiler.repository import TranspilerRepository
 from tests.unit.conftest import path_to_resource
diff --git a/tests/unit/transpiler/test_switch_runner.py b/tests/unit/transpiler/test_switch_runner.py
new file mode 100644
index 000000000..e69de29bb
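Note that `tests/unit/transpiler/test_switch_runner.py` is added empty. Purely as an illustration (not part of the diff), a first test there could reuse the autospec pattern from `test_cli_llm_transpile.py`; the names and values below are placeholders:

```python
from unittest.mock import create_autospec

from databricks.labs.blueprint.installation import MockInstallation
from databricks.sdk import WorkspaceClient

from databricks.labs.lakebridge.transpiler.switch_runner import SwitchRunner


def test_run_triggers_job_without_waiting() -> None:
    # Mirror the mocking style used by create_switch_workspace_client_mock()
    ws = create_autospec(spec=WorkspaceClient, instance=True)
    ws.config.host = "https://workspace.databricks.com"
    ws.jobs.run_now.return_value.run_id = 42

    runner = SwitchRunner(ws, MockInstallation({}))
    result = runner.run(
        volume_input_path="/Volumes/c/s/v/input_abc",
        output_ws_folder="/Workspace/out",
        source_tech="snowflake",
        job_id=7,
    )

    # run() forwards the built job parameters to jobs.run_now without waiting
    expected_params = {
        "input_dir": "/Volumes/c/s/v/input_abc",
        "output_dir": "/Workspace/out",
        "source_tech": "snowflake",
    }
    ws.jobs.run_now.assert_called_once_with(7, job_parameters=expected_params)
    assert result == [
        {"job_id": 7, "run_id": 42, "run_url": "https://workspace.databricks.com/jobs/7/runs/42"}
    ]
```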