From 533a63eb8eae6058af6243effea88e9e7f749d5a Mon Sep 17 00:00:00 2001
From: Serge Smertin
Date: Fri, 20 Sep 2024 17:51:56 +0200
Subject: [PATCH 1/2] [experiment] improve job definitions

---
 .../labs/ucx/installer/workflows.py | 55 +++++++------------
 1 file changed, 21 insertions(+), 34 deletions(-)

diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py
index e6148a0da5..7d04aba407 100644
--- a/src/databricks/labs/ucx/installer/workflows.py
+++ b/src/databricks/labs/ucx/installer/workflows.py
@@ -439,6 +439,12 @@ def _infer_task_exception(haystack: str) -> Exception:
     return Unknown(haystack)
 
 
+from databricks.bundles.jobs.models.email_notifications import EmailNotifications
+from databricks.bundles.jobs.models.task import Task as BundleTask
+from databricks.bundles.jobs.models.task_dependency import TaskDependency
+from databricks.bundles.jobs.models.job_cluster import JobCluster, ClusterSpec, AutoScale  # AutoScale assumed to live next to ClusterSpec
+
+
 class WorkflowsDeployment(InstallationMixin):
     def __init__(
         self,
@@ -678,7 +684,7 @@ def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, A
         if not self._config.override_clusters and "@" in self._my_username:
             # set email notifications only if we're running the real
             # installation and not the integration test.
-            email_notifications = jobs.JobEmailNotifications(
+            email_notifications = EmailNotifications(
                 on_success=[self._my_username], on_failure=[self._my_username]
             )
@@ -705,35 +711,16 @@ def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, A
             "tasks": job_tasks,
         }
 
-    def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
-        jobs_task = jobs.Task(
+    def _job_task(self, task: Task, remote_wheels: list[str]) -> BundleTask:
+        jobs_task = BundleTask(
             task_key=task.name,
             job_cluster_key=task.job_cluster,
-            depends_on=[jobs.TaskDependency(task_key=d) for d in task.dependencies()],
+            depends_on=[TaskDependency(task_key=d) for d in task.dependencies()],
         )
-        if task.notebook:
-            return self._job_notebook_task(jobs_task, task)
         return self._job_wheel_task(jobs_task, task.workflow, remote_wheels)
 
-    def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
-        assert task.notebook is not None
-        local_notebook = self._this_file.parent.parent / task.notebook
-        with local_notebook.open("rb") as f:
-            remote_notebook = self._installation.upload(local_notebook.name, f.read())
-        return replace(
-            jobs_task,
-            notebook_task=jobs.NotebookTask(
-                notebook_path=remote_notebook,
-                # ES-872211: currently, we cannot read WSFS files from Scala context
-                base_parameters={
-                    "task": task.name,
-                    "config": f"/Workspace{self._config_file}",
-                }
-                | EXTRA_TASK_PARAMS,
-            ),
-        )
-
     def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheels: list[str]) -> jobs.Task:
+        # TODO: use notebook-scoped libraries only and skip the wheel task, as it has problems with internet connectivity
         libraries = []
         for wheel in remote_wheels:
             libraries.append(compute.Library(whl=wheel))
@@ -756,9 +743,9 @@ def _job_clusters(self, names: set[str]):
         clusters = []
         if "main" in names:
             clusters.append(
-                jobs.JobCluster(
+                JobCluster(
                     job_cluster_key="main",
-                    new_cluster=compute.ClusterSpec(
+                    new_cluster=ClusterSpec(
                         data_security_mode=compute.DataSecurityMode.LEGACY_SINGLE_USER_STANDARD,
                         spark_conf=self._job_cluster_spark_conf("main"),
                         custom_tags={"ResourceClass": "SingleNode"},
@@ -769,9 +756,9 @@ def _job_clusters(self, names: set[str]):
         if "tacl" in names:
             clusters.append(
-                jobs.JobCluster(
+                JobCluster(
                     job_cluster_key="tacl",
-                    new_cluster=compute.ClusterSpec(
+                    new_cluster=ClusterSpec(
                         data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
                         spark_conf=self._job_cluster_spark_conf("tacl"),
                         num_workers=1,  # ShowPermissionsCommand needs a worker
@@ -782,13 +769,13 @@ def _job_clusters(self, names: set[str]):
         if "table_migration" in names:
             # TODO: rename to "user-isolation", so that we can use it in group migration workflows
             clusters.append(
-                jobs.JobCluster(
+                JobCluster(
                     job_cluster_key="table_migration",
-                    new_cluster=compute.ClusterSpec(
+                    new_cluster=ClusterSpec(
                         data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
                         spark_conf=self._job_cluster_spark_conf("table_migration"),
                         policy_id=self._config.policy_id,
-                        autoscale=compute.AutoScale(
+                        autoscale=AutoScale(
                             max_workers=self._config.max_workers,
                             min_workers=self._config.min_workers,
                         ),
@@ -797,12 +784,12 @@ def _job_clusters(self, names: set[str]):
         return clusters
 
-    def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheels: list[str]) -> jobs.Task:
-        jobs_task = jobs.Task(
+    def _job_parse_logs_task(self, job_tasks: list[BundleTask], workflow: str, remote_wheels: list[str]) -> BundleTask:
+        jobs_task = BundleTask(
             task_key="parse_logs",
             job_cluster_key=Task.job_cluster,
             # The task depends on all previous tasks.
-            depends_on=[jobs.TaskDependency(task_key=task.task_key) for task in job_tasks],
+            depends_on=[TaskDependency(task_key=task.task_key) for task in job_tasks],
             run_if=jobs.RunIf.ALL_DONE,
         )
         return self._job_wheel_task(jobs_task, workflow, remote_wheels)

From 1826054ce77aed31e95bcb6d4dcf97af38072cdc Mon Sep 17 00:00:00 2001
From: Serge Smertin
Date: Wed, 25 Sep 2024 14:48:45 +0200
Subject: [PATCH 2/2] ...

---
 .../labs/ucx/installer/workflows.py | 39 ++++++++++++-------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py
index 7d04aba407..391df30ba8 100644
--- a/src/databricks/labs/ucx/installer/workflows.py
+++ b/src/databricks/labs/ucx/installer/workflows.py
@@ -12,6 +12,8 @@
 from pathlib import Path
 from typing import Any
+from collections.abc import Iterator
 
+from databricks.bundles.resource import resource_generator
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.blueprint.installer import InstallState
 from databricks.labs.blueprint.parallel import ManyError
@@ -95,7 +96,7 @@
 print(__version__)
 """
 
-TEST_RUNNER_NOTEBOOK = """# Databricks notebook source
+RUNNER_NOTEBOOK = """# Databricks notebook source
 # MAGIC %pip install {remote_wheel}
 dbutils.library.restartPython()
@@ -443,6 +444,7 @@ def _infer_task_exception(haystack: str) -> Exception:
 from databricks.bundles.jobs.models.task import Task as BundleTask
 from databricks.bundles.jobs.models.task_dependency import TaskDependency
 from databricks.bundles.jobs.models.job_cluster import JobCluster, ClusterSpec, AutoScale  # AutoScale assumed to live next to ClusterSpec
+from databricks.bundles.jobs.models.job import Job as BundleJob
 
 
 class WorkflowsDeployment(InstallationMixin):
@@ -467,12 +469,20 @@ def __init__(
         super().__init__(config, installation, ws)
 
     def create_jobs(self) -> None:
+        managed_jobs = self._resource_generator()
+        # ...
+
+        self.remove_jobs(keep=desired_workflows)
+        self._install_state.save()
+        self._create_debug(remote_wheels)
+        self._create_readme()
+
+    @resource_generator
+    def _resource_generator(self) -> Iterator[BundleJob]:
         remote_wheels = self._upload_wheel()
         desired_workflows = {t.workflow for t in self._tasks if t.cloud_compatible(self._ws.config)}
-
-        wheel_runner = ""
-        if self._config.override_clusters:
-            wheel_runner = self._upload_wheel_runner(remote_wheels)
+        wheel_runner = self._upload_wheel_runner(remote_wheels)
         for workflow_name in desired_workflows:
             settings = self._job_settings(workflow_name, remote_wheels)
             if self._config.override_clusters:
                 settings = self._apply_cluster_overrides(
                     settings,
                     self._config.override_clusters,
                     wheel_runner,
                 )
-            self._deploy_workflow(workflow_name, settings)
-
-        self.remove_jobs(keep=desired_workflows)
-        self._install_state.save()
-        self._create_debug(remote_wheels)
-        self._create_readme()
+            yield BundleJob(
+                resource_name=workflow_name,
+                name=settings["name"],
+                tags=settings["tags"],
+                job_clusters=settings["job_clusters"],
+                email_notifications=settings.get("email_notifications"),
+                tasks=settings["tasks"],
+            )
 
     def remove_jobs(self, *, keep: set[str] | None = None) -> None:
         for workflow_name, job_id in self._install_state.jobs.items():
@@ -613,6 +625,7 @@ def _job_cluster_spark_conf(self, cluster_key: str):
         return conf_from_installation
 
     # Workflow creation might fail on an InternalError with no message
+    # TODO: remove once deployment goes through bundles
     @retried(on=[InternalError], timeout=timedelta(minutes=2))
     def _deploy_workflow(self, step_name: str, settings):
         if step_name in self._install_state.jobs:
@@ -655,8 +668,8 @@ def _upload_wheel(self):
 
     def _upload_wheel_runner(self, remote_wheels: list[str]) -> str:
         # TODO: we have to keep doing this workaround until ES-897453 is solved in the platform
         remote_wheels_str = " ".join(remote_wheels)
-        code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
-        return self._installation.upload(f"wheels/wheel-test-runner-{self._product_info.version()}.py", code)
+        code = RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
+        return self._installation.upload("wheels/runner.py", code)
 
     @staticmethod
     def _apply_cluster_overrides(
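
The second patch turns job deployment into a `@resource_generator` that yields `Job` models instead of calling the Jobs API per workflow. A minimal standalone sketch of that pattern, reusing only the import paths already used in the patches above; the `databricks.bundles` Python API was experimental at the time, so treat the module paths as assumptions from this series, and the generator name, resource name, and task keys below as hypothetical:

    from collections.abc import Iterator

    from databricks.bundles.resource import resource_generator
    from databricks.bundles.jobs.models.job import Job
    from databricks.bundles.jobs.models.task import Task
    from databricks.bundles.jobs.models.task_dependency import TaskDependency


    @resource_generator
    def example_jobs() -> Iterator[Job]:
        # Each yielded Job becomes a bundle-managed resource keyed by
        # resource_name; the bundle engine reconciles the workspace job to
        # match, which is what would eventually let _deploy_workflow() and
        # its retry-on-InternalError workaround be removed.
        yield Job(
            resource_name="example",  # hypothetical resource name
            name="[UCX] example",
            tasks=[
                Task(task_key="first"),
                Task(
                    task_key="second",
                    depends_on=[TaskDependency(task_key="first")],
                ),
            ],
        )

Under this model, create/update/delete of the workspace jobs is driven by diffing the yielded resources against deployed state, rather than by the imperative `_deploy_workflow()`/`remove_jobs()` sequence the first patch still relies on.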