[experiment] improve job definitions #2695

Draft · wants to merge 2 commits into base: main
94 changes: 47 additions & 47 deletions src/databricks/labs/ucx/installer/workflows.py
@@ -12,6 +12,7 @@
from pathlib import Path
from typing import Any

from databricks.bundles.resource import resource_generator
from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.parallel import ManyError
@@ -95,7 +96,7 @@
print(__version__)
"""

TEST_RUNNER_NOTEBOOK = """# Databricks notebook source
RUNNER_NOTEBOOK = """# Databricks notebook source
# MAGIC %pip install {remote_wheel}
dbutils.library.restartPython()

@@ -439,6 +440,13 @@ def _infer_task_exception(haystack: str) -> Exception:
return Unknown(haystack)


from databricks.bundles.jobs.models.email_notifications import EmailNotifications
from databricks.bundles.jobs.models.task import Task as BundleTask
from databricks.bundles.jobs.models.task_dependency import TaskDependency
from databricks.bundles.jobs.models.job_cluster import JobCluster, ClusterSpec
from databricks.bundles.jobs.models.job import Job as BundleJob


class WorkflowsDeployment(InstallationMixin):
def __init__(
self,
@@ -461,12 +469,20 @@ def __init__(
super().__init__(config, installation, ws)

def create_jobs(self) -> None:
managed_jobs = self._resource_generator()
# ...


self.remove_jobs(keep=desired_workflows)
self._install_state.save()
self._create_debug(remote_wheels)
self._create_readme()

@resource_generator
def _resource_generator(self) -> Iterator[BundleJob]:
remote_wheels = self._upload_wheel()
desired_workflows = {t.workflow for t in self._tasks if t.cloud_compatible(self._ws.config)}

wheel_runner = ""
if self._config.override_clusters:
wheel_runner = self._upload_wheel_runner(remote_wheels)
wheel_runner = self._upload_wheel_runner(remote_wheels)
for workflow_name in desired_workflows:
settings = self._job_settings(workflow_name, remote_wheels)
if self._config.override_clusters:
@@ -476,12 +492,14 @@ def create_jobs(self) -> None:
self._config.override_clusters,
wheel_runner,
)
self._deploy_workflow(workflow_name, settings)

self.remove_jobs(keep=desired_workflows)
self._install_state.save()
self._create_debug(remote_wheels)
self._create_readme()
yield BundleJob(
resource_name=workflow_name,
name=settings["name"],
tags=settings["tags"],
job_clusters=settings["job_clusters"],
email_notifications=settings.get("email_notifications"),
tasks=settings["tasks"],
)

def remove_jobs(self, *, keep: set[str] | None = None) -> None:
for workflow_name, job_id in self._install_state.jobs.items():
@@ -607,6 +625,7 @@ def _job_cluster_spark_conf(self, cluster_key: str):
return conf_from_installation

# Workflow creation might fail on an InternalError with no message
# TODO: remove as we're doing bundle
@retried(on=[InternalError], timeout=timedelta(minutes=2))
def _deploy_workflow(self, step_name: str, settings):
if step_name in self._install_state.jobs:
@@ -649,8 +668,8 @@ def _upload_wheel(self):
def _upload_wheel_runner(self, remote_wheels: list[str]) -> str:
# TODO: we have to keep doing this workaround until ES-897453 is solved in the platform
remote_wheels_str = " ".join(remote_wheels)
code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
return self._installation.upload(f"wheels/wheel-test-runner-{self._product_info.version()}.py", code)
code = RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
return self._installation.upload(f"wheels/runner.py", code)

@staticmethod
def _apply_cluster_overrides(
@@ -678,7 +697,7 @@ def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, A
if not self._config.override_clusters and "@" in self._my_username:
# set email notifications only if we're running the real
# installation and not the integration test.
email_notifications = jobs.JobEmailNotifications(
email_notifications = EmailNotifications(
on_success=[self._my_username], on_failure=[self._my_username]
)

@@ -705,35 +724,16 @@ def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, A
"tasks": job_tasks,
}

def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
jobs_task = jobs.Task(
def _job_task(self, task: Task, remote_wheels: list[str]) -> BundleTask:
jobs_task = BundleTask(
task_key=task.name,
job_cluster_key=task.job_cluster,
depends_on=[jobs.TaskDependency(task_key=d) for d in task.dependencies()],
depends_on=[TaskDependency(task_key=d) for d in task.dependencies()],
)
if task.notebook:
return self._job_notebook_task(jobs_task, task)
return self._job_wheel_task(jobs_task, task.workflow, remote_wheels)

def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
assert task.notebook is not None
local_notebook = self._this_file.parent.parent / task.notebook
with local_notebook.open("rb") as f:
remote_notebook = self._installation.upload(local_notebook.name, f.read())
return replace(
jobs_task,
notebook_task=jobs.NotebookTask(
notebook_path=remote_notebook,
# ES-872211: currently, we cannot read WSFS files from Scala context
base_parameters={
"task": task.name,
"config": f"/Workspace{self._config_file}",
}
| EXTRA_TASK_PARAMS,
),
)

def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheels: list[str]) -> jobs.Task:
# TODO: use notebook-scoped libraries only, skip wheel task, as it has problems with internet connectivity
libraries = []
for wheel in remote_wheels:
libraries.append(compute.Library(whl=wheel))
@@ -756,9 +756,9 @@ def _job_clusters(self, names: set[str]):
clusters = []
if "main" in names:
clusters.append(
jobs.JobCluster(
JobCluster(
job_cluster_key="main",
new_cluster=compute.ClusterSpec(
new_cluster=ClusterSpec(
data_security_mode=compute.DataSecurityMode.LEGACY_SINGLE_USER_STANDARD,
spark_conf=self._job_cluster_spark_conf("main"),
custom_tags={"ResourceClass": "SingleNode"},
@@ -769,9 +769,9 @@
)
if "tacl" in names:
clusters.append(
jobs.JobCluster(
JobCluster(
job_cluster_key="tacl",
new_cluster=compute.ClusterSpec(
new_cluster=ClusterSpec(
data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
spark_conf=self._job_cluster_spark_conf("tacl"),
num_workers=1, # ShowPermissionsCommand needs a worker
@@ -782,13 +782,13 @@
if "table_migration" in names:
# TODO: rename to "user-isolation", so that we can use it in group migration workflows
clusters.append(
jobs.JobCluster(
JobCluster(
job_cluster_key="table_migration",
new_cluster=compute.ClusterSpec(
new_cluster=ClusterSpec(
data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
spark_conf=self._job_cluster_spark_conf("table_migration"),
policy_id=self._config.policy_id,
autoscale=compute.AutoScale(
autoscale=AutoScale(
max_workers=self._config.max_workers,
min_workers=self._config.min_workers,
),
@@ -797,12 +797,12 @@
)
return clusters

def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheels: list[str]) -> jobs.Task:
jobs_task = jobs.Task(
def _job_parse_logs_task(self, job_tasks: list[BundleTask], workflow: str, remote_wheels: list[str]) -> BundleTask:
jobs_task = BundleTask(
task_key="parse_logs",
job_cluster_key=Task.job_cluster,
# The task depends on all previous tasks.
depends_on=[jobs.TaskDependency(task_key=task.task_key) for task in job_tasks],
depends_on=[TaskDependency(task_key=task.task_key) for task in job_tasks],
run_if=jobs.RunIf.ALL_DONE,
)
return self._job_wheel_task(jobs_task, workflow, remote_wheels)
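For reference, below is a minimal, self-contained sketch of the pattern this draft adopts: a generator decorated with `@resource_generator` yields `Job` resources for the bundle framework to deploy, instead of calling the Jobs API per workflow via `_deploy_workflow`. It reuses only the imports and constructor arguments that appear in the diff above; the workflow names, task keys, tags, and cluster settings are illustrative placeholders, and the `databricks-bundles` API shown is the experimental one this branch targets, not necessarily the released package.

```python
from collections.abc import Iterator

from databricks.bundles.resource import resource_generator
from databricks.bundles.jobs.models.job import Job as BundleJob
from databricks.bundles.jobs.models.job_cluster import JobCluster, ClusterSpec
from databricks.bundles.jobs.models.task import Task as BundleTask
from databricks.bundles.jobs.models.task_dependency import TaskDependency


@resource_generator
def ucx_workflows() -> Iterator[BundleJob]:
    """Yield one Job resource per logical workflow (illustrative names only)."""
    for workflow_name in ("assessment", "migrate-groups"):
        tasks = [
            # Real tasks would also carry a notebook or wheel payload, as the
            # _job_notebook_task / _job_wheel_task helpers in the diff above do.
            BundleTask(task_key="crawl", job_cluster_key="main"),
            BundleTask(
                task_key="report",
                job_cluster_key="main",
                depends_on=[TaskDependency(task_key="crawl")],
            ),
        ]
        yield BundleJob(
            resource_name=workflow_name,  # how the bundle addresses the resource
            name=f"[UCX] {workflow_name}",
            tags={"version": "v0.0.0"},
            job_clusters=[
                JobCluster(
                    job_cluster_key="main",
                    # spark_version is assumed to exist on ClusterSpec (standard
                    # cluster-spec field); both values here are placeholders.
                    new_cluster=ClusterSpec(spark_version="15.4.x-scala2.12", num_workers=1),
                )
            ],
            tasks=tasks,
        )
```

With this shape, `create_jobs` only collects the generated jobs (`managed_jobs = self._resource_generator()` above) and leaves deployment to the bundle tooling, which is presumably why `_deploy_workflow` carries a TODO to be removed.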