Skip to content

Commit 0220c42

Browse files
JCZuurmond and gueniai authored
Let PipelinesMigrator skip unfound jobs (#3554)
## Changes
Let `PipelinesMigrator` skip jobs that cannot be found. This can happen when a job is deleted between the assessment and the pipelines migration run.

### Linked issues
Resolves #3490

### Functionality
- [x] modified existing command: `databricks labs ucx migrate-dlt-pipelines`

### Tests
- [x] added unit tests

---------

Co-authored-by: Guenia Izquierdo Delgado <guenia.izquierdo@databricks.com>
1 parent 2286190 commit 0220c42

File tree

3 files changed

+36
-9
lines changed

3 files changed

+36
-9
lines changed

src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
from databricks.labs.blueprint.parallel import Threads
66
from databricks.sdk import WorkspaceClient
7-
from databricks.sdk.errors import DatabricksError
7+
from databricks.sdk.errors import DatabricksError, NotFound
88
from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings
99

1010
from databricks.labs.ucx.assessment.jobs import JobsCrawler
@@ -53,7 +53,11 @@ def _populate_pipeline_job_tasks_mapping(self) -> None:
5353
if not job.job_id:
5454
continue
5555

56-
job_details = self._ws.jobs.get(int(job.job_id))
56+
try:
57+
job_details = self._ws.jobs.get(int(job.job_id))
58+
except NotFound:
59+
logger.warning(f"Skipping non-existing job: {job.job_id}")
60+
continue
5761
if not job_details.settings or not job_details.settings.tasks:
5862
continue
5963

tests/integration/hive_metastore/test_pipeline_migrate.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,13 @@
1111

1212

1313
def test_pipeline_migrate(
14-
ws, make_pipeline, make_random, watchdog_purge_suffix, make_directory, runtime_ctx, make_mounted_location
14+
ws,
15+
make_pipeline,
16+
make_random,
17+
watchdog_purge_suffix,
18+
make_directory,
19+
runtime_ctx,
20+
make_mounted_location,
1521
) -> None:
1622
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
1723

tests/unit/hive_metastore/test_pipeline_migrate.py

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44

55
from databricks.labs.lsql.backends import MockBackend
66
from databricks.sdk.service.jobs import BaseJob, JobSettings, Task, PipelineTask
7+
from databricks.sdk.errors import NotFound
78

89
from databricks.labs.ucx.assessment.jobs import JobsCrawler
910
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
@@ -91,14 +92,30 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, include_flag, e
9192
ws.api_client.do.assert_has_calls([api_calls])
9293

9394

94-
def test_migrate_pipelines_no_pipelines(
95-
ws,
96-
):
97-
errors = {}
98-
rows = {}
99-
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
95+
def test_migrate_pipelines_no_pipelines(ws) -> None:
96+
sql_backend = MockBackend()
10097
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
10198
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
10299
pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
103100
ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)]
104101
pipelines_migrator.migrate_pipelines()
102+
103+
104+
def test_migrate_pipelines_skips_not_found_job(caplog, ws) -> None:
105+
job_columns = MockBackend.rows("job_id", "success", "failures", "job_name", "creator")
106+
sql_backend = MockBackend(
107+
rows={
108+
"`hive_metastore`.`inventory_database`.`jobs`": job_columns[
109+
("536591785949415", 1, [], "single-job", "anonymous@databricks.com")
110+
]
111+
}
112+
)
113+
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
114+
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
115+
pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
116+
117+
ws.jobs.get.side_effect = NotFound
118+
119+
with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.pipelines_migrate"):
120+
pipelines_migrator.migrate_pipelines()
121+
assert "Skipping non-existing job: 536591785949415" in caplog.messages

0 commit comments

Comments
 (0)