Skip to content

Commit eb71c84

Browse files
committed
Add integration test, fetch parameter from current job run
1 parent dbea5ce commit eb71c84

File tree

3 files changed

+29
-6
lines changed

3 files changed

+29
-6
lines changed

src/databricks/labs/ucx/assessment/workflows.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from functools import cached_property
23

34
from databricks.sdk.service.jobs import JobParameterDefinition
45

@@ -11,8 +12,21 @@
1112

1213
class Assessment(Workflow): # pylint: disable=too-many-public-methods
1314
def __init__(self):
14-
self._force_refresh_param = JobParameterDefinition(name="force_refresh", default=False)
15-
super().__init__('assessment', [self._force_refresh_param])
15+
super().__init__('assessment', [JobParameterDefinition(name="force_refresh", default=False)])
16+
17+
@staticmethod
def _get_force_refresh(ctx: RuntimeContext) -> bool:
    """Extract the `force_refresh` parameter from the active run of the assessment job.

    The job id stored under "assessment" in the install state is used to list the
    active runs of that job; the job parameters of the first run returned are then
    searched for a parameter named "force_refresh".

    Args:
        ctx: Runtime context; only `install_state.jobs` and `workspace_client.jobs`
            are read here.

    Returns:
        True when the first active run has a `force_refresh` parameter whose value is
        "true" or "1" (case-insensitive). False otherwise, including when there is no
        active run, the run has no job parameters, or the parameter has no value.

    Raises:
        Whatever the `jobs["assessment"]` lookup raises when the job is not recorded
        (presumably KeyError — confirm against the install state type).
    """
    job_id = ctx.install_state.jobs["assessment"]
    # Only the first active run is inspected, so take one item from the listing
    # instead of materializing every page of runs.
    # NOTE(review): with several concurrent active runs the first one may not be the
    # run executing this task — confirm the ordering guarantees of the listing.
    active_runs = ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id)
    current_run = next(iter(active_runs), None)
    if current_run is None:
        return False
    # `job_parameters` can be unset (None) on a run; treat that as "no parameters"
    # rather than failing when iterating over it.
    for job_parameter in current_run.job_parameters or []:
        if job_parameter.name == "force_refresh" and job_parameter.value is not None:
            force_refresh = job_parameter.value.lower() in {"true", "1"}
            logger.info("Found force_refresh parameter in job run: %s", force_refresh)
            return force_refresh
    return False
1630

1731
@job_task
1832
def crawl_tables(self, ctx: RuntimeContext):
@@ -21,8 +35,7 @@ def crawl_tables(self, ctx: RuntimeContext):
2135
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
2236
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
2337
cannot easily be migrated to Unity Catalog."""
24-
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
25-
ctx.tables_crawler.snapshot(force_refresh=force_refresh)
38+
ctx.tables_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))
2639

2740
@job_task
2841
def crawl_udfs(self, ctx: RuntimeContext):

src/databricks/labs/ucx/installer/workflows.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,14 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState):
249249
self._ws = ws
250250
self._install_state = install_state
251251

252-
def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20)) -> int:
252+
def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20),
253+
named_parameters: dict[str, str] | None = None ) -> int:
253254
# this dunder variable is hiding this method from tracebacks, making it cleaner
254255
# for the user to see the actual error without too much noise.
255256
__tracebackhide__ = True # pylint: disable=unused-variable
256257
job_id = int(self._install_state.jobs[step])
257258
logger.debug(f"starting {step} job: {self._ws.config.host}#job/{job_id}")
258-
job_initial_run = self._ws.jobs.run_now(job_id)
259+
job_initial_run = self._ws.jobs.run_now(job_id, job_parameters=named_parameters)
259260
run_id = job_initial_run.run_id
260261
if not run_id:
261262
raise NotFound(f"job run not found for {step}")

tests/integration/assessment/test_workflows.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,12 @@ def test_running_real_assessment_job(
4949
query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
5050
workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
5151
assert not workflow_problems_without_path
52+
53+
post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one")
54+
installation_ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"})
55+
assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"
56+
expected_tables = {managed_table.name, external_table.name, tmp_table.name, view.name, non_delta.name, post_assessment_table.name}
57+
58+
query = f"SELECT * FROM {installation_ctx.inventory_database}.tables"
59+
tables_after_assessment_rerun = [table.name for table in sql_backend.fetch(query)]
60+
assert tables_after_assessment_rerun == expected_tables

0 commit comments

Comments
 (0)