diff --git a/docs/ucx/docs/reference/commands/index.mdx b/docs/ucx/docs/reference/commands/index.mdx
index 96a67fe623..c60119ff99 100644
--- a/docs/ucx/docs/reference/commands/index.mdx
+++ b/docs/ucx/docs/reference/commands/index.mdx
@@ -731,13 +731,14 @@ are displayed. To display `DEBUG` logs, use the `--debug` flag.
 ### `ensure-assessment-run`
 
 ```commandline
-databricks labs ucx ensure-assessment-run
+databricks labs ucx ensure-assessment-run [--force-refresh true/false]
 ```
 
 This command ensures that the [assessment workflow](/docs/reference/workflows#assessment-workflow) was run on a workspace.
 This command will block until job finishes.
 Failed workflows can be fixed with the [`repair-run` command](/docs/reference/commands#repair-run). Workflows and their
 status can be listed with the [`workflows` command](/docs/reference/commands#workflows).
+The `--force-refresh` flag forces the assessment workflow to run again and overwrite the previous assessment results, even if the workflow has already run.
 
 ### `update-migration-progress`
 
diff --git a/docs/ucx/docs/reference/workflows/index.mdx b/docs/ucx/docs/reference/workflows/index.mdx
index d6e1c28e39..be5110b318 100644
--- a/docs/ucx/docs/reference/workflows/index.mdx
+++ b/docs/ucx/docs/reference/workflows/index.mdx
@@ -98,9 +98,9 @@ See [this guide](/docs/reference/assessment) for more details.
 Proceed to the [group migration workflow](/docs/reference/workflows#group-migration-workflow) below or go back to the
 [migration process diagram](/docs/process/).
 
-The UCX assessment workflow is designed to only run once, re-running will **not** update the existing results. If the
-inventory and findings for a workspace need to be updated then first reinstall UCX by [uninstalling](/docs/installation#uninstall-ucx)
-and [installing](/docs/installation) it again.
+> ⚠️ Caution: To fully refresh the UCX assessment workflow and overwrite existing results, set the `force_refresh` parameter to `true` (or `True`).
+> For large workspaces, this process can be time-consuming and resource-intensive. Only use this option if a complete reassessment is absolutely necessary.
+> Alternatively, you can re-run the assessment using the [`ensure-assessment-run` command](/docs/reference/commands#ensure-assessment-run) with the `--force-refresh` parameter set to `true` to overwrite existing results.
 
 ## Group migration workflow
 
diff --git a/labs.yml b/labs.yml
index 339f3f6fed..69dc613f83 100644
--- a/labs.yml
+++ b/labs.yml
@@ -87,6 +87,9 @@ commands:
       - name: run-as-collection
         description: (Optional) Whether to check (and run if necessary) the assessment for the collection of workspaces
          with ucx installed. Default is false.
+      - name: force-refresh
+        description: (Optional) Force a full refresh of the assessment job, even if it was run recently.
+          Default is false.
  - name: update-migration-progress
    description: trigger the `migration-progress-experimental` job to refresh the inventory that tracks the workspace
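For orientation while reviewing: the flag documented above is an optional CLI boolean that is forwarded to the workflow as a job parameter. A minimal usage sketch, assuming a workspace with UCX already installed (syntax per the docs hunk above):

```commandline
databricks labs ucx ensure-assessment-run --force-refresh true
```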
diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py
index 3c0efbd7af..3abe576756 100644
--- a/src/databricks/labs/ucx/assessment/workflows.py
+++ b/src/databricks/labs/ucx/assessment/workflows.py
@@ -1,5 +1,7 @@
 import logging
 
+from databricks.sdk.service.jobs import JobParameterDefinition
+
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
 from databricks.labs.ucx.framework.tasks import Workflow, job_task
 
@@ -9,23 +11,40 @@ class Assessment(Workflow):  # pylint: disable=too-many-public-methods
 
     def __init__(self):
-        super().__init__('assessment')
+        super().__init__('assessment', [JobParameterDefinition(name="force_refresh", default=False)])
+
+    @staticmethod
+    def _get_force_refresh(ctx: RuntimeContext) -> bool:
+        """Extracts the force_refresh parameter from the job run parameters."""
+        force_refresh = False
+        job_id = ctx.install_state.jobs["assessment"]
+        job_runs = list(ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id))
+        job_parameters = job_runs[0].job_parameters if job_runs else []
+        if not job_parameters:
+            return False
+        for job_parameter in job_parameters:
+            if job_parameter.name == "force_refresh" and job_parameter.value is not None:
+                force_refresh = job_parameter.value.lower() in {"true", "1"}
+                logger.info(f"Found force_refresh parameter in job run: {force_refresh}")
+                break
+        return force_refresh
 
     @job_task
     def crawl_tables(self, ctx: RuntimeContext):
         """Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
         as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
         `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
-        stored is then used in the subsequent tasks and workflows to, for example,  find all Hive Metastore tables that
+        stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
         cannot easily be migrated to Unity Catalog."""
-        ctx.tables_crawler.snapshot()
+        ctx.tables_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))
 
     @job_task
     def crawl_udfs(self, ctx: RuntimeContext):
         """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
         table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
         issues with grants that cannot be migrated to Unit Catalog."""
-        ctx.udfs_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.udfs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(job_cluster="tacl")
     def setup_tacl(self, ctx: RuntimeContext):
@@ -40,7 +59,8 @@ def crawl_grants(self, ctx: RuntimeContext):
 
         Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
         ACLs enabled and available for retrieval."""
-        ctx.grants_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.grants_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_tables])
     def estimate_table_size_for_migration(self, ctx: RuntimeContext):
@@ -48,7 +68,8 @@ def estimate_table_size_for_migration(self, ctx: RuntimeContext):
         """Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
         "synced".
        These tables will have to be cloned in the migration process.
         Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
         The table size is a factor in deciding whether to clone these tables."""
-        ctx.table_size_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.table_size_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_mounts(self, ctx: RuntimeContext):
@@ -58,7 +79,8 @@ def crawl_mounts(self, ctx: RuntimeContext):
 
         The assessment involves scanning the workspace to compile a list of all existing mount points and
         subsequently storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
-        ctx.mounts_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.mounts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_mounts, crawl_tables])
     def guess_external_locations(self, ctx: RuntimeContext):
@@ -70,7 +92,8 @@ def guess_external_locations(self, ctx: RuntimeContext):
         - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
         - Scanning all these locations to identify folders that can act as shared path prefixes
         - These identified external locations will be created subsequently prior to the actual table migration"""
-        ctx.external_locations.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.external_locations.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_jobs(self, ctx: RuntimeContext):
@@ -83,7 +106,8 @@ def assess_jobs(self, ctx: RuntimeContext):
         - Clusters with incompatible Spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.jobs_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.jobs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_clusters(self, ctx: RuntimeContext):
@@ -96,7 +120,8 @@ def assess_clusters(self, ctx: RuntimeContext):
         - Clusters with incompatible spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.clusters_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.clusters_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_pipelines(self, ctx: RuntimeContext):
@@ -109,7 +134,8 @@ def assess_pipelines(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the pipelines with matching configurations are stored in the
         `$inventory.pipelines` table."""
-        ctx.pipelines_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.pipelines_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
@@ -122,7 +148,8 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
         It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration.
        Subsequently, a list of all the incompatible runs with failures are stored in the
         `$inventory.submit_runs` table."""
-        ctx.submit_runs_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_cluster_policies(self, ctx: RuntimeContext):
@@ -133,7 +160,8 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the policies with matching configurations are stored in the
         `$inventory.policies` table."""
-        ctx.policies_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.policies_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(cloud="azure")
     def assess_azure_service_principals(self, ctx: RuntimeContext):
@@ -147,7 +175,8 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
         Subsequently, the list of all the Azure Service Principals referred in those configurations are saved
         in the `$inventory.azure_service_principals` table."""
         if ctx.is_azure:
-            ctx.azure_service_principal_crawler.snapshot()
+            force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+            ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_global_init_scripts(self, ctx: RuntimeContext):
@@ -156,7 +185,8 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):
         It looks in:
         - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table."""
-        ctx.global_init_scripts_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def workspace_listing(self, ctx: RuntimeContext):
@@ -168,7 +198,8 @@ def workspace_listing(self, ctx: RuntimeContext):
         if not ctx.config.use_legacy_permission_migration:
             logger.info("Skipping workspace listing as legacy permission migration is disabled.")
             return
-        ctx.workspace_listing.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.workspace_listing.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_grants, workspace_listing])
     def crawl_permissions(self, ctx: RuntimeContext):
@@ -182,22 +213,26 @@ def crawl_permissions(self, ctx: RuntimeContext):
             return
         permission_manager = ctx.permission_manager
         permission_manager.reset()
-        permission_manager.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        permission_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_groups(self, ctx: RuntimeContext):
         """Scans all groups for the local group migration scope"""
-        ctx.group_manager.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.group_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_redash_dashboards(self, ctx: RuntimeContext):
         """Scans all Redash dashboards."""
-        ctx.redash_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.redash_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
         """Scans all Lakeview dashboards."""
-        ctx.lakeview_crawler.snapshot()
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
+        ctx.lakeview_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
     def assess_dashboards(self, ctx: RuntimeContext):
diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py
index 0240d426a8..442a4f552a 100644
--- a/src/databricks/labs/ucx/cli.py
+++ b/src/databricks/labs/ucx/cli.py
@@ -219,20 +219,28 @@ def validate_external_locations(
 
 
 @ucx.command
-def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None):
+def ensure_assessment_run(
+    w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, **named_parameters
+):
     """ensure the assessment job was run on a workspace"""
     workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
+    force_refresh = named_parameters.get("force_refresh", "false").lower() == "true"
     for ctx in workspace_contexts:
         workspace_id = ctx.workspace_client.get_workspace_id()
         logger.info(f"Checking assessment workflow in workspace: {workspace_id}")
         deployed_workflows = ctx.deployed_workflows
         # Note: will block if the workflow is already underway but not completed.
-        if deployed_workflows.validate_step("assessment"):
+        if deployed_workflows.validate_step("assessment") and not force_refresh:
             logger.info(f"The assessment workflow has successfully completed in workspace: {workspace_id}")
+        elif force_refresh:
+            logger.info(f"Re-running assessment workflow in workspace: {workspace_id}")
+            deployed_workflows.run_workflow(
+                "assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"}
+            )
         else:
             logger.info(f"Starting assessment workflow in workspace: {workspace_id}")
-            # If running for a collection, don't wait for each assessment job to finish as that will take a long time.
             deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection)
+            # If running for a collection, don't wait for each assessment job to finish as that will take a long time.
 
 
 @ucx.command
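Nearly every task and the CLI entry point above repeat the same string-to-boolean coercion, because job and CLI parameters arrive as strings. A minimal standalone sketch of that idiom; the helper name `as_bool` is illustrative and not part of this change:

```python
def as_bool(raw: str | None) -> bool:
    """Coerce a parameter string to a boolean: "true"/"1" (any casing) count as True."""
    return raw is not None and raw.lower() in {"true", "1"}


# Mirrors: ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
assert as_bool("True") and as_bool("1")
assert not as_bool("false") and not as_bool(None)
```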
diff --git a/src/databricks/labs/ucx/framework/tasks.py b/src/databricks/labs/ucx/framework/tasks.py
index 46ddaf75a3..6840a473ea 100644
--- a/src/databricks/labs/ucx/framework/tasks.py
+++ b/src/databricks/labs/ucx/framework/tasks.py
@@ -6,7 +6,7 @@
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.core import Config
-from databricks.sdk.service.jobs import CronSchedule
+from databricks.sdk.service.jobs import CronSchedule, JobParameterDefinition
 
 from databricks.labs.ucx.config import WorkspaceConfig
 
@@ -65,8 +65,9 @@ def parse_args(*argv) -> dict[str, str]:
 
 
 class Workflow:
-    def __init__(self, name: str):
+    def __init__(self, name: str, named_parameters: list[JobParameterDefinition] | None = None):
         self._name = name
+        self._named_parameters = named_parameters
 
     @property
     def name(self):
@@ -77,6 +78,11 @@ def schedule(self) -> CronSchedule | None:
         """The default (cron) schedule for this workflow, or None if it is not scheduled."""
         return None
 
+    @property
+    def parameters(self) -> list[JobParameterDefinition] | None:
+        """Named parameters for this workflow, or None if there are no parameters."""
+        return self._named_parameters
+
     def tasks(self) -> Iterable[Task]:
         # return __task__ from every method in this class that has this attribute
         for attr in dir(self):
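To show how the extended `Workflow` constructor is meant to be used, here is a minimal sketch of a parameterized workflow definition; it mirrors the `Assessment.__init__` change above, but the subclass name is hypothetical:

```python
from databricks.sdk.service.jobs import JobParameterDefinition

from databricks.labs.ucx.framework.tasks import Workflow


class ExampleWorkflow(Workflow):  # hypothetical subclass, for illustration only
    def __init__(self):
        # The default is baked into the deployed job definition; an individual run
        # can override it via jobs.run_now(job_parameters=...), wired up below.
        super().__init__("example", [JobParameterDefinition(name="force_refresh", default=False)])
```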
diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py
index 7779a25851..e1e6fed26c 100644
--- a/src/databricks/labs/ucx/installer/workflows.py
+++ b/src/databricks/labs/ucx/installer/workflows.py
@@ -249,13 +249,19 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState):
         self._ws = ws
         self._install_state = install_state
 
-    def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20)) -> int:
+    def run_workflow(
+        self,
+        step: str,
+        skip_job_wait: bool = False,
+        max_wait: timedelta = timedelta(minutes=20),
+        named_parameters: dict[str, str] | None = None,
+    ) -> int:
         # this dunder variable is hiding this method from tracebacks, making it cleaner
         # for the user to see the actual error without too much noise.
         __tracebackhide__ = True  # pylint: disable=unused-variable
         job_id = int(self._install_state.jobs[step])
         logger.debug(f"starting {step} job: {self._ws.config.host}#job/{job_id}")
-        job_initial_run = self._ws.jobs.run_now(job_id)
+        job_initial_run = self._ws.jobs.run_now(job_id, job_parameters=named_parameters)
         run_id = job_initial_run.run_id
         if not run_id:
             raise NotFound(f"job run not found for {step}")
@@ -855,6 +861,7 @@ def _job_settings(self, workflow_name: str, remote_wheels: list[str]) -> dict[st
             "email_notifications": email_notifications,
             "schedule": workflow.schedule,
             "tasks": job_tasks,
+            "parameters": workflow.parameters,
         }
 
     def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py
index d9d0e53e91..a6448a30c0 100644
--- a/tests/integration/assessment/test_workflows.py
+++ b/tests/integration/assessment/test_workflows.py
@@ -49,3 +49,21 @@ def test_running_real_assessment_job(
     query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
     workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
     assert not workflow_problems_without_path
+
+    post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one")
+    installation_ctx.deployed_workflows.run_workflow(
+        workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"}
+    )
+    assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"
+    expected_tables = {
+        managed_table.name,
+        external_table.name,
+        tmp_table.name,
+        view.name,
+        non_delta.name,
+        post_assessment_table.name,
+    }
+
+    query = f"SELECT * FROM {installation_ctx.inventory_database}.tables"
+    tables_after_assessment_rerun = {table.name for table in sql_backend.fetch(query)}
+    assert tables_after_assessment_rerun == expected_tables
diff --git a/tests/integration/framework/test_owners.py b/tests/integration/framework/test_owners.py
index c8fa0b6fdf..d624d13467 100644
--- a/tests/integration/framework/test_owners.py
+++ b/tests/integration/framework/test_owners.py
@@ -70,7 +70,7 @@ def test_notebook_owner(make_notebook, make_notebook_permissions, make_group, ws
     name = notebook_ownership.owner_of(notebook)
 
     my_user = ws.current_user.me()
-    assert name == my_user.user_name
+    assert name in [my_user.user_name, new_group.display_name]
 
 
 def test_file_owner(make_workspace_file, ws):
diff --git a/tests/unit/install/test_install.py b/tests/unit/install/test_install.py
index 343f0e93ed..9c9c70ee11 100644
--- a/tests/unit/install/test_install.py
+++ b/tests/unit/install/test_install.py
@@ -180,8 +180,9 @@ def test_writeable_dbfs(ws, tmp_path, mock_installation) -> None:
 
 
 def test_run_workflow_creates_proper_failure(ws, mocker, mock_installation_with_jobs):
-    def run_now(job_id):
+    def run_now(job_id, job_parameters: dict[str, Any] | None = None):
         assert job_id == 123
+        assert job_parameters is None
 
         def result():
             raise OperationFailed(...)
@@ -215,8 +216,9 @@ def result():
 
 
 def test_run_workflow_run_id_not_found(ws, mocker, mock_installation_with_jobs):
-    def run_now(job_id):
+    def run_now(job_id, job_parameters: dict[str, Any] | None = None):
         assert job_id == 123
+        assert job_parameters is None
 
         def result():
             raise OperationFailed(...)
@@ -246,8 +248,9 @@ def result():
 
 
 def test_run_workflow_creates_failure_from_mapping(ws, mocker, mock_installation, mock_installation_with_jobs):
-    def run_now(job_id):
+    def run_now(job_id, job_parameters: dict[str, Any] | None = None):
         assert job_id == 123
+        assert job_parameters is None
 
         def result():
             raise OperationFailed(...)
@@ -281,8 +284,9 @@ def result():
 
 
 def test_run_workflow_creates_failure_many_error(ws, mocker, mock_installation_with_jobs):
-    def run_now(job_id):
+    def run_now(job_id, job_parameters: dict[str, Any] | None = None):
         assert job_id == 123
+        assert job_parameters is None
 
         def result():
             raise OperationFailed(...)
diff --git a/tests/unit/installer/test_workflows.py b/tests/unit/installer/test_workflows.py
index 3cd7a196c4..66bc3edee7 100644
--- a/tests/unit/installer/test_workflows.py
+++ b/tests/unit/installer/test_workflows.py
@@ -87,7 +87,7 @@ def test_run_workflow(mock_installation) -> None:
     run_id = workflows.run_workflow("test")
 
     assert run_id == 456
-    ws.jobs.run_now.assert_called_once_with(123)
+    ws.jobs.run_now.assert_called_once_with(123, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=20))
 
 
@@ -101,7 +101,7 @@ def test_run_workflow_skip_job_wait(mock_installation) -> None:
     run_id = workflows.run_workflow("test", skip_job_wait=True)
 
     assert run_id == 456
-    ws.jobs.run_now.assert_called_once_with(123)
+    ws.jobs.run_now.assert_called_once_with(123, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.assert_not_called()
 
 
@@ -121,7 +121,7 @@ def test_run_workflow_operation_failed(mock_installation) -> None:
     with pytest.raises(NotFound, match="a_table"):
         _ = workflows.run_workflow("test")
 
-    ws.jobs.run_now.assert_called_once_with(123)
+    ws.jobs.run_now.assert_called_once_with(123, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=20))
     ws.jobs.get_run.assert_called_once_with(456)
     ws.workspace.list.assert_called_once_with("~/mock/logs/test")
@@ -139,6 +139,6 @@ def test_run_workflow_timeout(mock_installation) -> None:
     with pytest.raises(TimeoutError):
         _ = workflows.run_workflow("test", max_wait=timedelta(minutes=2))
 
-    ws.jobs.run_now.assert_called_once_with(123)
+    ws.jobs.run_now.assert_called_once_with(123, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=2))
     ws.workspace.list.assert_called_once_with("~/mock/logs/test")
diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py
index c35c20ef21..6699d9bdad 100644
--- a/tests/unit/test_cli.py
+++ b/tests/unit/test_cli.py
@@ -406,7 +406,7 @@ def test_ensure_assessment_run_collection(workspace_clients, acc_client):
     ensure_assessment_run(workspace_clients[0], run_as_collection=True, a=acc_client)
 
     for workspace_client in workspace_clients:
-        workspace_client.jobs.run_now.assert_called_with(123)
+        workspace_client.jobs.run_now.assert_called_with(123, job_parameters=None)
 
 
 def test_repair_run(ws):
@@ -1035,7 +1035,7 @@ def test_migrate_tables_calls_migrate_table_job_run_now(
     migrate_tables(workspace_clients[0], MockPrompts({}), run_as_collection=run_as_collection, a=acc_client)
 
     for workspace_client in workspace_clients:
-        workspace_client.jobs.run_now.assert_called_with(456)
+        workspace_client.jobs.run_now.assert_called_with(456, job_parameters=None)
         workspace_client.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once()
@@ -1079,7 +1079,7 @@ def test_migrate_tables_calls_external_hiveserde_tables_job_run_now(ws) -> None:
 
     migrate_tables(ws, prompts, ctx=ctx)
 
-    ws.jobs.run_now.assert_called_with(789)
+    ws.jobs.run_now.assert_called_with(789, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.call_count = 2
 
 
@@ -1114,7 +1114,7 @@ def test_migrate_tables_calls_external_tables_ctas_job_run_now(ws) -> None:
 
     migrate_tables(ws, prompts, ctx=ctx)
 
-    ws.jobs.run_now.assert_called_with(987)
+    ws.jobs.run_now.assert_called_with(987, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.call_count = 2
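Taken together, a forced re-assessment flows from the CLI through `run_workflow` into `jobs.run_now`. A sketch of triggering it programmatically, assuming `ctx` is a workspace context with UCX installed (as in the CLI command above); no API beyond what this diff introduces is assumed:

```python
# `ctx.deployed_workflows` is the DeployedWorkflows instance from installer/workflows.py;
# the parameter dict is forwarded as jobs.run_now(job_parameters=...).
run_id = ctx.deployed_workflows.run_workflow(
    "assessment",
    named_parameters={"force_refresh": "true"},
)
```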