diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py
index 4b37e81165..5722a1917f 100644
--- a/src/databricks/labs/ucx/cli.py
+++ b/src/databricks/labs/ucx/cli.py
@@ -198,6 +198,26 @@ def create_table_mapping(
     webbrowser.open(f"{w.config.host}/#workspace{path}")
 
 
+@ucx.command
+def create_directfs_mapping(
+    w: WorkspaceClient,
+    ctx: WorkspaceContext | None = None,
+    run_as_collection: bool = False,
+    a: AccountClient | None = None,
+):
+    """Create a DirectFS mapping for all the DirectFS references in the workspace."""
+    if ctx:
+        workspace_contexts = [ctx]
+    else:
+        workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
+    for workspace_ctx in workspace_contexts:
+        workspace_ctx.directfs_mapping.save(
+            [workspace_ctx.directfs_access_crawler_for_paths, workspace_ctx.directfs_access_crawler_for_queries],
+            workspace_ctx.tables_crawler,
+            workspace_ctx.workspace_info,
+        )
+
+
 @ucx.command
 def validate_external_locations(
     w: WorkspaceClient,
diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py
index 82c75324d3..d33c69193d 100644
--- a/src/databricks/labs/ucx/contexts/application.py
+++ b/src/databricks/labs/ucx/contexts/application.py
@@ -14,6 +14,7 @@
 from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
+from databricks.labs.ucx.hive_metastore.directfs_mapping import DirectFsMapping
 from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator
 from databricks.labs.ucx.recon.data_comparator import StandardDataComparator
 from databricks.labs.ucx.recon.data_profiler import StandardDataProfiler
@@ -456,6 +457,10 @@ def iam_credential_manager(self) -> CredentialManager:
     def table_mapping(self) -> TableMapping:
         return TableMapping(self.installation, self.workspace_client, self.sql_backend)
 
+    @cached_property
+    def directfs_mapping(self) -> DirectFsMapping:
+        return DirectFsMapping(self.installation, self.workspace_client, self.sql_backend)
+
     @cached_property
     def catalog_schema(self) -> CatalogSchema:
         return CatalogSchema(self.workspace_client, self.table_mapping, self.migrate_grants, self.config.ucx_catalog)
diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py
new file mode 100644
index 0000000000..ab5c9714a1
--- /dev/null
+++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py
@@ -0,0 +1,117 @@
+import logging
+import re
+from collections.abc import Iterable
+from dataclasses import dataclass
+
+from databricks.labs.blueprint.installation import Installation
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.sdk import WorkspaceClient
+
+from databricks.labs.ucx.account.workspaces import WorkspaceInfo
+from databricks.labs.ucx.hive_metastore import TablesCrawler
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DirectFsRule:
+    """
+    A rule mapping a direct filesystem access path to a UC table.
+ """ + + workspace_name: str + path: str + is_read: bool + is_write: bool + catalog_name: str + dst_schema: str + dst_table: str + + @classmethod + def initial( + cls, + workspace_name: str, + path: str, + is_read: bool, + is_write: bool, + catalog_name: str, + dst_schema: str, + dst_table: str, + ) -> "DirectFsRule": + return cls( + workspace_name=workspace_name, + path=path, + is_read=is_read, + is_write=is_write, + catalog_name=catalog_name, + dst_schema=dst_schema, + dst_table=dst_table, + ) + + +class DirectFsMapping: + FILENAME = 'directfs_mapping.csv' + UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" + + def __init__( + self, + installation: Installation, + ws: WorkspaceClient, + sql_backend: SqlBackend, + ) -> None: + self._installation = installation + self._ws = ws + self._sql_backend = sql_backend + + def directfs_list( + self, + directfs_crawlers: list[DirectFsAccessCrawler], + tables_crawler: TablesCrawler, + workspace_name: str, + catalog_name: str, + ) -> Iterable["DirectFsRule"]: + """ + List all direct filesystem access records. + """ + directfs_snapshot = [] + for crawler in directfs_crawlers: + for directfs_access in crawler.snapshot(): + directfs_snapshot.append(directfs_access) + tables_snapshot = list(tables_crawler.snapshot()) + if not tables_snapshot: + msg = "No tables found. Please run: databricks labs ucx ensure-assessment-run" + raise ValueError(msg) + if not directfs_snapshot: + msg = "No directfs references found in code" + raise ValueError(msg) + + # TODO: very inefficient search, just for initial testing + # + for table in tables_snapshot: + for directfs_record in directfs_snapshot: + if table.location: + if directfs_record.path in table.location: + yield DirectFsRule.initial( + workspace_name=workspace_name, + path=directfs_record.path, + is_read=directfs_record.is_read, + is_write=directfs_record.is_write, + catalog_name=catalog_name, + dst_schema=table.database, + dst_table=table.name, + ) + + def save( + self, + directfs_crawlers: list[DirectFsAccessCrawler], + tables_crawler: TablesCrawler, + workspace_info: WorkspaceInfo, + ) -> str: + """ + Save direct filesystem access records to a CSV file. + """ + workspace_name = workspace_info.current() + default_catalog_name = re.sub(r"\W+", "_", workspace_name) + directfs_records = self.directfs_list(directfs_crawlers, tables_crawler, workspace_name, default_catalog_name) + return self._installation.save(list(directfs_records), filename=self.FILENAME)