Skip to content

Added force_refresh parameter for Assessment #4183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/ucx/docs/reference/commands/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,14 @@ are displayed. To display `DEBUG` logs, use the `--debug` flag.
### `ensure-assessment-run`

```commandline
databricks labs ucx ensure-assessment-run
databricks labs ucx ensure-assessment-run [--force-refresh true/false]
```

This command ensures that the [assessment workflow](/docs/reference/workflows#assessment-workflow) was run on a workspace.
This command will block until job finishes.
Failed workflows can be fixed with the [`repair-run` command](/docs/reference/commands#repair-run). Workflows and their status can be
listed with the [`workflows` command](/docs/reference/commands#workflows).
The `--force-refresh` flag can be used to force the assessment workflow to run again overwriting the previous assessment results, even if it was already run before.

### `update-migration-progress`

Expand Down
6 changes: 3 additions & 3 deletions docs/ucx/docs/reference/workflows/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ See [this guide](/docs/reference/assessment) for more details.
Proceed to the [group migration workflow](/docs/reference/workflows#group-migration-workflow) below or go back to the
[migration process diagram](/docs/process/).

The UCX assessment workflow is designed to only run once, re-running will **not** update the existing results. If the
inventory and findings for a workspace need to be updated then first reinstall UCX by [uninstalling](/docs/installation#uninstall-ucx)
and [installing](/docs/installation) it again.
> ⚠️ Caution: To fully refresh the UCX assessment workflow and overwrite existing results, set the force_refresh parameter to true (or True).
For large workspaces, this process can be time-consuming and resource-intensive. Only use this option if a complete reassessment is absolutely necessary.
Alternatively, you can re run assessment using the [command](/docs/reference/commands#ensure-assessment-run) with the `force-refresh` parameter set to `true` to overwrite existing results.

## Group migration workflow

Expand Down
3 changes: 3 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ commands:
- name: run-as-collection
description: (Optional) Whether to check (and run if necessary) the assessment for the collection of workspaces
with ucx installed. Default is false.
- name: force-refresh
description: (Optional) Force a full refresh of the assessment job, even if it was run recently.
Default is false.

- name: update-migration-progress
description: trigger the `migration-progress-experimental` job to refresh the inventory that tracks the workspace
Expand Down
75 changes: 55 additions & 20 deletions src/databricks/labs/ucx/assessment/workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from databricks.sdk.service.jobs import JobParameterDefinition

from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task

Expand All @@ -9,23 +11,40 @@

class Assessment(Workflow): # pylint: disable=too-many-public-methods
def __init__(self):
super().__init__('assessment')
super().__init__('assessment', [JobParameterDefinition(name="force_refresh", default=False)])

@staticmethod
def _get_force_refresh(ctx: RuntimeContext) -> bool:
"""Extracts the force_refresh parameter from the job run parameters."""
force_refresh = False
job_id = ctx.install_state.jobs["assessment"]
job_runs = list(ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id))
job_parameters = job_runs[0].job_parameters if job_runs else []
if not job_parameters:
return False
for job_parameter in job_parameters:
if job_parameter.name == "force_refresh" and job_parameter.value is not None:
force_refresh = job_parameter.value.lower() in {"true", "1"}
logger.info(f"Found force_refresh parameter in job run: {force_refresh}")
break
return force_refresh

@job_task
def crawl_tables(self, ctx: RuntimeContext):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""
ctx.tables_crawler.snapshot()
ctx.tables_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_udfs(self, ctx: RuntimeContext):
"""Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
issues with grants that cannot be migrated to Unit Catalog."""
ctx.udfs_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.udfs_crawler.snapshot(force_refresh=force_refresh)

@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
Expand All @@ -40,15 +59,17 @@ def crawl_grants(self, ctx: RuntimeContext):

Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
ACLs enabled and available for retrieval."""
ctx.grants_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.grants_crawler.snapshot(force_refresh=force_refresh)

@job_task(depends_on=[crawl_tables])
def estimate_table_size_for_migration(self, ctx: RuntimeContext):
"""Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
"synced". These tables will have to be cloned in the migration process.
Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
The table size is a factor in deciding whether to clone these tables."""
ctx.table_size_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.table_size_crawler.snapshot(force_refresh=force_refresh)

@job_task
def crawl_mounts(self, ctx: RuntimeContext):
Expand All @@ -58,7 +79,8 @@ def crawl_mounts(self, ctx: RuntimeContext):

The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
ctx.mounts_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.mounts_crawler.snapshot(force_refresh=force_refresh)

@job_task(depends_on=[crawl_mounts, crawl_tables])
def guess_external_locations(self, ctx: RuntimeContext):
Expand All @@ -70,7 +92,8 @@ def guess_external_locations(self, ctx: RuntimeContext):
- Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
- Scanning all these locations to identify folders that can act as shared path prefixes
- These identified external locations will be created subsequently prior to the actual table migration"""
ctx.external_locations.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.external_locations.snapshot(force_refresh=force_refresh)

@job_task
def assess_jobs(self, ctx: RuntimeContext):
Expand All @@ -83,7 +106,8 @@ def assess_jobs(self, ctx: RuntimeContext):
- Clusters with incompatible Spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.jobs_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.jobs_crawler.snapshot(force_refresh=force_refresh)

@job_task
def assess_clusters(self, ctx: RuntimeContext):
Expand All @@ -96,7 +120,8 @@ def assess_clusters(self, ctx: RuntimeContext):
- Clusters with incompatible spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.clusters_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.clusters_crawler.snapshot(force_refresh=force_refresh)

@job_task
def assess_pipelines(self, ctx: RuntimeContext):
Expand All @@ -109,7 +134,8 @@ def assess_pipelines(self, ctx: RuntimeContext):

Subsequently, a list of all the pipelines with matching configurations are stored in the
`$inventory.pipelines` table."""
ctx.pipelines_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.pipelines_crawler.snapshot(force_refresh=force_refresh)

@job_task
def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
Expand All @@ -122,7 +148,8 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration.
Subsequently, a list of all the incompatible runs with failures are stored in the
`$inventory.submit_runs` table."""
ctx.submit_runs_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh)

@job_task
def crawl_cluster_policies(self, ctx: RuntimeContext):
Expand All @@ -133,7 +160,8 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):

Subsequently, a list of all the policies with matching configurations are stored in the
`$inventory.policies` table."""
ctx.policies_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.policies_crawler.snapshot(force_refresh=force_refresh)

@job_task(cloud="azure")
def assess_azure_service_principals(self, ctx: RuntimeContext):
Expand All @@ -147,7 +175,8 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
Subsequently, the list of all the Azure Service Principals referred in those configurations are saved
in the `$inventory.azure_service_principals` table."""
if ctx.is_azure:
ctx.azure_service_principal_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh)

@job_task
def assess_global_init_scripts(self, ctx: RuntimeContext):
Expand All @@ -156,7 +185,8 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):

It looks in:
- the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table."""
ctx.global_init_scripts_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh)

@job_task
def workspace_listing(self, ctx: RuntimeContext):
Expand All @@ -168,7 +198,8 @@ def workspace_listing(self, ctx: RuntimeContext):
if not ctx.config.use_legacy_permission_migration:
logger.info("Skipping workspace listing as legacy permission migration is disabled.")
return
ctx.workspace_listing.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.workspace_listing.snapshot(force_refresh=force_refresh)

@job_task(depends_on=[crawl_grants, workspace_listing])
def crawl_permissions(self, ctx: RuntimeContext):
Expand All @@ -182,22 +213,26 @@ def crawl_permissions(self, ctx: RuntimeContext):
return
permission_manager = ctx.permission_manager
permission_manager.reset()
permission_manager.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
permission_manager.snapshot(force_refresh=force_refresh)

@job_task
def crawl_groups(self, ctx: RuntimeContext):
"""Scans all groups for the local group migration scope"""
ctx.group_manager.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.group_manager.snapshot(force_refresh=force_refresh)

@job_task
def crawl_redash_dashboards(self, ctx: RuntimeContext):
"""Scans all Redash dashboards."""
ctx.redash_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.redash_crawler.snapshot(force_refresh=force_refresh)

@job_task
def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
"""Scans all Lakeview dashboards."""
ctx.lakeview_crawler.snapshot()
force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"}
ctx.lakeview_crawler.snapshot(force_refresh=force_refresh)

@job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
def assess_dashboards(self, ctx: RuntimeContext):
Expand Down
14 changes: 11 additions & 3 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,28 @@ def validate_external_locations(


@ucx.command
def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None):
def ensure_assessment_run(
w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, **named_parameters
):
"""ensure the assessment job was run on a workspace"""
workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
force_refresh = named_parameters.get("force_refresh", "false").lower() == "true"
for ctx in workspace_contexts:
workspace_id = ctx.workspace_client.get_workspace_id()
logger.info(f"Checking assessment workflow in workspace: {workspace_id}")
deployed_workflows = ctx.deployed_workflows
# Note: will block if the workflow is already underway but not completed.
if deployed_workflows.validate_step("assessment"):
if deployed_workflows.validate_step("assessment") and not force_refresh:
logger.info(f"The assessment workflow has successfully completed in workspace: {workspace_id}")
elif force_refresh:
logger.info(f"Re-running assessment workflow in workspace: {workspace_id}")
deployed_workflows.run_workflow(
"assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"}
)
else:
logger.info(f"Starting assessment workflow in workspace: {workspace_id}")
# If running for a collection, don't wait for each assessment job to finish as that will take a long time.
deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection)
# If running for a collection, don't wait for each assessment job to finish as that will take a long time.


@ucx.command
Expand Down
10 changes: 8 additions & 2 deletions src/databricks/labs/ucx/framework/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import Config
from databricks.sdk.service.jobs import CronSchedule
from databricks.sdk.service.jobs import CronSchedule, JobParameterDefinition

from databricks.labs.ucx.config import WorkspaceConfig

Expand Down Expand Up @@ -65,8 +65,9 @@ def parse_args(*argv) -> dict[str, str]:


class Workflow:
def __init__(self, name: str):
def __init__(self, name: str, named_parameters: list[JobParameterDefinition] | None = None):
self._name = name
self._named_parameters = named_parameters

@property
def name(self):
Expand All @@ -77,6 +78,11 @@ def schedule(self) -> CronSchedule | None:
"""The default (cron) schedule for this workflow, or None if it is not scheduled."""
return None

@property
def parameters(self) -> list[JobParameterDefinition] | None:
"""Named parameters for this workflow, or None if there are no parameters."""
return self._named_parameters

def tasks(self) -> Iterable[Task]:
# return __task__ from every method in this class that has this attribute
for attr in dir(self):
Expand Down
11 changes: 9 additions & 2 deletions src/databricks/labs/ucx/installer/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,19 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState):
self._ws = ws
self._install_state = install_state

def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20)) -> int:
def run_workflow(
self,
step: str,
skip_job_wait: bool = False,
max_wait: timedelta = timedelta(minutes=20),
named_parameters: dict[str, str] | None = None,
) -> int:
# this dunder variable is hiding this method from tracebacks, making it cleaner
# for the user to see the actual error without too much noise.
__tracebackhide__ = True # pylint: disable=unused-variable
job_id = int(self._install_state.jobs[step])
logger.debug(f"starting {step} job: {self._ws.config.host}#job/{job_id}")
job_initial_run = self._ws.jobs.run_now(job_id)
job_initial_run = self._ws.jobs.run_now(job_id, job_parameters=named_parameters)
run_id = job_initial_run.run_id
if not run_id:
raise NotFound(f"job run not found for {step}")
Expand Down Expand Up @@ -855,6 +861,7 @@ def _job_settings(self, workflow_name: str, remote_wheels: list[str]) -> dict[st
"email_notifications": email_notifications,
"schedule": workflow.schedule,
"tasks": job_tasks,
"parameters": workflow.parameters,
}

def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
Expand Down
18 changes: 18 additions & 0 deletions tests/integration/assessment/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,21 @@ def test_running_real_assessment_job(
query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
assert not workflow_problems_without_path

post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one")
installation_ctx.deployed_workflows.run_workflow(
workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"}
)
assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"
expected_tables = {
managed_table.name,
external_table.name,
tmp_table.name,
view.name,
non_delta.name,
post_assessment_table.name,
}

query = f"SELECT * FROM {installation_ctx.inventory_database}.tables"
tables_after_assessment_rerun = {table.name for table in sql_backend.fetch(query)}
assert tables_after_assessment_rerun == expected_tables
2 changes: 1 addition & 1 deletion tests/integration/framework/test_owners.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_notebook_owner(make_notebook, make_notebook_permissions, make_group, ws
name = notebook_ownership.owner_of(notebook)

my_user = ws.current_user.me()
assert name == my_user.user_name
assert name in [my_user.user_name, new_group.display_name]


def test_file_owner(make_workspace_file, ws):
Expand Down
Loading
Loading