Skip to content

Commit b6727cd

Browse files
committed
Added scope for UC crawling.
1 parent fa8bcdb commit b6727cd

File tree

4 files changed

+31
-18
lines changed

4 files changed

+31
-18
lines changed

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def _migrate_tables(
123123
):
124124
tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table)
125125
tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
126-
self._init_seen_tables(scope={t.rule.as_uc_table_key for t in tables_to_migrate})
126+
self._init_seen_tables(scope={t.rule.as_uc_table for t in tables_to_migrate})
127127
tasks = []
128128
for table in tables_in_scope:
129129
tasks.append(
@@ -597,7 +597,7 @@ def print_revert_report(
597597
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command")
598598
return True
599599

600-
def _init_seen_tables(self, *, scope: set[str] | None = None):
600+
def _init_seen_tables(self, *, scope: set[Table] | None = None):
601601
self._seen_tables = self._migration_status_refresher.get_seen_tables(scope=scope)
602602

603603
def _sql_alter_to(self, table: Table, target_table_key: str):

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
from databricks.labs.ucx.framework.crawlers import CrawlerBase
1515
from databricks.labs.ucx.framework.utils import escape_sql_identifier
16-
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
16+
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, Table
1717

1818
logger = logging.getLogger(__name__)
1919

20+
2021
@dataclass
2122
class TableMigrationStatus:
2223
src_schema: str
@@ -95,11 +96,17 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
9596
def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
9697
return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))
9798

98-
def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]:
99+
def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]:
99100
seen_tables: dict[str, str] = {}
100101
tasks = []
102+
schema_scope = {(table.catalog.lower(), table.database.lower()) for table in scope} if scope else None
103+
table_scope = {table.full_name.lower() for table in scope} if scope else None
101104
for schema in self._iter_schemas():
102-
if schema.catalog_name is None or schema.name is None:
105+
if (
106+
schema.catalog_name is None
107+
or schema.name is None
108+
or (schema_scope and (schema.catalog_name.lower(), schema.name.lower()) not in schema_scope)
109+
):
103110
continue
104111
tasks.append(
105112
partial(
@@ -121,7 +128,7 @@ def get_seen_tables(self, *, scope: set[str] | None = None) -> dict[str, str]:
121128
if not table.full_name:
122129
logger.warning(f"The table {table.name} in {schema.name} has no full name")
123130
continue
124-
if scope and table.full_name not in scope:
131+
if table_scope and table.full_name.lower() not in table_scope:
125132
continue
126133
if not table.properties or "upgraded_from" not in table.properties:
127134
continue
@@ -183,7 +190,10 @@ def _iter_catalogs(self) -> Iterable[CatalogInfo]:
183190

184191
def _iter_schemas(self) -> Iterable[SchemaInfo]:
185192
for catalog in self._iter_catalogs():
186-
if catalog.name is None or catalog.catalog_type in (CatalogType.DELTASHARING_CATALOG, CatalogType.SYSTEM_CATALOG):
193+
if catalog.name is None or catalog.catalog_type in (
194+
CatalogType.DELTASHARING_CATALOG,
195+
CatalogType.SYSTEM_CATALOG,
196+
):
187197
continue
188198
try:
189199
yield from self._ws.schemas.list(catalog_name=catalog.name)
@@ -199,7 +209,9 @@ def _iter_tables(self, catalog_name: str, schema_name: str) -> list[TableInfo]:
199209
# ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
200210
return list(self._ws.tables.list(catalog_name=catalog_name, schema_name=schema_name))
201211
except NotFound:
202-
logger.warning(f"Schema {catalog_name}.{schema_name} no longer exists. Skipping checking its migration status.")
212+
logger.warning(
213+
f"Schema {catalog_name}.{schema_name} no longer exists. Skipping checking its migration status."
214+
)
203215
return []
204216
except DatabricksError as e:
205217
logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e)

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,24 +1043,24 @@ def test_table_status_seen_tables(caplog):
10431043
table_crawler = create_autospec(TablesCrawler)
10441044
client = create_autospec(WorkspaceClient)
10451045
client.catalogs.list.return_value = [CatalogInfo(name="cat1"), CatalogInfo(name="deleted_cat")]
1046-
schemas = { "cat1":
1047-
[
1046+
schemas = {
1047+
"cat1": [
10481048
SchemaInfo(catalog_name="cat1", name="schema1", full_name="cat1.schema1"),
10491049
SchemaInfo(catalog_name="cat1", name="deleted_schema", full_name="cat1.deleted_schema"),
10501050
],
1051-
"deleted_cat":
1052-
None,
1053-
}
1051+
"deleted_cat": None,
1052+
}
10541053

10551054
def schema_list(catalog_name):
10561055
schema = schemas[catalog_name]
10571056
if not schema:
10581057
raise NotFound()
10591058
return schema
1059+
10601060
client.schemas.list = schema_list
10611061

1062-
tables = { ("cat1", "schema1"):
1063-
[
1062+
tables = {
1063+
("cat1", "schema1"): [
10641064
TableInfo(
10651065
catalog_name="cat1",
10661066
schema_name="schema1",
@@ -1097,13 +1097,13 @@ def schema_list(catalog_name):
10971097
],
10981098
("cat1", "deleted_schema"): None,
10991099
}
1100+
11001101
def table_list(catalog_name, schema_name):
11011102
table = tables[(catalog_name, schema_name)]
11021103
if not table:
11031104
raise NotFound()
11041105
return table
11051106

1106-
11071107
client.tables.list = table_list
11081108
table_status_crawler = TableMigrationStatusRefresher(client, backend, "ucx", table_crawler)
11091109
seen_tables = table_status_crawler.get_seen_tables()

tests/unit/hive_metastore/test_table_migration_status.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from databricks.sdk.errors import BadRequest, DatabricksError, NotFound
77
from databricks.sdk.service.catalog import CatalogInfoSecurableKind, CatalogInfo, SchemaInfo, TableInfo
88

9-
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
9+
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, Table
1010
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
1111

1212

@@ -163,8 +163,9 @@ def tables_list(catalog_name: str, schema_name: str) -> Iterable[TableInfo]:
163163
tables_crawler = create_autospec(TablesCrawler)
164164
refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler)
165165

166+
scope_table = Table("test1", "test1", "test1", "Table", "Delta")
166167
# Test with scope
167-
assert refresher.get_seen_tables(scope={"test1.test1.test1"}) == {"test1.test1.test1": "test1"}
168+
assert refresher.get_seen_tables(scope={scope_table}) == {"test1.test1.test1": "test1"}
168169
# Test without scope
169170
assert refresher.get_seen_tables() == {"test1.test1.test1": "test1", "test2.test2.test2": "test2"}
170171
ws.tables.list.assert_called()

0 commit comments

Comments
 (0)