From bd1758c0bb39911976840ae4bf354a928405048e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 10:19:37 +0100 Subject: [PATCH 1/9] Add docstring to workflow linter --- src/databricks/labs/ucx/source_code/linters/jobs.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 721fdc8a89..8087697283 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -38,6 +38,13 @@ class WorkflowLinter: + """Lint workflows for UC compatibility and references to data assets. + + Data assets linted: + - Table and view references (UsedTables) + - Direct file system access (DirectFsAccess) + """ + def __init__( self, ws: WorkspaceClient, From 5b943f421df38441beaab74ea7ce2ebc4f713385 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:10:02 +0100 Subject: [PATCH 2/9] Let WorkflowLinter be a subclass from BaseCrawler --- .../labs/ucx/assessment/workflows.py | 2 +- .../labs/ucx/contexts/application.py | 2 ++ src/databricks/labs/ucx/progress/workflows.py | 3 +- .../labs/ucx/source_code/linters/jobs.py | 32 +++++++++++++------ .../labs/ucx/source_code/linters/queries.py | 4 +++ .../source_code/test_directfs_access.py | 6 ++-- tests/integration/source_code/test_jobs.py | 2 +- tests/unit/progress/test_workflows.py | 24 +++++++------- tests/unit/source_code/test_jobs.py | 7 +++- 9 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index 3c0efbd7af..4359344313 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -213,7 +213,7 @@ def assess_workflows(self, ctx: RuntimeContext): Also, stores direct filesystem accesses for display in the migration dashboard. 
""" - ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + ctx.workflow_linter.snapshot() class Failing(Workflow): diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 0965ffb583..5406cd52d6 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -589,6 +589,8 @@ def dependency_resolver(self) -> DependencyResolver: def workflow_linter(self) -> WorkflowLinter: return WorkflowLinter( self.workspace_client, + self.sql_backend, + self.inventory_database, self.jobs_crawler, self.dependency_resolver, self.path_lookup, diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index b0ffd45a86..64def77780 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -180,8 +180,7 @@ def update_lakeview_dashboards_history_log(self, ctx: RuntimeContext) -> None: def assess_workflows(self, ctx: RuntimeContext): """Scans all jobs for migration issues in notebooks. Also stores direct filesystem accesses for display in the migration dashboard.""" - # TODO: Ensure these are captured in the history log. 
- ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + ctx.workflow_linter.snapshot(force_refresh=True) @job_task( depends_on=[ diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 8087697283..801e3720a8 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -12,6 +12,8 @@ from databricks.sdk.errors import NotFound from databricks.sdk.service import jobs +from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.assessment.jobs import JobsCrawler from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.source_code.base import ( @@ -37,7 +39,7 @@ logger = logging.getLogger(__name__) -class WorkflowLinter: +class WorkflowLinter(CrawlerBase): """Lint workflows for UC compatibility and references to data assets. Data assets linted: @@ -48,6 +50,8 @@ class WorkflowLinter: def __init__( self, ws: WorkspaceClient, + sql_backend: SqlBackend, + schema: str, jobs_crawler: JobsCrawler, resolver: DependencyResolver, path_lookup: PathLookup, @@ -55,6 +59,8 @@ def __init__( directfs_crawler: DirectFsAccessCrawler, used_tables_crawler: UsedTablesCrawler, ): + super().__init__(sql_backend, "hive_metastore", schema, "workflow_problems", JobProblem) + self._ws = ws self._jobs_crawler = jobs_crawler self._resolver = resolver @@ -63,7 +69,21 @@ def __init__( self._directfs_crawler = directfs_crawler self._used_tables_crawler = used_tables_crawler - def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None: + def _try_fetch(self) -> Iterable[JobProblem]: + """Fetch all linting problems from the inventory table. + + If trying to fetch the linted data assets, use their respective crawlers. 
+ """ + for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): + yield JobProblem(*row) + + def _crawl(self) -> Iterable[JobProblem]: + """Crawl the workflow jobs and lint them. + + Next to linted workflow problems, the crawler also collects: + - Table and view references (UsedTables) + - Direct file system access (DirectFsAccess) + """ tasks = [] for job in self._jobs_crawler.snapshot(): tasks.append(functools.partial(self.lint_job, job.job_id)) @@ -76,18 +96,12 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> No job_problems.extend(problems) job_dfsas.extend(dfsas) job_tables.extend(tables) - logger.info(f"Saving {len(job_problems)} linting problems...") - sql_backend.save_table( - f'{inventory_database}.workflow_problems', - job_problems, - JobProblem, - mode='overwrite', - ) self._directfs_crawler.dump_all(job_dfsas) self._used_tables_crawler.dump_all(job_tables) if len(errors) > 0: error_messages = "\n".join([str(error) for error in errors]) logger.warning(f"Errors occurred during linting:\n{error_messages}") + yield from job_problems def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: try: diff --git a/src/databricks/labs/ucx/source_code/linters/queries.py b/src/databricks/labs/ucx/source_code/linters/queries.py index aca7e6e0a2..7ca50070c2 100644 --- a/src/databricks/labs/ucx/source_code/linters/queries.py +++ b/src/databricks/labs/ucx/source_code/linters/queries.py @@ -40,6 +40,10 @@ class _ReportingContext: class QueryLinter: + """Lint queries + + TODO: Let QueryLinter inherit from `CrawlerBase` + """ def __init__( self, diff --git a/tests/integration/source_code/test_directfs_access.py b/tests/integration/source_code/test_directfs_access.py index fc4b966b9d..0edee2c30c 100644 --- a/tests/integration/source_code/test_directfs_access.py +++ b/tests/integration/source_code/test_directfs_access.py @@ -79,8 +79,6 @@ def 
test_lakeview_query_dfsa_ownership(runtime_ctx) -> None: def test_path_dfsa_ownership( runtime_ctx, make_directory, - inventory_schema, - sql_backend, ) -> None: """Verify the ownership of a direct-fs record for a notebook/source path associated with a job.""" @@ -92,6 +90,8 @@ def test_path_dfsa_ownership( # Produce a DFSA record for the job. linter = WorkflowLinter( runtime_ctx.workspace_client, + runtime_ctx.sql_backend, + runtime_ctx.inventory_database, runtime_ctx.jobs_crawler, runtime_ctx.dependency_resolver, runtime_ctx.path_lookup, @@ -99,7 +99,7 @@ def test_path_dfsa_ownership( runtime_ctx.directfs_access_crawler_for_paths, runtime_ctx.used_tables_crawler_for_paths, ) - linter.refresh_report(sql_backend, inventory_schema) + linter.snapshot() # Find a record for our job. records = runtime_ctx.directfs_access_crawler_for_paths.snapshot() diff --git a/tests/integration/source_code/test_jobs.py b/tests/integration/source_code/test_jobs.py index 80522e7f83..086b72f173 100644 --- a/tests/integration/source_code/test_jobs.py +++ b/tests/integration/source_code/test_jobs.py @@ -36,7 +36,7 @@ def test_linter_from_context(simple_ctx, make_job) -> None: # Ensure we have at least 1 job that fails: "Deprecated file system path in call to: /mnt/things/e/f/g" job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n") simple_ctx.config.include_job_ids = [job.job_id] - simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database) + simple_ctx.workflow_linter.refresh_report() # Verify that the 'problems' table has content. 
cursor = simple_ctx.sql_backend.fetch( diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 3557257644..d734b6a6e9 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -3,6 +3,7 @@ from unittest.mock import create_autospec import pytest + from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher from databricks.labs.ucx.progress.history import ProgressEncoder @@ -12,6 +13,8 @@ from databricks.labs.ucx.progress.workflows import MigrationProgress from databricks.labs.ucx.contexts.workflow_task import RuntimeContext +from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter +from databricks.labs.ucx.source_code.linters.queries import QueryLinter @pytest.mark.parametrize( @@ -98,21 +101,18 @@ def test_migration_progress_runtime_tables_refresh_update_history_log(run_workfl mock_history_log.append_inventory_snapshot.assert_called_once() -@pytest.mark.parametrize( - "task, linter", - ( - (MigrationProgress.assess_dashboards, RuntimeContext.query_linter), - (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter), - ), -) -def test_linter_runtime_refresh(run_workflow, task, linter) -> None: - linter_class = get_type_hints(linter.func)["return"] - mock_linter = create_autospec(linter_class) - linter_name = linter.attrname - run_workflow(task, **{linter_name: mock_linter}) +def test_migration_progress_assess_dashboards_calls_query_linter_refresh_report(run_workflow) -> None: + mock_linter = create_autospec(QueryLinter) + run_workflow(MigrationProgress.assess_dashboards, query_linter=mock_linter) mock_linter.refresh_report.assert_called_once() +def test_migration_progress_assess_workflows_calls_workflow_linter_snapshot(run_workflow) -> None: + mock_linter = create_autospec(WorkflowLinter) + run_workflow(MigrationProgress.assess_workflows, 
workflow_linter=mock_linter) + mock_linter.snapshot.assert_called_once() + + def test_migration_progress_with_valid_prerequisites(run_workflow) -> None: ws = create_autospec(WorkspaceClient) ws.metastores.current.return_value = MetastoreAssignment(metastore_id="test", workspace_id=123456789) diff --git a/tests/unit/source_code/test_jobs.py b/tests/unit/source_code/test_jobs.py index 5024834eb0..e9fe507017 100644 --- a/tests/unit/source_code/test_jobs.py +++ b/tests/unit/source_code/test_jobs.py @@ -233,11 +233,14 @@ def test_workflow_linter_lint_job_logs_problems(dependency_resolver, mock_path_l expected_message = "Found job problems:\nUNKNOWN:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library" ws = create_autospec(WorkspaceClient) + sql_backend = MockBackend() jobs_crawler = create_autospec(JobsCrawler) directfs_crawler = create_autospec(DirectFsAccessCrawler) used_tables_crawler = create_autospec(UsedTablesCrawler) linter = WorkflowLinter( ws, + sql_backend, + "test", jobs_crawler, dependency_resolver, mock_path_lookup, @@ -567,6 +570,8 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test") linter = WorkflowLinter( ws, + sql_backend, + "test", jobs_crawler, dependency_resolver, mock_path_lookup, @@ -574,7 +579,7 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m directfs_crawler, used_tables_crawler, ) - linter.refresh_report(sql_backend, 'test') + linter.snapshot() jobs_crawler.snapshot.assert_called_once() sql_backend.has_rows_written_for('test.workflow_problems') From b2428b7ac71ce1851102400fa7c278c2be4dad13 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:15:18 +0100 Subject: [PATCH 3/9] Append workflow problems snapshot to progress tracking in migration progress --- src/databricks/labs/ucx/progress/workflows.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git 
a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 64def77780..18f892b5a6 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -182,6 +182,12 @@ def assess_workflows(self, ctx: RuntimeContext): Also stores direct filesystem accesses for display in the migration dashboard.""" ctx.workflow_linter.snapshot(force_refresh=True) + @job_task(depends_on=[assess_workflows], job_cluster="user_isolation") + def update_workflows_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest workflow snapshot.""" + workflow_problems_snapshot = ctx.workflow_linter.snapshot(force_refresh=False) + ctx.dashboards_progress.append_inventory_snapshot(workflow_problems_snapshot) + @job_task( depends_on=[ verify_prerequisites, From cfc813e2d6e9605fe604eaf0e34716d2680eb2c6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:26:18 +0100 Subject: [PATCH 4/9] Make crawl logic more concise --- .../labs/ucx/source_code/linters/jobs.py | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 801e3720a8..13507ab479 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -84,24 +84,16 @@ def _crawl(self) -> Iterable[JobProblem]: - Table and view references (UsedTables) - Direct file system access (DirectFsAccess) """ - tasks = [] - for job in self._jobs_crawler.snapshot(): - tasks.append(functools.partial(self.lint_job, job.job_id)) + tasks = [functools.partial(self.lint_job, job.job_id) for job in self._jobs_crawler.snapshot()] logger.info(f"Running {len(tasks)} linting tasks in parallel...") - job_results, errors = Threads.gather('linting workflows', tasks) - job_problems: list[JobProblem] = [] - job_dfsas: list[DirectFsAccess] = 
[] - job_tables: list[UsedTable] = [] - for problems, dfsas, tables in job_results: - job_problems.extend(problems) - job_dfsas.extend(dfsas) - job_tables.extend(tables) - self._directfs_crawler.dump_all(job_dfsas) - self._used_tables_crawler.dump_all(job_tables) - if len(errors) > 0: + results, errors = Threads.gather("linting workflows", tasks) + if errors: error_messages = "\n".join([str(error) for error in errors]) logger.warning(f"Errors occurred during linting:\n{error_messages}") - yield from job_problems + problems, dfsas, tables = zip(*results) + self._directfs_crawler.dump_all(dfsas) + self._used_tables_crawler.dump_all(tables) + yield from problems def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: try: From 5418bd2eecc721223e0b925c1520c3d950d0112b Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:26:49 +0100 Subject: [PATCH 5/9] Disable too many arguments for WorkflowLinter --- src/databricks/labs/ucx/source_code/linters/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 13507ab479..167577e8cf 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -47,7 +47,7 @@ class WorkflowLinter(CrawlerBase): - Direct file system access (DirectFsAccess) """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, ws: WorkspaceClient, sql_backend: SqlBackend, From 080392197980054bfe4cf473face33f15ffc23b6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:31:11 +0100 Subject: [PATCH 6/9] Return early in case there are no tasks to lint --- src/databricks/labs/ucx/source_code/linters/jobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 
167577e8cf..0ff76928f0 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -85,6 +85,8 @@ def _crawl(self) -> Iterable[JobProblem]: - Direct file system access (DirectFsAccess) """ tasks = [functools.partial(self.lint_job, job.job_id) for job in self._jobs_crawler.snapshot()] + if not tasks: + return logger.info(f"Running {len(tasks)} linting tasks in parallel...") results, errors = Threads.gather("linting workflows", tasks) if errors: From 78da9ffdcc98bf3ce1439e939acaae8bdd255c9a Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 11:32:21 +0100 Subject: [PATCH 7/9] Only yield and dump if results --- src/databricks/labs/ucx/source_code/linters/jobs.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 0ff76928f0..5fa68905a4 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -92,10 +92,11 @@ def _crawl(self) -> Iterable[JobProblem]: if errors: error_messages = "\n".join([str(error) for error in errors]) logger.warning(f"Errors occurred during linting:\n{error_messages}") - problems, dfsas, tables = zip(*results) - self._directfs_crawler.dump_all(dfsas) - self._used_tables_crawler.dump_all(tables) - yield from problems + if results: + problems, dfsas, tables = zip(*results) + self._directfs_crawler.dump_all(dfsas) + self._used_tables_crawler.dump_all(tables) + yield from problems def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: try: From 63ee0e0782da2ab0cedae117fbec461956906298 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 13:25:37 +0100 Subject: [PATCH 8/9] Add job progress encoder with ownership --- .../labs/ucx/contexts/workflow_task.py | 21 ++++++++ src/databricks/labs/ucx/progress/workflows.py | 6 
+-- src/databricks/labs/ucx/source_code/jobs.py | 50 +++++++++++++++++-- .../labs/ucx/source_code/linters/jobs.py | 6 +-- .../integration/assessment/test_workflows.py | 8 ++- tests/unit/progress/test_workflows.py | 4 ++ tests/unit/source_code/test_jobs.py | 2 +- 7 files changed, 84 insertions(+), 13 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index bcd0555b95..86f9e01544 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -32,6 +32,7 @@ from databricks.labs.ucx.progress.jobs import JobsProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder +from databricks.labs.ucx.source_code.jobs import JobProblem, JobProblemOwnership # As with GlobalContext, service factories unavoidably have a lot of public methods. # pylint: disable=too-many-public-methods @@ -81,6 +82,15 @@ def jobs_crawler(self) -> JobsCrawler: def job_ownership(self) -> JobOwnership: return JobOwnership(self.administrator_locator) + @cached_property + def workflow_problem_ownership(self) -> JobProblemOwnership: + return JobProblemOwnership( + self.administrator_locator, + self.workspace_client, + self.workspace_path_ownership, + self.job_ownership, + ) + @cached_property def submit_runs_crawler(self) -> SubmitRunsCrawler: return SubmitRunsCrawler( @@ -217,6 +227,17 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]: self.config.ucx_catalog, ) + @cached_property + def workflow_problem_progress(self) -> ProgressEncoder[JobProblem]: + return ProgressEncoder( + self.sql_backend, + self.workflow_problem_ownership, + JobProblem, + self.parent_run_id, + self.workspace_id, + self.config.ucx_catalog, + ) + @cached_property def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]: return ProgressEncoder( diff --git 
a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 18f892b5a6..3784293049 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -183,10 +183,10 @@ def assess_workflows(self, ctx: RuntimeContext): ctx.workflow_linter.snapshot(force_refresh=True) @job_task(depends_on=[assess_workflows], job_cluster="user_isolation") - def update_workflows_history_log(self, ctx: RuntimeContext) -> None: - """Update the history log with the latest workflow snapshot.""" + def update_workflow_problems_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest workflow problems snapshot.""" workflow_problems_snapshot = ctx.workflow_linter.snapshot(force_refresh=False) - ctx.dashboards_progress.append_inventory_snapshot(workflow_problems_snapshot) + ctx.workflow_problem_progress.append_inventory_snapshot(workflow_problems_snapshot) @job_task( depends_on=[ diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index bacc4554d3..8a827336d0 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -7,6 +7,7 @@ from importlib import metadata from pathlib import Path from urllib import parse +from typing import ClassVar from databricks.labs.blueprint.paths import DBFSPath from databricks.sdk import WorkspaceClient @@ -16,10 +17,10 @@ from databricks.sdk.service.jobs import Source from databricks.labs.ucx.assessment.crawlers import runtime_version_tuple +from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership +from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache, InvalidPath -from databricks.labs.ucx.source_code.base import ( - LineageAtom, -) +from databricks.labs.ucx.source_code.base import LineageAtom from 
databricks.labs.ucx.source_code.graph import ( Dependency, DependencyGraph, @@ -45,10 +46,25 @@ class JobProblem: end_line: int end_col: int + __id_attributes__: ClassVar[tuple[str, ...]] = ( + "job_id", + "task_key", + "path", + "code", + "start_line", + "start_col", + "end_line", + "end_col", + ) + def as_message(self) -> str: message = f"{self.path}:{self.start_line} [{self.code}] {self.message}" return message + def has_missing_path(self) -> bool: + """Flag if the path is missing, or not.""" + return self.path == Path("") # Reusing flag from DependencyProblem + class WorkflowTask(Dependency): def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job): @@ -358,3 +374,31 @@ def _new_job_cluster_metadata(self, new_cluster) -> Iterable[DependencyProblem]: self._spark_version = new_cluster.spark_version self._data_security_mode = new_cluster.data_security_mode return [] + + +class JobProblemOwnership(Ownership[JobProblem]): + """Determine ownership of job (workflow) problems. + + This is the owner of the problem's workspace path or, when the path is missing, the owner of the job (if known). 
+ """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + ws: WorkspaceClient, + workspace_path_ownership: WorkspacePathOwnership, + job_ownership: JobOwnership, + ) -> None: + super().__init__(administrator_locator) + self._ws = ws + self._workspace_path_ownership = workspace_path_ownership + self._job_ownership = job_ownership + + def _maybe_direct_owner(self, record: JobProblem) -> str | None: + if not record.has_missing_path(): + return self._workspace_path_ownership.owner_of_path(record.path) + try: + job = self._ws.jobs.get(record.job_id) + return self._job_ownership.owner_of(JobInfo.from_job(job)) + except DatabricksError: + return None diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 5fa68905a4..4a09a7eb17 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -4,7 +4,6 @@ from collections.abc import Iterable from datetime import datetime, timezone -from pathlib import Path from databricks.labs.blueprint.parallel import Threads from databricks.labs.lsql.backends import SqlBackend @@ -111,8 +110,6 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], logger.warning(f"Found job problems:\n{problem_messages}") return problems, dfsas, tables - _UNKNOWN = Path('') - def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: problems: list[JobProblem] = [] dfsas: list[DirectFsAccess] = [] @@ -127,12 +124,11 @@ def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAcces if not advices: advices = self._lint_task(graph, session_state) for advice in advices: - absolute_path = "UNKNOWN" if advice.has_missing_path() else advice.path.absolute().as_posix() job_problem = JobProblem( job_id=job.job_id, job_name=job.settings.name, task_key=task.task_key, - path=absolute_path, + path=advice.path.as_posix(), 
code=advice.advice.code, message=advice.advice.message, start_line=advice.advice.start_line, diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index d9d0e53e91..4239be3a4e 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -4,6 +4,8 @@ from databricks.sdk.retries import retried from databricks.sdk.service.iam import PermissionLevel +from databricks.labs.ucx.source_code.jobs import JobProblem + @retried(on=[NotFound, InvalidParameterValue]) def test_running_real_assessment_job( @@ -47,5 +49,9 @@ def test_running_real_assessment_job( assert actual_tables == expected_tables query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems" - workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"] + workflow_problems_without_path = [] + for record in sql_backend.fetch(query): + job_problem = JobProblem(**record.asDict()) + if job_problem.has_missing_path(): + workflow_problems_without_path.append(job_problem) assert not workflow_problems_without_path diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index d734b6a6e9..e0bafd875b 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -109,7 +109,11 @@ def test_migration_progress_assess_dashboards_calls_query_linter_refresh_report( def test_migration_progress_assess_workflows_calls_workflow_linter_snapshot(run_workflow) -> None: mock_linter = create_autospec(WorkflowLinter) + mock_history_log = create_autospec(ProgressEncoder) run_workflow(MigrationProgress.assess_workflows, workflow_linter=mock_linter) + run_workflow( + MigrationProgress.update_workflow_problems_history_log, workflow_progress=mock_history_log, parent_run_id=1234 + ) mock_linter.snapshot.assert_called_once() diff --git a/tests/unit/source_code/test_jobs.py 
b/tests/unit/source_code/test_jobs.py index e9fe507017..96e3078e4f 100644 --- a/tests/unit/source_code/test_jobs.py +++ b/tests/unit/source_code/test_jobs.py @@ -230,7 +230,7 @@ def test_workflow_task_container_builds_dependency_graph_spark_python_task( def test_workflow_linter_lint_job_logs_problems(dependency_resolver, mock_path_lookup, empty_index, caplog) -> None: - expected_message = "Found job problems:\nUNKNOWN:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library" + expected_message = "Found job problems:\n:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library" ws = create_autospec(WorkspaceClient) sql_backend = MockBackend() From 8b13c924a04c4fc45f5b01e5c22ba00852a4b116 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 25 Feb 2025 13:39:20 +0100 Subject: [PATCH 9/9] Note JobProblem.path being string --- src/databricks/labs/ucx/source_code/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index 8a827336d0..e5b2c5ee2e 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -38,7 +38,7 @@ class JobProblem: job_id: int job_name: str task_key: str - path: str + path: str # str for legacy support code: str message: str start_line: int @@ -63,7 +63,7 @@ def as_message(self) -> str: def has_missing_path(self) -> bool: """Flag if the path is missing, or not.""" - return self.path == Path("") # Reusing flag from DependencyProblem + return self.path == Path("").as_posix() # Reusing flag from DependencyProblem class WorkflowTask(Dependency):