Commits
20 commits
5585328
Add local dashboard classes.
goodwillpunning Sep 8, 2025
07caf36
Update job deployer with profiler ingestion job.
goodwillpunning Sep 9, 2025
d03d81e
Add initial integration test.
goodwillpunning Sep 11, 2025
f8982dd
Add method to upload DuckDB files to Unity Catalog Volume with tests
radhikaathalye-db Sep 16, 2025
a4d2520
Update app context to call dashboard manager with WorkspaceClient.
goodwillpunning Sep 25, 2025
4ebb53e
Add LSQL definitions for Synapse Profiler Dashboard
goodwillpunning Sep 25, 2025
8370ef6
Merge latest from feature/add_local_dashboards into feature/upload_du…
radhikaathalye-db Sep 29, 2025
72c3f87
refactor: use workspaceClient instead of requests; fix error logging
radhikaathalye-db Sep 29, 2025
03ff5bf
Add more specific exception handling.
goodwillpunning Oct 23, 2025
2aeab84
Update dedicated SQL pool LSQL widgets.
goodwillpunning Sep 29, 2025
c34394d
Replace LSQL dashboards with Python SDK.
goodwillpunning Oct 1, 2025
ac81031
Add private functions for creating/replacing profiler dashboard.
goodwillpunning Oct 15, 2025
6070973
Add more specific error handling to dashboard manager.
goodwillpunning Oct 15, 2025
fb9eb00
Update args for CLI command.
goodwillpunning Oct 15, 2025
ac7c806
Remove profiler extract ingestion job deployer.
goodwillpunning Oct 17, 2025
a094691
Remove unit tests for profiler ingestion job.
goodwillpunning Oct 20, 2025
f8f11aa
Add method to upload DuckDB files to Unity Catalog Volume with tests
radhikaathalye-db Sep 16, 2025
56be197
Merge upstream changes and update test cases.
goodwillpunning Oct 24, 2025
136f115
Add more specific exception handling.
goodwillpunning Oct 23, 2025
5fec3c6
Remove unnecessary params in dashboard manager.
goodwillpunning Oct 27, 2025
Empty file.
Empty file.
@@ -0,0 +1,178 @@
import json
import logging
import os
from typing import Any, Dict

import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DashboardTemplateLoader:
"""
Class for loading the JSON representation of a Databricks dashboard
according to the source system.
"""

def __init__(self, templates_dir: str = "templates"):
self.templates_dir = templates_dir

def load(self, source_system: str) -> Dict:
"""
Loads a profiler summary dashboard.
:param source_system: - the name of the source data warehouse
"""
filename = f"{source_system.lower()}_dashboard.json"
filepath = os.path.join(self.templates_dir, filename)
if not os.path.exists(filepath):
raise FileNotFoundError(f"Could not find dashboard template matching '{source_system}'.")
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)


class DashboardManager:
"""
Class for managing the lifecycle of a profiler dashboard summary, a.k.a. "local dashboards"
"""

def __init__(self, workspace_url: str, token: str, warehouse_id: str, databricks_username: str):
self.warehouse_id = warehouse_id
self.token = token
if not workspace_url.startswith("http"):
workspace_url = f"https://{workspace_url}"
self.workspace_url = workspace_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Bearer {token}", "Content-Type": "application/json"})
self.databricks_username = databricks_username
self.dashboard_location = f"/Workspace/Users/{databricks_username}/Lakebridge/Dashboards"
self.dashboard_name = "Lakebridge Profiler Assessment"

def _handle_response(self, resp: requests.Response) -> Dict[str, Any]:
"""Handle API responses with logging and error handling."""
try:
resp.raise_for_status()
if resp.status_code == 204:
return {"status": "success", "message": "No content"}
return resp.json()
except requests.exceptions.HTTPError as e:
logger.error("API call failed: %s - %s", resp.status_code, resp.text)
raise RuntimeError(f"Databricks API Error {resp.status_code}: {resp.text}") from e
except Exception:
logger.exception("Unexpected error during API call")
raise

def draft_dashboard(
self, display_name: str, serialized_dashboard: str, parent_path: str, warehouse_id: str
) -> Dict[str, Any]:
"""Create a new dashboard in Databricks Lakeview."""
url = f"{self.workspace_url}/api/2.0/lakeview/dashboards"
payload = {
"display_name": display_name,
"warehouse_id": warehouse_id,
"serialized_dashboard": serialized_dashboard,
"parent_path": parent_path,
}
resp = self.session.post(url, json=payload)
return self._handle_response(resp)

def delete_dashboard(self, dashboard_id: str) -> Dict[str, Any]:
"""Delete a dashboard by ID."""
url = f"{self.workspace_url}/api/2.0/lakeview/dashboards/{dashboard_id}"
resp = self.session.delete(url)
return self._handle_response(resp)

def publish_dashboard(self, dashboard_id: str) -> Dict[str, Any]:
"""Publish a dashboard by ID."""
url = f"{self.workspace_url}/api/2.0/lakeview/dashboards/{dashboard_id}/published"
resp = self.session.post(url)
return self._handle_response(resp)

def unpublish_dashboard(self, dashboard_id: str) -> Dict[str, Any]:
"""Unpublish a dashboard by ID."""
url = f"{self.workspace_url}/api/2.0/lakeview/dashboards/{dashboard_id}/published"
resp = self.session.delete(url)
return self._handle_response(resp)

def get_unpublished_dashboard_serialized(self, dashboard_id: str) -> str:
"""
Get the serialized_dashboard of an unpublished dashboard.

Workflow:
- First unpublish the dashboard
- Then fetch the dashboard details
"""
logger.info("Unpublishing dashboard %s before fetching details", dashboard_id)
self.unpublish_dashboard(dashboard_id)

url = f"{self.workspace_url}/api/2.0/lakeview/dashboards/{dashboard_id}"
resp = self.session.get(url)
data = self._handle_response(resp)

serialized = data.get("serialized_dashboard")
if not serialized:
raise RuntimeError(f"Dashboard {dashboard_id} has no serialized_dashboard field")
return serialized

    def create_profiler_summary_dashboard(self, source_system: str):
        # TODO: check whether the dashboard already exists;
        #   if it does, unpublish and delete it before recreating it.
        json_dashboard = DashboardTemplateLoader("templates").load(source_system)
        response = self.draft_dashboard(
            self.dashboard_name,
            json.dumps(json_dashboard),
            parent_path=self.dashboard_location,
            warehouse_id=self.warehouse_id,
        )
        return response.get("dashboard_id")

def upload_duckdb_to_uc_volume(self, workspace_url, access_token, local_file_path, volume_path):
"""
Upload a DuckDB file to Unity Catalog Volume using PUT method

Args:
workspace_url (str): Databricks workspace URL (e.g., 'https://your-workspace.cloud.databricks.com')
access_token (str): Personal access token for authentication
local_file_path (str): Local path to the DuckDB file
volume_path (str): Target path in UC Volume (e.g., '/Volumes/catalog/schema/volume/myfile.duckdb')

Returns:
bool: True if successful, False otherwise
"""

# Validate inputs
if not os.path.exists(local_file_path):
print(f"Error: Local file not found: {local_file_path}")
Contributor comment:
We'll want to use logger.error() statements here as opposed to printing to the console. Here is a good reference: https://github.com/databrickslabs/lakebridge/blob/main/src/databricks/labs/lakebridge/assessments/configure_assessment.py#L43
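A minimal, self-contained sketch of the logging-based style the comment asks for (`_validate_local_file` is a hypothetical helper name, not part of this PR):

import logging
import os

logger = logging.getLogger(__name__)


def _validate_local_file(local_file_path: str) -> bool:
    # Illustrative only: report a missing extract via the module logger instead of print().
    if not os.path.exists(local_file_path):
        logger.error("Local file not found: %s", local_file_path)
        return False
    return True

The same substitution would apply to the remaining print() calls in this function.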

return False

if not volume_path.startswith('/Volumes/'):
print("Error: Volume path must start with '/Volumes/'")
return False

headers = {
'Authorization': f'Bearer {access_token}'
}

workspace_url = workspace_url.rstrip('/')

try:
# Use PUT method to upload directly to the volume path
url = f"{workspace_url}/api/2.0/fs/files{volume_path}"

with open(local_file_path, 'rb') as f:
response = requests.put(url, headers=headers, data=f)
Contributor comment:
@radhikaathalye-db could you pull the upstream changes from the branch feature/add_local_dashboards. We'll want to use the Python SDK (WorkspaceClient) to upload files to the UC volume vs. using the REST API. Thanks!
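A rough sketch of the SDK-based upload the comment suggests, once the upstream changes are merged (the function name and the overwrite=True flag are assumptions, not the final implementation):

from databricks.sdk import WorkspaceClient


def upload_duckdb_with_sdk(workspace_url: str, access_token: str, local_file_path: str, volume_path: str) -> None:
    # Illustrative only: stream the local DuckDB file to the UC volume via the SDK Files API.
    ws = WorkspaceClient(host=workspace_url, token=access_token)
    with open(local_file_path, "rb") as f:
        ws.files.upload(volume_path, f, overwrite=True)

Keeping authentication inside WorkspaceClient avoids hand-rolling headers and retries around the REST endpoint.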


if response.status_code in [200, 201, 204]:
print(f"Successfully uploaded {local_file_path} to {volume_path}")
return True
else:
print(f"Upload failed: {response.status_code} - {response.text}")
return False

except Exception as e:
print(f"Upload failed: {str(e)}")
return False
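For orientation, a rough sketch of how the two classes above might be exercised end to end (credentials, warehouse id, and username are placeholders; "synapse" follows the <source_system>_dashboard.json naming convention used by DashboardTemplateLoader):

# Assumes DashboardManager from the module above is importable; all values below are placeholders.
manager = DashboardManager(
    workspace_url="https://example-workspace.cloud.databricks.com",
    token="<personal-access-token>",
    warehouse_id="<warehouse-id>",
    databricks_username="first.last@example.com",
)
dashboard_id = manager.create_profiler_summary_dashboard("synapse")
manager.publish_dashboard(dashboard_id)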
@@ -0,0 +1,105 @@
{
"datasets": [
{
"name": "3696faf2",
"displayName": "synapse_dsp_dwu_utilization",
"queryLines": [
"select\n",
" name,\n",
" date(`timestamp`) as date,\n",
" max(`average`) as avg,\n",
" avg(`maximum`) as avg_max,\n",
" max(`maximum`) as max_max\n",
"from\n",
" IDENTIFIER(:target_catalog || '.' || :target_schema || '.metrics_dedicated_pool_metrics')\n",
"where\n",
" name in ('DWUUsedPercent', 'DWU used percentage', 'DWU percentage')\n",
"group by\n",
" name,\n",
" date(`timestamp`)\n",
"order by\n",
" name"
],
"parameters": [
{
"displayName": "target_catalog",
"keyword": "target_catalog",
"dataType": "STRING",
"defaultSelection": {
"value": "lakebridge_profiler"
}
},
{
"displayName": "target_schema",
"keyword": "target_schema",
"dataType": "STRING",
"defaultSelection": {
"value": "run_1"
}
}
]
}
],
"pages": [
{
"name": "97000e02",
"displayName": "Profiler Summary",
"layout": [
{
"widget": {
"name": "8bdbc278",
"queries": [
{
"name": "875149cfd327490fac2aac2a05f6c004",
"query": {
"datasetName": "3696faf2",
"fields": [
{
"name": "date",
"expression": "`date`"
},
{
"name": "name",
"expression": "`name`"
},
{
"name": "column_15729dcf2867",
"expression": "AVG(`avg_max`)"
},
{
"name": "column_35784ae317028",
"expression": "MAX(`avg`)"
}
],
"disaggregated": false
}
}
],
"spec": {
"version": 0,
"viz_spec": {
"display_name": "SQL Pool Utilization (DWU Used Percentage)",
"description": "",
"viz_type": "CHART",
"serialized_options": "{\"version\": 2, \"globalSeriesType\": \"line\", \"sortX\": true, \"sortY\": true, \"legend\": {\"traceorder\": \"normal\"}, \"xAxis\": {\"type\": \"-\", \"labels\": {\"enabled\": true}, \"title\": {\"text\": \"Date\"}}, \"yAxis\": [{\"type\": \"-\", \"title\": {\"text\": \"Utilization (Percent)\"}}, {\"type\": \"-\", \"opposite\": true}], \"alignYAxesAtZero\": true, \"error_y\": {\"type\": \"data\", \"visible\": true}, \"series\": {\"stacking\": null, \"error_y\": {\"type\": \"data\", \"visible\": true}}, \"seriesOptions\": {\"column_939b6abd5915\": {\"name\": \"avg\", \"yAxis\": 0, \"type\": \"line\"}, \"CPUPercent\": {\"name\": \"CPU Used\", \"type\": \"line\"}, \"DWUUsedPercent\": {\"name\": \"DWU Used\", \"type\": \"line\"}, \"column_15729dcf2867\": {\"yAxis\": 0, \"type\": \"line\"}, \"BPAZE1IEDNADW01\": {\"name\": \"Avg of Max DWU Utilized\"}, \"column_35784ae317028\": {\"yAxis\": 0, \"type\": \"line\"}}, \"valuesOptions\": {}, \"direction\": {\"type\": \"counterclockwise\"}, \"sizemode\": \"diameter\", \"coefficient\": 1, \"numberFormat\": \"0,0[.]\", \"percentFormat\": \"0[.]00%\", \"textFormat\": \"\", \"missingValuesAsZero\": true, \"useAggregationsUi\": true, \"swappedAxes\": false, \"dateTimeFormat\": \"YYYY-MM-DD HH:mm:ss\", \"showDataLabels\": true, \"columnConfigurationMap\": {\"x\": {\"column\": \"date\", \"id\": \"column_939b6abd5913\"}, \"series\": {\"column\": \"pool_name\", \"id\": \"column_5178fbd140032\"}, \"y\": [{\"id\": \"column_15729dcf2867\", \"column\": \"avg_max\", \"transform\": \"AVG\"}, {\"id\": \"column_35784ae317028\", \"column\": \"avg\", \"transform\": \"MAX\"}]}, \"isAggregationOn\": true, \"condensed\": true, \"withRowNumber\": true}",
"query_name": "875149cfd327490fac2aac2a05f6c004"
}
}
},
"position": {
"x": 1,
"y": 93,
"width": 5,
"height": 8
}
}
],
"pageType": "PAGE_TYPE_CANVAS"
}
],
"uiSettings": {
"theme": {
"widgetHeaderAlignment": "ALIGNMENT_UNSPECIFIED"
}
}
}
88 changes: 87 additions & 1 deletion src/databricks/labs/lakebridge/deployment/job.py
@@ -9,7 +9,15 @@
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.service import compute
from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings, JobParameterDefinition
from databricks.sdk.service.jobs import (
Task,
PythonWheelTask,
JobCluster,
JobSettings,
JobParameterDefinition,
NotebookTask,
Source,
)

from databricks.labs.lakebridge.config import ReconcileConfig
from databricks.labs.lakebridge.reconcile.constants import ReconSourceType
@@ -145,3 +153,81 @@ def _get_default_node_type_id(self) -> str:
def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
return f"{prefix.upper()}_{name}".replace(" ", "_")

def deploy_profiler_ingestion_job(
self, name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
):
logger.info("Deploying profiler ingestion job.")
job_id = self._update_or_create_profiler_ingestion_job(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)
logger.info(f"Profiler ingestion job deployed with job_id={job_id}")
logger.info(f"Job URL: {self._ws.config.host}#job/{job_id}")
self._install_state.save()

def _update_or_create_profiler_ingestion_job(
self, name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
) -> str:
job_settings = self._profiler_ingestion_job_settings(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)
if name in self._install_state.jobs:
try:
job_id = int(self._install_state.jobs[name])
logger.info(f"Updating configuration for job `{name}`, job_id={job_id}")
self._ws.jobs.reset(job_id, JobSettings(**job_settings))
return str(job_id)
except InvalidParameterValue:
del self._install_state.jobs[name]
logger.warning(f"Job `{name}` does not exist anymore for some reason")
return self._update_or_create_profiler_ingestion_job(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)

logger.info(f"Creating new job configuration for job `{name}`")
new_job = self._ws.jobs.create(**job_settings)
assert new_job.job_id is not None
self._install_state.jobs[name] = str(new_job.job_id)
return str(new_job.job_id)

def _profiler_ingestion_job_settings(
self, job_name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
) -> dict[str, Any]:
latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True)
version = self._product_info.version()
version = version if not self._ws.config.is_gcp else version.replace("+", "-")
tags = {"version": f"v{version}"}
if self._is_testing():
# Add RemoveAfter tag for test job cleanup
date_to_remove = self._get_test_purge_time()
tags.update({"RemoveAfter": date_to_remove})

return {
"name": self._name_with_prefix(job_name),
"tags": tags,
"job_clusters": [
JobCluster(
job_cluster_key="Lakebridge_Profiler_Ingestion_Cluster",
new_cluster=compute.ClusterSpec(
data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
spark_conf={},
node_type_id=self._get_default_node_type_id(),
autoscale=compute.AutoScale(min_workers=2, max_workers=3),
spark_version=latest_lts_spark,
),
)
],
"tasks": [
NotebookTask(
notebook_path=f"/Workspace/{databricks_user}/Lakebridge/profiler/load_extracted_tables.py",
base_parameters={
"extract_location": volume_upload_location,
"profiler_type": source_tech,
"target_catalog": target_catalog,
},
source=Source("WORKSPACE"),
),
],
"max_concurrent_runs": 2,
"parameters": [JobParameterDefinition(name="operation_name", default="reconcile")],
}