Skip to content

Commit 3f6da0d

Browse files
authored
Skip listing built-in catalogs to update table migration process (#3464)
## Changes Skip listing built-in catalogs to update table migration process ### Linked issues Resolves #3462 ### Functionality - [x] modified existing workflow: `migrate-tables` ### Tests - [x] added unit tests
1 parent e768af2 commit 3f6da0d

File tree

2 files changed

+72
-7
lines changed

2 files changed

+72
-7
lines changed

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from databricks.labs.lsql.backends import SqlBackend
88
from databricks.sdk import WorkspaceClient
99
from databricks.sdk.errors import DatabricksError, NotFound
10-
from databricks.sdk.service.catalog import CatalogInfo
10+
from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo
1111

1212
from databricks.labs.ucx.framework.crawlers import CrawlerBase
1313
from databricks.labs.ucx.framework.utils import escape_sql_identifier
@@ -79,6 +79,11 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]):
7979
properties for the presence of the marker.
8080
"""
8181

82+
_skip_catalog_securable_kinds = [
83+
CatalogInfoSecurableKind.CATALOG_INTERNAL,
84+
CatalogInfoSecurableKind.CATALOG_SYSTEM,
85+
]
86+
8287
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_crawler: TablesCrawler):
8388
super().__init__(sql_backend, "hive_metastore", schema, "migration_status", TableMigrationStatus)
8489
self._ws = ws
@@ -90,6 +95,8 @@ def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
9095
def get_seen_tables(self) -> dict[str, str]:
9196
seen_tables: dict[str, str] = {}
9297
for schema in self._iter_schemas():
98+
if schema.catalog_name is None or schema.name is None:
99+
continue
93100
try:
94101
# ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
95102
tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name))
@@ -136,9 +143,7 @@ def _crawl(self) -> Iterable[TableMigrationStatus]:
136143
src_schema = table.database.lower()
137144
src_table = table.name.lower()
138145
table_migration_status = TableMigrationStatus(
139-
src_schema=src_schema,
140-
src_table=src_table,
141-
update_ts=str(timestamp),
146+
src_schema=src_schema, src_table=src_table, update_ts=str(timestamp)
142147
)
143148
if table.key in reverse_seen and self.is_migrated(src_schema, src_table):
144149
target_table = reverse_seen[table.key]
@@ -157,12 +162,17 @@ def _try_fetch(self) -> Iterable[TableMigrationStatus]:
157162

158163
def _iter_catalogs(self) -> Iterable[CatalogInfo]:
159164
try:
160-
yield from self._ws.catalogs.list()
165+
for catalog in self._ws.catalogs.list():
166+
if catalog.securable_kind in self._skip_catalog_securable_kinds:
167+
continue
168+
yield catalog
161169
except DatabricksError as e:
162170
logger.error("Cannot list catalogs", exc_info=e)
163171

164-
def _iter_schemas(self):
172+
def _iter_schemas(self) -> Iterable[SchemaInfo]:
165173
for catalog in self._iter_catalogs():
174+
if catalog.name is None:
175+
continue
166176
try:
167177
yield from self._ws.schemas.list(catalog_name=catalog.name)
168178
except NotFound:

tests/unit/hive_metastore/test_table_migration_status.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
from collections.abc import Iterable
12
from unittest.mock import create_autospec
23

34
import pytest
45
from databricks.sdk import WorkspaceClient
56
from databricks.sdk.errors import BadRequest, DatabricksError, NotFound
6-
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo
7+
from databricks.sdk.service.catalog import CatalogInfoSecurableKind, CatalogInfo, SchemaInfo, TableInfo
78

89
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
910
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
@@ -64,3 +65,57 @@ def test_table_migration_status_refresher_get_seen_tables_handles_errors_on_tabl
6465
ws.schemas.list.assert_called_once()
6566
ws.tables.list.assert_called_once()
6667
tables_crawler.snapshot.assert_not_called()
68+
69+
70+
@pytest.mark.parametrize(
71+
"securable_kind",
72+
[
73+
CatalogInfoSecurableKind.CATALOG_INTERNAL,
74+
CatalogInfoSecurableKind.CATALOG_SYSTEM,
75+
],
76+
)
77+
def test_table_migration_status_refresher_get_seen_tables_skips_builtin_catalog(
78+
mock_backend, securable_kind: CatalogInfoSecurableKind
79+
) -> None:
80+
ws = create_autospec(WorkspaceClient)
81+
ws.catalogs.list.return_value = [
82+
CatalogInfo(name="test"),
83+
CatalogInfo(name="system", securable_kind=securable_kind),
84+
]
85+
86+
def schemas_list(catalog_name: str) -> Iterable[SchemaInfo]:
87+
schemas = [
88+
SchemaInfo(catalog_name="test", name="test"),
89+
SchemaInfo(catalog_name="system", name="access"),
90+
]
91+
for schema in schemas:
92+
if schema.catalog_name == catalog_name:
93+
yield schema
94+
95+
def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]:
96+
tables = [
97+
TableInfo(
98+
full_name="test.test.test",
99+
catalog_name="test",
100+
schema_name="test",
101+
name="test",
102+
properties={"upgraded_from": "test"},
103+
),
104+
TableInfo(catalog_name="system", schema_name="access", name="audit"),
105+
]
106+
for table in tables:
107+
if table.catalog_name == catalog_name and table.schema_name == schema_name:
108+
yield table
109+
110+
ws.schemas.list.side_effect = schemas_list
111+
ws.tables.list.side_effect = tables_list
112+
tables_crawler = create_autospec(TablesCrawler)
113+
refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler)
114+
115+
seen_tables = refresher.get_seen_tables()
116+
117+
assert seen_tables == {"test.test.test": "test"}
118+
ws.catalogs.list.assert_called_once()
119+
ws.schemas.list.assert_called_once_with(catalog_name="test") # System is NOT called
120+
ws.tables.list.assert_called()
121+
tables_crawler.snapshot.assert_not_called()

0 commit comments

Comments
 (0)