
Commit dae71bf

Scope crawled jobs in JobsCrawler with include_job_ids (#3658)
## Changes

Check for `include_job_ids` in the `assess_jobs` task and skip any job whose id is not in the list.

### Linked issues

Resolves #3656

### Functionality

- [x] modified existing workflow: jobs_crawler

### Tests

- [x] added integration tests

Co-authored-by: pritishpai <pritish.pai@databricks.com>
1 parent 96913cc commit dae71bf

4 files changed: 24 additions and 4 deletions


src/databricks/labs/ucx/assessment/jobs.py

Lines changed: 7 additions & 1 deletion
Lines changed: 7 additions & 1 deletion

@@ -94,9 +94,12 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]:
 
 
 class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+    def __init__(
+        self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, *, include_job_ids: list[int] | None = None
+    ):
         super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo)
         self._ws = ws
+        self._include_job_ids = include_job_ids
 
     def _crawl(self) -> Iterable[JobInfo]:
         all_jobs = list(self._ws.jobs.list(expand_tasks=True))
@@ -109,6 +112,9 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
             job_id = job.job_id
             if not job_id:
                 continue
+            if self._include_job_ids is not None and job_id not in self._include_job_ids:
+                logger.info(f"Skipping job_id={job_id}")
+                continue
             cluster_details = ClusterDetails.from_dict(cluster_config.as_dict())
             cluster_failures = self._check_cluster_failures(cluster_details, "Job cluster")
             cluster_failures.extend(self._check_jar_task(job.settings.tasks))
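For orientation, here is a minimal sketch of how the new keyword-only argument scopes a crawl. The warehouse id, the schema name "ucx", and the choice of StatementExecutionBackend are illustrative assumptions, not part of this change:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.lsql.backends import StatementExecutionBackend
    from databricks.labs.ucx.assessment.jobs import JobsCrawler

    ws = WorkspaceClient()  # assumes workspace authentication is already configured
    sql_backend = StatementExecutionBackend(ws, "your-warehouse-id")  # illustrative SqlBackend choice

    # Only job 1234 is assessed; any other job hits the new include_job_ids
    # check in _assess_jobs and is skipped with an INFO log line.
    crawler = JobsCrawler(ws, sql_backend, "ucx", include_job_ids=[1234])
    jobs = crawler.snapshot()

Passing include_job_ids=None (the default) preserves the previous behaviour of assessing every job in the workspace.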

src/databricks/labs/ucx/contexts/application.py

Lines changed: 6 additions & 1 deletion
@@ -278,7 +278,12 @@ def tables_crawler(self) -> TablesCrawler:
 
     @cached_property
     def jobs_crawler(self) -> JobsCrawler:
-        return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+        return JobsCrawler(
+            self.workspace_client,
+            self.sql_backend,
+            self.inventory_database,
+            include_job_ids=self.config.include_job_ids,
+        )
 
     @cached_property
     def table_ownership(self) -> TableOwnership:

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 6 additions & 1 deletion
@@ -70,7 +70,12 @@ def installation(self) -> Installation:
 
     @cached_property
     def jobs_crawler(self) -> JobsCrawler:
-        return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+        return JobsCrawler(
+            self.workspace_client,
+            self.sql_backend,
+            self.inventory_database,
+            include_job_ids=self.config.include_job_ids,
+        )
 
     @cached_property
     def job_ownership(self) -> JobOwnership:
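Both runtime contexts wire the same optional setting from the installed workspace configuration, so the scoping is configuration-driven rather than hard-coded. A minimal sketch, assuming the standard UCX WorkspaceConfig dataclass exposes include_job_ids as the diffs above imply:

    from databricks.labs.ucx.config import WorkspaceConfig

    # Illustrative config: jobs_crawler will assess only these two jobs;
    # leaving include_job_ids unset (None) keeps the crawl-everything behaviour.
    config = WorkspaceConfig(inventory_database="ucx", include_job_ids=[1234, 5678])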

tests/integration/assessment/test_jobs.py

Lines changed: 5 additions & 1 deletion
@@ -15,14 +15,18 @@
 @retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_job_crawler(ws, make_job, inventory_schema, sql_backend):
     new_job = make_job(spark_conf=_SPARK_CONF)
-    job_crawler = JobsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema)
+    skip_job = make_job(spark_conf=_SPARK_CONF)
+
+    job_crawler = JobsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema, include_job_ids=[new_job.job_id])
     jobs = job_crawler.snapshot()
     results = []
     for job in jobs:
         if job.success != 0:
             continue
         if int(job.job_id) == new_job.job_id:
             results.append(job)
+        if int(job.job_id) == skip_job.job_id:
+            assert False, "Job should have been skipped"
 
     assert len(results) >= 1
     assert int(results[0].job_id) == new_job.job_id
