From d7e9ac4145ab74ee672585c1a1e532f86e4eb3e6 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 20 Feb 2025 11:09:18 -0500 Subject: [PATCH 01/17] Added functionality to scope down "seen_tables" --- .../labs/ucx/hive_metastore/table_migrate.py | 6 +++--- .../labs/ucx/hive_metastore/table_migration_status.py | 11 ++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index d58e6b8da4..02bbde824c 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -108,7 +108,6 @@ def migrate_tables( if what in [What.DB_DATASET, What.UNKNOWN]: logger.error(f"Can't migrate tables with type {what.name}") return None - self._init_seen_tables() if what == What.VIEW: return self._migrate_views() return self._migrate_tables( @@ -124,6 +123,7 @@ def _migrate_tables( ): tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table) tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate) + self._init_seen_tables({t.rule.as_uc_table_key for t in tables_to_migrate}) tasks = [] for table in tables_in_scope: tasks.append( @@ -597,8 +597,8 @@ def print_revert_report( print("To revert and delete Migrated Tables, add --delete_managed true flag to the command") return True - def _init_seen_tables(self): - self._seen_tables = self._migration_status_refresher.get_seen_tables() + def _init_seen_tables(self, scope: set[str] | None = None): + self._seen_tables = self._migration_status_refresher.get_seen_tables(scope=scope) def _sql_alter_to(self, table: Table, target_table_key: str): return f"ALTER {table.kind} {escape_sql_identifier(table.key)} SET TBLPROPERTIES ('upgraded_to' = '{target_table_key}');" diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 8b766755fe..2af44eedcf 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -92,7 +92,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_ def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: return TableMigrationIndex(self.snapshot(force_refresh=force_refresh)) - def get_seen_tables(self) -> dict[str, str]: + def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: seen_tables: dict[str, str] = {} for schema in self._iter_schemas(): if schema.catalog_name is None or schema.name is None: @@ -107,13 +107,14 @@ def get_seen_tables(self) -> dict[str, str]: logger.warning(f"Error while listing tables in schema: {schema.full_name}", exc_info=e) continue for table in tables: - if not table.properties: - continue - if "upgraded_from" not in table.properties: - continue if not table.full_name: logger.warning(f"The table {table.name} in {schema.name} has no full name") continue + if scope and table.full_name not in scope: + continue + if not table.properties or "upgraded_from" not in table.properties: + continue + seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower() return seen_tables From 923f4da19a2cf55ae9aefd79652132a3f0b22b98 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 20 Feb 2025 11:23:47 -0500 Subject: [PATCH 02/17] Added unit test. 
--- .../test_table_migration_status.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/unit/hive_metastore/test_table_migration_status.py b/tests/unit/hive_metastore/test_table_migration_status.py index ad9350e92c..f9da1af5d8 100644 --- a/tests/unit/hive_metastore/test_table_migration_status.py +++ b/tests/unit/hive_metastore/test_table_migration_status.py @@ -119,3 +119,55 @@ def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]: ws.schemas.list.assert_called_once_with(catalog_name="test") # System is NOT called ws.tables.list.assert_called() tables_crawler.snapshot.assert_not_called() + + +def test_table_migration_status_refresher_scope( + mock_backend +) -> None: + ws = create_autospec(WorkspaceClient) + ws.catalogs.list.return_value = [ + CatalogInfo(name="test1"), + CatalogInfo(name="test2"), + ] + + def schemas_list(catalog_name: str) -> Iterable[SchemaInfo]: + schemas = [ + SchemaInfo(catalog_name="test1", name="test1"), + SchemaInfo(catalog_name="test1", name="test2"), + ] + for schema in schemas: + if schema.catalog_name == catalog_name: + yield schema + + def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]: + tables = [ + TableInfo( + full_name="test1.test1.test1", + catalog_name="test1", + schema_name="test1", + name="test1", + properties={"upgraded_from": "test1"}, + ), + TableInfo( + full_name="test2.test2.test2", + catalog_name="test2", + schema_name="test2", + name="test2", + properties={"upgraded_from": "test2"}, + ), + ] + for table in tables: + if table.catalog_name == catalog_name and table.schema_name == schema_name: + yield table + + ws.schemas.list.side_effect = schemas_list + ws.tables.list.side_effect = tables_list + tables_crawler = create_autospec(TablesCrawler) + refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler) + + seen_tables = refresher.get_seen_tables(scope={"test1.test1.test1"}) + + assert seen_tables == {"test1.test1.test1": "test1"} + ws.catalogs.list.assert_called_once() # System is NOT called + ws.tables.list.assert_called() + tables_crawler.snapshot.assert_not_called() From 525c9b68a8598e6a5d4e5fbd9b629f280fb7a248 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 20 Feb 2025 11:26:29 -0500 Subject: [PATCH 03/17] Added unit test. 
--- .../unit/hive_metastore/test_table_migration_status.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/unit/hive_metastore/test_table_migration_status.py b/tests/unit/hive_metastore/test_table_migration_status.py index f9da1af5d8..aaa090755a 100644 --- a/tests/unit/hive_metastore/test_table_migration_status.py +++ b/tests/unit/hive_metastore/test_table_migration_status.py @@ -133,7 +133,7 @@ def test_table_migration_status_refresher_scope( def schemas_list(catalog_name: str) -> Iterable[SchemaInfo]: schemas = [ SchemaInfo(catalog_name="test1", name="test1"), - SchemaInfo(catalog_name="test1", name="test2"), + SchemaInfo(catalog_name="test2", name="test2"), ] for schema in schemas: if schema.catalog_name == catalog_name: @@ -165,9 +165,9 @@ def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]: tables_crawler = create_autospec(TablesCrawler) refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler) - seen_tables = refresher.get_seen_tables(scope={"test1.test1.test1"}) - - assert seen_tables == {"test1.test1.test1": "test1"} - ws.catalogs.list.assert_called_once() # System is NOT called + # Test with scope + assert refresher.get_seen_tables(scope={"test1.test1.test1"}) == {"test1.test1.test1": "test1"} + # Test without scope + assert refresher.get_seen_tables() == {"test1.test1.test1": "test1", "test2.test2.test2": "test2"} ws.tables.list.assert_called() tables_crawler.snapshot.assert_not_called() From 519112cd478bd6140515179556b356361de8bf37 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 13:35:37 -0500 Subject: [PATCH 04/17] Fixed scoping param --- src/databricks/labs/ucx/hive_metastore/table_migrate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 02bbde824c..e636e17efd 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -123,7 +123,7 @@ def _migrate_tables( ): tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table) tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate) - self._init_seen_tables({t.rule.as_uc_table_key for t in tables_to_migrate}) + self._init_seen_tables(scope={t.rule.as_uc_table_key for t in tables_to_migrate}) tasks = [] for table in tables_in_scope: tasks.append( @@ -597,7 +597,7 @@ def print_revert_report( print("To revert and delete Migrated Tables, add --delete_managed true flag to the command") return True - def _init_seen_tables(self, scope: set[str] | None = None): + def _init_seen_tables(self,*, scope: set[str] | None = None): self._seen_tables = self._migration_status_refresher.get_seen_tables(scope=scope) def _sql_alter_to(self, table: Table, target_table_key: str): From d13ba47acf1bfa1dafca5b1f0d285e98cce51170 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 14:17:53 -0500 Subject: [PATCH 05/17] Parallelized seen tables --- .../labs/ucx/hive_metastore/table_migrate.py | 2 +- .../hive_metastore/table_migration_status.py | 41 ++++++++++++++----- .../test_table_migration_status.py | 4 +- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index e636e17efd..af70cad60b 100644 --- 
a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -597,7 +597,7 @@ def print_revert_report( print("To revert and delete Migrated Tables, add --delete_managed true flag to the command") return True - def _init_seen_tables(self,*, scope: set[str] | None = None): + def _init_seen_tables(self, *, scope: set[str] | None = None): self._seen_tables = self._migration_status_refresher.get_seen_tables(scope=scope) def _sql_alter_to(self, table: Table, target_table_key: str): diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 2af44eedcf..eb4bf53a9d 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -2,12 +2,14 @@ import logging from dataclasses import dataclass, replace from collections.abc import Iterable, KeysView +from functools import partial from typing import ClassVar +from databricks.labs.blueprint.parallel import Threads from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError, NotFound -from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo +from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo, TableInfo from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.utils import escape_sql_identifier @@ -94,19 +96,27 @@ def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: seen_tables: dict[str, str] = {} + tasks = [] for schema in self._iter_schemas(): if schema.catalog_name is None or schema.name is None: continue - try: - # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception - tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name)) - except NotFound: - logger.warning(f"Schema {schema.full_name} no longer exists. Skipping checking its migration status.") - continue - except DatabricksError as e: - logger.warning(f"Error while listing tables in schema: {schema.full_name}", exc_info=e) - continue + tasks.append( + partial( + self._iter_tables, + schema.catalog_name, + schema.name, + ) + ) + tables: list = [] + table_lists = Threads.gather("migrate tables", tasks) + # Combine tuple of lists to a list + for table_list in table_lists: + if table_list is not None: + tables.extend(table_list) for table in tables: + if not isinstance(table, TableInfo): + logger.warning(f"Table {table} is not an instance of TableInfo") + continue if not table.full_name: logger.warning(f"The table {table.name} in {schema.name} has no full name") continue @@ -182,3 +192,14 @@ def _iter_schemas(self) -> Iterable[SchemaInfo]: except DatabricksError as e: logger.warning(f"Error while listing schemas in catalog: {catalog.name}", exc_info=e) continue + + def _iter_tables(self, catalog_name: str, schema_name: str) -> list[TableInfo]: + try: + # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception + return list(self._ws.tables.list(catalog_name=catalog_name, schema_name=schema_name)) + except NotFound: + logger.warning(f"Schema {schema_name} no longer exists. 
Skipping checking its migration status.") + return [] + except DatabricksError as e: + logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e) + return [] diff --git a/tests/unit/hive_metastore/test_table_migration_status.py b/tests/unit/hive_metastore/test_table_migration_status.py index aaa090755a..6315b23daa 100644 --- a/tests/unit/hive_metastore/test_table_migration_status.py +++ b/tests/unit/hive_metastore/test_table_migration_status.py @@ -121,9 +121,7 @@ def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]: tables_crawler.snapshot.assert_not_called() -def test_table_migration_status_refresher_scope( - mock_backend -) -> None: +def test_table_migration_status_refresher_scope(mock_backend) -> None: ws = create_autospec(WorkspaceClient) ws.catalogs.list.return_value = [ CatalogInfo(name="test1"), From 53b99dd318009f04d137656c1cc38f85f21a01e4 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 14:49:01 -0500 Subject: [PATCH 06/17] Addressed Testing Issues --- .../hive_metastore/table_migration_status.py | 4 +-- .../unit/hive_metastore/test_table_migrate.py | 29 +++++++++++++++---- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index eb4bf53a9d..173dc57e97 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -110,7 +110,7 @@ def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: tables: list = [] table_lists = Threads.gather("migrate tables", tasks) # Combine tuple of lists to a list - for table_list in table_lists: + for table_list in table_lists[0]: if table_list is not None: tables.extend(table_list) for table in tables: @@ -198,7 +198,7 @@ def _iter_tables(self, catalog_name: str, schema_name: str) -> list[TableInfo]: # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception return list(self._ws.tables.list(catalog_name=catalog_name, schema_name=schema_name)) except NotFound: - logger.warning(f"Schema {schema_name} no longer exists. Skipping checking its migration status.") + logger.warning(f"Schema {catalog_name}.{schema_name} no longer exists. 
Skipping checking its migration status.") return [] except DatabricksError as e: logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e) diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 1712852585..ce54b5c878 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1043,14 +1043,23 @@ def test_table_status_seen_tables(caplog): table_crawler = create_autospec(TablesCrawler) client = create_autospec(WorkspaceClient) client.catalogs.list.return_value = [CatalogInfo(name="cat1"), CatalogInfo(name="deleted_cat")] - client.schemas.list.side_effect = [ + schemas = { "cat1": [ SchemaInfo(catalog_name="cat1", name="schema1", full_name="cat1.schema1"), SchemaInfo(catalog_name="cat1", name="deleted_schema", full_name="cat1.deleted_schema"), ], - NotFound(), - ] - client.tables.list.side_effect = [ + "deleted_cat": + None, +} + + def schema_list(catalog_name): + schema = schemas[catalog_name] + if not schema: + raise NotFound() + return schema + client.schemas.list = schema_list + + tables = { ("cat1", "schema1"): [ TableInfo( catalog_name="cat1", @@ -1086,8 +1095,16 @@ def test_table_status_seen_tables(caplog): properties={"upgraded_from": "hive_metastore.schema1.table2"}, ), ], - NotFound(), - ] + ("cat1", "deleted_schema"): None, + } + def table_list(catalog_name, schema_name): + table = tables[(catalog_name, schema_name)] + if not table: + raise NotFound() + return table + + + client.tables.list = table_list table_status_crawler = TableMigrationStatusRefresher(client, backend, "ucx", table_crawler) seen_tables = table_status_crawler.get_seen_tables() assert seen_tables == { From 83c8dae9bf4b69b82cb557306ba754a138d76c30 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 15:26:48 -0500 Subject: [PATCH 07/17] Added API Rate Limit --- .../labs/ucx/hive_metastore/table_migration_status.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 173dc57e97..5b5ff0b208 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -17,7 +17,6 @@ logger = logging.getLogger(__name__) - @dataclass class TableMigrationStatus: src_schema: str @@ -86,6 +85,8 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]): CatalogInfoSecurableKind.CATALOG_SYSTEM, ] + API_LIMIT: ClassVar[int] = 20 + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_crawler: TablesCrawler): super().__init__(sql_backend, "hive_metastore", schema, "migration_status", TableMigrationStatus) self._ws = ws @@ -108,7 +109,7 @@ def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: ) ) tables: list = [] - table_lists = Threads.gather("migrate tables", tasks) + table_lists = Threads.gather("migrate tables", tasks, self.API_LIMIT) # Combine tuple of lists to a list for table_list in table_lists[0]: if table_list is not None: From 4b720b94f5f37823d426d144786642f11bcc5498 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 15:29:44 -0500 Subject: [PATCH 08/17] Fixed Crawl name --- .../labs/ucx/hive_metastore/table_migration_status.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 5b5ff0b208..7725f88636 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -109,7 +109,7 @@ def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: ) ) tables: list = [] - table_lists = Threads.gather("migrate tables", tasks, self.API_LIMIT) + table_lists = Threads.gather("list tables", tasks, self.API_LIMIT) # Combine tuple of lists to a list for table_list in table_lists[0]: if table_list is not None: From 5c1e4c4771de657f5d376396ef597c2fd4c0e295 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 15:32:14 -0500 Subject: [PATCH 09/17] Fixed Crawl name --- .../labs/ucx/hive_metastore/table_migration_status.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 7725f88636..9a11407847 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -85,7 +85,7 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]): CatalogInfoSecurableKind.CATALOG_SYSTEM, ] - API_LIMIT: ClassVar[int] = 20 + API_LIMIT: ClassVar[int] = 25 def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_crawler: TablesCrawler): super().__init__(sql_backend, "hive_metastore", schema, "migration_status", TableMigrationStatus) From 4252fbaff47b830ceff50521836d297949c48c66 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 15:36:11 -0500 Subject: [PATCH 10/17] Skip Delta Sharing Catalog --- .../labs/ucx/hive_metastore/table_migration_status.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 9a11407847..9b5e1cfd7c 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -9,7 +9,7 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError, NotFound -from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo, TableInfo +from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo, TableInfo, CatalogType from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.utils import escape_sql_identifier @@ -183,7 +183,7 @@ def _iter_catalogs(self) -> Iterable[CatalogInfo]: def _iter_schemas(self) -> Iterable[SchemaInfo]: for catalog in self._iter_catalogs(): - if catalog.name is None: + if catalog.name is None or catalog.catalog_type in (CatalogType.DELTASHARING_CATALOG, CatalogType.SYSTEM_CATALOG): continue try: yield from self._ws.schemas.list(catalog_name=catalog.name) From 4945f9a147616621e9b704bfb22830c77b1d752d Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:05:01 -0500 Subject: [PATCH 11/17] Added scope for UC crawling. 
--- .../labs/ucx/hive_metastore/table_migrate.py | 4 ++-- .../hive_metastore/table_migration_status.py | 24 ++++++++++++++----- .../unit/hive_metastore/test_table_migrate.py | 16 ++++++------- .../test_table_migration_status.py | 5 ++-- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index af70cad60b..732a223f77 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -123,7 +123,7 @@ def _migrate_tables( ): tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table) tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate) - self._init_seen_tables(scope={t.rule.as_uc_table_key for t in tables_to_migrate}) + self._init_seen_tables(scope={t.rule.as_uc_table for t in tables_to_migrate}) tasks = [] for table in tables_in_scope: tasks.append( @@ -597,7 +597,7 @@ def print_revert_report( print("To revert and delete Migrated Tables, add --delete_managed true flag to the command") return True - def _init_seen_tables(self, *, scope: set[str] | None = None): + def _init_seen_tables(self, *, scope: set[Table] | None = None): self._seen_tables = self._migration_status_refresher.get_seen_tables(scope=scope) def _sql_alter_to(self, table: Table, target_table_key: str): diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 9b5e1cfd7c..31cb6ef2a6 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -13,10 +13,11 @@ from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.utils import escape_sql_identifier -from databricks.labs.ucx.hive_metastore.tables import TablesCrawler +from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, Table logger = logging.getLogger(__name__) + @dataclass class TableMigrationStatus: src_schema: str @@ -95,11 +96,17 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_ def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: return TableMigrationIndex(self.snapshot(force_refresh=force_refresh)) - def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: + def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: seen_tables: dict[str, str] = {} tasks = [] + schema_scope = {(table.catalog.lower(), table.database.lower()) for table in scope} if scope else None + table_scope = {table.full_name.lower() for table in scope} if scope else None for schema in self._iter_schemas(): - if schema.catalog_name is None or schema.name is None: + if ( + schema.catalog_name is None + or schema.name is None + or (schema_scope and (schema.catalog_name.lower(), schema.name.lower()) not in schema_scope) + ): continue tasks.append( partial( @@ -121,7 +128,7 @@ def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]: if not table.full_name: logger.warning(f"The table {table.name} in {schema.name} has no full name") continue - if scope and table.full_name not in scope: + if table_scope and table.full_name.lower() not in table_scope: continue if not table.properties or "upgraded_from" not in table.properties: continue @@ -183,7 +190,10 @@ def _iter_catalogs(self) -> Iterable[CatalogInfo]: 
def _iter_schemas(self) -> Iterable[SchemaInfo]: for catalog in self._iter_catalogs(): - if catalog.name is None or catalog.catalog_type in (CatalogType.DELTASHARING_CATALOG, CatalogType.SYSTEM_CATALOG): + if catalog.name is None or catalog.catalog_type in ( + CatalogType.DELTASHARING_CATALOG, + CatalogType.SYSTEM_CATALOG, + ): continue try: yield from self._ws.schemas.list(catalog_name=catalog.name) @@ -199,7 +209,9 @@ def _iter_tables(self, catalog_name: str, schema_name: str) -> list[TableInfo]: # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception return list(self._ws.tables.list(catalog_name=catalog_name, schema_name=schema_name)) except NotFound: - logger.warning(f"Schema {catalog_name}.{schema_name} no longer exists. Skipping checking its migration status.") + logger.warning( + f"Schema {catalog_name}.{schema_name} no longer exists. Skipping checking its migration status." + ) return [] except DatabricksError as e: logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e) diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index ce54b5c878..45e876e159 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1043,24 +1043,24 @@ def test_table_status_seen_tables(caplog): table_crawler = create_autospec(TablesCrawler) client = create_autospec(WorkspaceClient) client.catalogs.list.return_value = [CatalogInfo(name="cat1"), CatalogInfo(name="deleted_cat")] - schemas = { "cat1": - [ + schemas = { + "cat1": [ SchemaInfo(catalog_name="cat1", name="schema1", full_name="cat1.schema1"), SchemaInfo(catalog_name="cat1", name="deleted_schema", full_name="cat1.deleted_schema"), ], - "deleted_cat": - None, -} + "deleted_cat": None, + } def schema_list(catalog_name): schema = schemas[catalog_name] if not schema: raise NotFound() return schema + client.schemas.list = schema_list - tables = { ("cat1", "schema1"): - [ + tables = { + ("cat1", "schema1"): [ TableInfo( catalog_name="cat1", schema_name="schema1", @@ -1097,13 +1097,13 @@ def schema_list(catalog_name): ], ("cat1", "deleted_schema"): None, } + def table_list(catalog_name, schema_name): table = tables[(catalog_name, schema_name)] if not table: raise NotFound() return table - client.tables.list = table_list table_status_crawler = TableMigrationStatusRefresher(client, backend, "ucx", table_crawler) seen_tables = table_status_crawler.get_seen_tables() diff --git a/tests/unit/hive_metastore/test_table_migration_status.py b/tests/unit/hive_metastore/test_table_migration_status.py index 6315b23daa..1203fd12b3 100644 --- a/tests/unit/hive_metastore/test_table_migration_status.py +++ b/tests/unit/hive_metastore/test_table_migration_status.py @@ -6,7 +6,7 @@ from databricks.sdk.errors import BadRequest, DatabricksError, NotFound from databricks.sdk.service.catalog import CatalogInfoSecurableKind, CatalogInfo, SchemaInfo, TableInfo -from databricks.labs.ucx.hive_metastore.tables import TablesCrawler +from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, Table from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher @@ -163,8 +163,9 @@ def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]: tables_crawler = create_autospec(TablesCrawler) refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler) + scope_table = Table("test1", "test1", "test1", "Table", 
"Delta") # Test with scope - assert refresher.get_seen_tables(scope={"test1.test1.test1"}) == {"test1.test1.test1": "test1"} + assert refresher.get_seen_tables(scope={scope_table}) == {"test1.test1.test1": "test1"} # Test without scope assert refresher.get_seen_tables() == {"test1.test1.test1": "test1", "test2.test2.test2": "test2"} ws.tables.list.assert_called() From ef60201f58d2936b47745440e2637800c1d6f6f5 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:13:25 -0500 Subject: [PATCH 12/17] Fixed Unit Test --- tests/unit/hive_metastore/test_table_migrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 45e876e159..db5996d79c 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -498,7 +498,7 @@ def test_migrate_already_upgraded_table_should_produce_no_queries(ws, mock_pyspa table_crawler = TablesCrawler(crawler_backend, "inventory_database") ws.catalogs.list.return_value = [CatalogInfo(name="cat1")] ws.schemas.list.return_value = [ - SchemaInfo(catalog_name="cat1", name="test_schema1"), + SchemaInfo(catalog_name="cat1", name="schema1"), ] ws.tables.list.return_value = [ TableInfo( From 25727d56484066d0ec1c627a5cd70d458cf1e64b Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:34:39 -0500 Subject: [PATCH 13/17] Skip "information_schema" --- src/databricks/labs/ucx/hive_metastore/table_migration_status.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 31cb6ef2a6..dfba4269cd 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -105,6 +105,7 @@ def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: if ( schema.catalog_name is None or schema.name is None + or schema.name == "information_schema" or (schema_scope and (schema.catalog_name.lower(), schema.name.lower()) not in schema_scope) ): continue From d1de5434370ad500ab4a6baae432c15fed554498 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:38:13 -0500 Subject: [PATCH 14/17] Fixed Loop --- .../hive_metastore/table_migration_status.py | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index dfba4269cd..a6eb56dc1b 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -116,23 +116,22 @@ def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: schema.name, ) ) - tables: list = [] - table_lists = Threads.gather("list tables", tasks, self.API_LIMIT) - # Combine tuple of lists to a list - for table_list in table_lists[0]: - if table_list is not None: - tables.extend(table_list) - for table in tables: - if not isinstance(table, TableInfo): - logger.warning(f"Table {table} is not an instance of TableInfo") - continue - if not table.full_name: - logger.warning(f"The table {table.name} in {schema.name} has no full name") - continue - if table_scope and table.full_name.lower() not in table_scope: - continue - if not table.properties or "upgraded_from" not in 
table.properties: - continue + tables: list = [] + table_lists = Threads.gather("list tables", tasks, self.API_LIMIT) + # Combine tuple of lists to a list + for table_list in table_lists: + tables.extend(table_list) + for table in tables: + if not isinstance(table, TableInfo): + logger.warning(f"Table {table} is not an instance of TableInfo") + continue + if not table.full_name: + logger.warning(f"The table {table.name} in {schema.name} has no full name") + continue + if table_scope and table.full_name.lower() not in table_scope: + continue + if not table.properties or "upgraded_from" not in table.properties: + continue seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower() return seen_tables From 543b6ecdd7b0aceb9b5c46b77152b53d5fdd48fd Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:39:31 -0500 Subject: [PATCH 15/17] Fixed Loop --- .../labs/ucx/hive_metastore/table_migration_status.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index a6eb56dc1b..da2faf1d4d 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -126,14 +126,14 @@ def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: logger.warning(f"Table {table} is not an instance of TableInfo") continue if not table.full_name: - logger.warning(f"The table {table.name} in {schema.name} has no full name") + logger.warning(f"The table {table.name} in {table.schema_name} has no full name") continue if table_scope and table.full_name.lower() not in table_scope: continue if not table.properties or "upgraded_from" not in table.properties: continue - seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower() + seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower() return seen_tables def is_migrated(self, schema: str, table: str) -> bool: From 817b7840be95d597fba5f29fc363fe8d4efbbb8f Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:42:24 -0500 Subject: [PATCH 16/17] Address Testing Issue --- .../labs/ucx/hive_metastore/table_migration_status.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index da2faf1d4d..f37b33b771 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -119,7 +119,7 @@ def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: tables: list = [] table_lists = Threads.gather("list tables", tasks, self.API_LIMIT) # Combine tuple of lists to a list - for table_list in table_lists: + for table_list in table_lists[0]: tables.extend(table_list) for table in tables: if not isinstance(table, TableInfo): From c51ee78a1e08d0abe15681c1d18f7a2b2aba852e Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 21 Feb 2025 16:45:45 -0500 Subject: [PATCH 17/17] Added count message --- src/databricks/labs/ucx/hive_metastore/table_migration_status.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index f37b33b771..d307986dd6 100644 --- 
a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -117,6 +117,7 @@ def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]: ) ) tables: list = [] + logger.info(f"Scanning {len(tasks)} schemas for tables") table_lists = Threads.gather("list tables", tasks, self.API_LIMIT) # Combine tuple of lists to a list for table_list in table_lists[0]:
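
Below is a minimal usage sketch (not part of the patch series itself) of the scoped lookup these commits introduce, mirroring the unit tests above. The `ws` (WorkspaceClient), `sql_backend`, and `tables_crawler` objects are assumed to be wired up as elsewhere in UCX, and the inventory schema name "ucx" is illustrative only.

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
from databricks.labs.ucx.hive_metastore.tables import Table

# Assumed available: ws (databricks.sdk.WorkspaceClient), sql_backend (lsql SqlBackend),
# tables_crawler (TablesCrawler) -- their construction is omitted in this sketch.
refresher = TableMigrationStatusRefresher(ws, sql_backend, "ucx", tables_crawler)

# Restrict the Unity Catalog scan to the migration targets: only schemas containing a
# scoped table are listed, in parallel, capped at API_LIMIT concurrent list-tables calls.
scope = {Table("cat1", "schema1", "table1", "Table", "Delta")}
seen = refresher.get_seen_tables(scope=scope)
# `seen` maps each lowercased UC table full name to the lowercased "upgraded_from"
# Hive metastore key, e.g. {"cat1.schema1.table1": "hive_metastore.schema1.table1"}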