From 493f6ab358446368d6c78c46d6732457752a6c37 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Tue, 10 Dec 2024 16:56:36 -0500 Subject: [PATCH 1/9] Initial commit --- src/databricks/labs/ucx/cli.py | 14 ++++++++++++++ src/databricks/labs/ucx/contexts/application.py | 4 ++++ 2 files changed, 18 insertions(+) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 4b37e81165..df21d07c74 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -197,6 +197,20 @@ def create_table_mapping( if len(workspace_contexts) == 1: webbrowser.open(f"{w.config.host}/#workspace{path}") +@ucx.command +def create_directfs_mapping( + w: WorkspaceClient, + ctx: WorkspaceContext | None = None, + run_as_collection: bool = False, + a: AccountClient | None = None, +): + """Create DirectFS mapping for all the directfs references in the workspace""" + workspace_contexts = _get_workspace_contexts(w, a, run_as_collection) + + if ctx: + workspace_contexts = [ctx] + for workspace_context in workspace_contexts: + workspace_context.directfs_mapping.create_directfs_mapping() @ucx.command def validate_external_locations( diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 82c75324d3..b529df71a9 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -456,6 +456,10 @@ def iam_credential_manager(self) -> CredentialManager: def table_mapping(self) -> TableMapping: return TableMapping(self.installation, self.workspace_client, self.sql_backend) + @cached_property + def directfs_mapping(self) -> DirectFsMapping: + return DirectFsMapping(self.installation, self.workspace_client, self.sql_backend) + @cached_property def catalog_schema(self) -> CatalogSchema: return CatalogSchema(self.workspace_client, self.table_mapping, self.migrate_grants, self.config.ucx_catalog) From 5e16967076d6fa647651d2708b50b0298010f575 Mon Sep 17 00:00:00
2001 From: pritishpai Date: Tue, 10 Dec 2024 16:59:57 -0500 Subject: [PATCH 2/9] Add new class --- src/databricks/labs/ucx/hive_metastore/directfs_mapping.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 src/databricks/labs/ucx/hive_metastore/directfs_mapping.py diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py new file mode 100644 index 0000000000..cea0b3a48d --- /dev/null +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -0,0 +1,3 @@ +class TableMapping: + FILENAME = 'directfs_mapping.csv' + UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" From ac73ceff8b1f704306a13e0b5f2b939586a24794 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Tue, 10 Dec 2024 17:00:22 -0500 Subject: [PATCH 3/9] fix class name --- src/databricks/labs/ucx/hive_metastore/directfs_mapping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index cea0b3a48d..22a84f8d5d 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -1,3 +1,3 @@ -class TableMapping: +class DirectFsMapping: FILENAME = 'directfs_mapping.csv' UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" From 9e9e85ae68bb29bdb68368cd4f79c7983c412a78 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Tue, 10 Dec 2024 17:05:45 -0500 Subject: [PATCH 4/9] add init --- .../labs/ucx/hive_metastore/directfs_mapping.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index 22a84f8d5d..74c6fad981 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -1,3 +1,19 @@ +from databricks.labs.lsql.backends 
import SqlBackend +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.marketplace import Installation + + class DirectFsMapping: FILENAME = 'directfs_mapping.csv' UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" + + + def __init__( + self, + installation: Installation, + workspace_client: WorkspaceClient, + sql_backend: SqlBackend, + ) -> None: + self.installation = installation + self.workspace_client = workspace_client + self.sql_backend = sql_backend From 8ea622db4a3351bf5046143fa4a9f6ec78b2bc18 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 11 Dec 2024 06:14:31 -0500 Subject: [PATCH 5/9] add method to get all directfs access from the crawler snapshot --- .../labs/ucx/hive_metastore/directfs_mapping.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index 74c6fad981..6bfb9b2f26 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -2,6 +2,8 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.service.marketplace import Installation +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler + class DirectFsMapping: FILENAME = 'directfs_mapping.csv' @@ -17,3 +19,10 @@ def __init__( self.installation = installation self.workspace_client = workspace_client self.sql_backend = sql_backend + + + def directfs_list(self, directfs_crawler: DirectFsAccessCrawler): + """ + List all direct filesystem access records. 
+ """ + return directfs_crawler.snapshot() From bf804618e38d7a25b82db6920f7c5532cd9ed675 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 11 Dec 2024 06:22:21 -0500 Subject: [PATCH 6/9] add DirectFsRule --- .../ucx/hive_metastore/directfs_mapping.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index 6bfb9b2f26..bac5b6c95d 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -1,9 +1,27 @@ +import logging +from dataclasses import dataclass + from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient from databricks.sdk.service.marketplace import Installation from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler +logger = logging.getLogger(__name__) + +@dataclass +class DirectFsRule: + """ + A rule for direct filesystem access to UC table mapping. 
+ """ + workspace_name:str + path: str + is_read: bool + is_write: bool + catalog_name: str + dst_schema: str + dst_table: str + class DirectFsMapping: FILENAME = 'directfs_mapping.csv' From d94f1b447b7bec673c44379c2ab1da1102e64626 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 11 Dec 2024 07:16:45 -0500 Subject: [PATCH 7/9] add initial functionality to fetch directfs access, compare with table locations and create mapping --- src/databricks/labs/ucx/cli.py | 8 +- .../labs/ucx/contexts/application.py | 1 + .../ucx/hive_metastore/directfs_mapping.py | 104 ++++++++++++++---- 3 files changed, 92 insertions(+), 21 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index df21d07c74..5865e3764b 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -197,6 +197,7 @@ def create_table_mapping( if len(workspace_contexts) == 1: webbrowser.open(f"{w.config.host}/#workspace{path}") + @ucx.command def create_directfs_mapping( w: WorkspaceClient, @@ -209,8 +210,11 @@ def create_directfs_mapping( if ctx: workspace_contexts = [ctx] - for workspace_context in workspace_contexts: - workspace_context.directfs_mapping.create_directfs_mapping() + for workspace_ctx in workspace_contexts: + workspace_ctx.directfs_mapping.save( + workspace_ctx.directfs_crawler, workspace_ctx.tables_crawler, workspace_ctx.workspace_info + ) + @ucx.command def validate_external_locations( diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index b529df71a9..d33c69193d 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -14,6 +14,7 @@ from databricks.labs.ucx.assessment.jobs import JobsCrawler from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler +from databricks.labs.ucx.hive_metastore.directfs_mapping import DirectFsMapping from databricks.labs.ucx.hive_metastore.pipelines_migrate import 
PipelinesMigrator from databricks.labs.ucx.recon.data_comparator import StandardDataComparator from databricks.labs.ucx.recon.data_profiler import StandardDataProfiler diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index bac5b6c95d..aa03a28a05 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -1,46 +1,112 @@ import logging +import re +from collections.abc import Iterable from dataclasses import dataclass +from databricks.labs.blueprint.installation import Installation from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient -from databricks.sdk.service.marketplace import Installation +from databricks.labs.ucx.account.workspaces import WorkspaceInfo +from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler logger = logging.getLogger(__name__) + @dataclass class DirectFsRule: - """ - A rule for direct filesystem access to UC table mapping. - """ - workspace_name:str - path: str - is_read: bool - is_write: bool - catalog_name: str - dst_schema: str - dst_table: str + """ + A rule for direct filesystem access to UC table mapping. 
+ """ + + workspace_name: str + path: str + is_read: bool + is_write: bool + catalog_name: str + dst_schema: str + dst_table: str + + @classmethod + def initial( + cls, + workspace_name: str, + path: str, + is_read: bool, + is_write: bool, + catalog_name: str, + dst_schema: str, + dst_table: str, + ) -> "DirectFsRule": + return cls( + workspace_name=workspace_name, + path=path, + is_read=is_read, + is_write=is_write, + catalog_name=catalog_name, + dst_schema=dst_schema, + dst_table=dst_table, + ) class DirectFsMapping: FILENAME = 'directfs_mapping.csv' UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" - def __init__( self, installation: Installation, - workspace_client: WorkspaceClient, + ws: WorkspaceClient, sql_backend: SqlBackend, ) -> None: - self.installation = installation - self.workspace_client = workspace_client - self.sql_backend = sql_backend - + self._installation = installation + self._ws = ws + self._sql_backend = sql_backend - def directfs_list(self, directfs_crawler: DirectFsAccessCrawler): + def directfs_list( + self, + directfs_crawler: DirectFsAccessCrawler, + tables_crawler: TablesCrawler, + workspace_name: str, + catalog_name: str, + ) -> Iterable["DirectFsRule"]: """ List all direct filesystem access records. """ - return directfs_crawler.snapshot() + directfs_snapshot = list(directfs_crawler.snapshot()) + tables_snapshot = list(tables_crawler.snapshot()) + if not tables_snapshot: + msg = "No tables found. 
Please run: databricks labs ucx ensure-assessment-run" + raise ValueError(msg) + if not directfs_snapshot: + msg = "No directfs references found in code" + raise ValueError(msg) + + # TODO: very inefficient search, just for initial testing + for table in tables_snapshot: + for directfs_record in directfs_snapshot: + if table.location: + if directfs_record.path in table.location: + yield DirectFsRule.initial( + workspace_name=workspace_name, + path=directfs_record.path, + is_read=directfs_record.is_read, + is_write=directfs_record.is_write, + catalog_name=catalog_name, + dst_schema=table.database, + dst_table=table.name, + ) + + def save( + self, directfs_crawler: DirectFsAccessCrawler, tables_crawler: TablesCrawler, workspace_info: WorkspaceInfo + ) -> str: + """ + Save direct filesystem access records to a CSV file. + """ + workspace_name = workspace_info.current() + default_catalog_name = re.sub(r"\W+", "_", workspace_name) + + directfs_records = self.directfs_list(directfs_crawler, tables_crawler, workspace_name, default_catalog_name) + + return self._installation.save(list(directfs_records), filename=self.FILENAME) From 74f38db05f069baa1527219bf2484d2e7b0bf28c Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 11 Dec 2024 10:05:06 -0500 Subject: [PATCH 8/9] use dump_all method to get directfs access --- src/databricks/labs/ucx/hive_metastore/directfs_mapping.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index aa03a28a05..a42ad2fd44 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -74,7 +74,8 @@ def directfs_list( """ List all direct filesystem access records. 
""" - directfs_snapshot = list(directfs_crawler.snapshot()) + directfs_snapshot = [] + directfs_crawler.dump_all(directfs_snapshot) tables_snapshot = list(tables_crawler.snapshot()) if not tables_snapshot: msg = "No tables found. Please run: databricks labs ucx ensure-assessment-run" @@ -106,7 +107,8 @@ def save( """ workspace_name = workspace_info.current() default_catalog_name = re.sub(r"\W+", "_", workspace_name) + directfs_records = [] - directfs_records = self.directfs_list(directfs_crawler, tables_crawler, workspace_name, default_catalog_name) + directfs_records = self.directfs_list(directfs_crawler, tables_crawler, workspace_name, default_catalog_name) return self._installation.save(list(directfs_records), filename=self.FILENAME) From 16892808f40f9ecc4b65ea88c9287f635ad8285e Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 11 Dec 2024 11:18:06 -0500 Subject: [PATCH 9/9] update to use the snapshot from both directfs crawlers --- src/databricks/labs/ucx/cli.py | 4 +++- .../labs/ucx/hive_metastore/directfs_mapping.py | 17 ++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 5865e3764b..5722a1917f 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -212,7 +212,9 @@ def create_directfs_mapping( workspace_contexts = [ctx] for workspace_ctx in workspace_contexts: workspace_ctx.directfs_mapping.save( - workspace_ctx.directfs_crawler, workspace_ctx.tables_crawler, workspace_ctx.workspace_info + [workspace_ctx.directfs_access_crawler_for_paths, workspace_ctx.directfs_access_crawler_for_queries], + workspace_ctx.tables_crawler, + workspace_ctx.workspace_info, ) diff --git a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py index a42ad2fd44..ab5c9714a1 100644 --- a/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py +++ 
b/src/databricks/labs/ucx/hive_metastore/directfs_mapping.py @@ -66,7 +66,7 @@ def __init__( def directfs_list( self, - directfs_crawler: DirectFsAccessCrawler, + directfs_crawlers: list[DirectFsAccessCrawler], tables_crawler: TablesCrawler, workspace_name: str, catalog_name: str, @@ -75,7 +75,9 @@ def directfs_list( List all direct filesystem access records. """ directfs_snapshot = [] - directfs_crawler.dump_all(directfs_snapshot) + for crawler in directfs_crawlers: + for directfs_access in crawler.snapshot(): + directfs_snapshot.append(directfs_access) tables_snapshot = list(tables_crawler.snapshot()) if not tables_snapshot: msg = "No tables found. Please run: databricks labs ucx ensure-assessment-run" @@ -85,6 +87,7 @@ def directfs_list( raise ValueError(msg) # TODO: very inefficient search, just for initial testing + # for table in tables_snapshot: for directfs_record in directfs_snapshot: if table.location: @@ -100,15 +103,15 @@ def directfs_list( ) def save( - self, directfs_crawler: DirectFsAccessCrawler, tables_crawler: TablesCrawler, workspace_info: WorkspaceInfo + self, + directfs_crawlers: list[DirectFsAccessCrawler], + tables_crawler: TablesCrawler, + workspace_info: WorkspaceInfo, ) -> str: """ Save direct filesystem access records to a CSV file. """ workspace_name = workspace_info.current() default_catalog_name = re.sub(r"\W+", "_", workspace_name) - directfs_records = [] - - - directfs_records = self.directfs_list(directfs_crawler, tables_crawler, workspace_name, default_catalog_name) + directfs_records = self.directfs_list(directfs_crawlers, tables_crawler, workspace_name, default_catalog_name) return self._installation.save(list(directfs_records), filename=self.FILENAME)