diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py
index a3656b290b..bb8db8cf32 100644
--- a/src/databricks/labs/ucx/contexts/workflow_task.py
+++ b/src/databricks/labs/ucx/contexts/workflow_task.py
@@ -22,10 +22,10 @@
 from databricks.labs.ucx.contexts.application import GlobalContext
 from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
 from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
-from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
+from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
 from databricks.labs.ucx.hive_metastore.udfs import Udf
 from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
-from databricks.labs.ucx.progress.grants import GrantProgressEncoder
+from databricks.labs.ucx.progress.grants import Grant, GrantProgressEncoder
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
 from databricks.labs.ucx.progress.tables import TableProgressEncoder
@@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
         )
 
     @cached_property
-    def grants_progress(self) -> GrantProgressEncoder:
+    def grants_progress(self) -> ProgressEncoder[Grant]:
         return GrantProgressEncoder(
             self.sql_backend,
             self.grant_ownership,
@@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
         )
 
     @cached_property
-    def tables_progress(self) -> TableProgressEncoder:
+    def tables_progress(self) -> ProgressEncoder[Table]:
         return TableProgressEncoder(
             self.sql_backend,
             self.table_ownership,
-            self.migration_status_refresher.index(force_refresh=False),
+            self.migration_status_refresher,
             self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
index dde5f17790..16252a05a1 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
@@ -53,7 +53,7 @@ def key(self):
 
 
 class TableMigrationIndex:
-    def __init__(self, tables: list[TableMigrationStatus]):
+    def __init__(self, tables: Iterable[TableMigrationStatus]):
         self._index = {(ms.src_schema, ms.src_table): ms for ms in tables}
 
     def is_migrated(self, schema: str, table: str) -> bool:
diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py
index b1f2847807..e7b89306ff 100644
--- a/src/databricks/labs/ucx/progress/history.py
+++ b/src/databricks/labs/ucx/progress/history.py
@@ -1,12 +1,11 @@
 from __future__ import annotations
 import dataclasses
 import datetime as dt
-import typing
 import json
 import logging
 from enum import Enum, EnumMeta
 from collections.abc import Iterable, Sequence
-from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final
+from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints
 
 from databricks.labs.lsql.backends import SqlBackend
 
@@ -106,7 +105,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ
         # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism
         # captures the type hints prior to resolution (which happens later in the class initialization process).
         # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly.
-        klass_type_hints = typing.get_type_hints(klass)
+        klass_type_hints = get_type_hints(klass)
         field_names = [field.name for field in dataclasses.fields(klass)]
         field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names}
         if "failures" not in field_names_with_types:
@@ -280,11 +279,10 @@ def __init__(
     def full_name(self) -> str:
         return f"{self._catalog}.{self._schema}.{self._table}"
 
-    @final
     def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None:
         history_records = [self._encode_record_as_historical(record) for record in snapshot]
         logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
-        # This is the only writer, and the mode is 'append'. This is documented as conflict-free.
+        # The mode is 'append'. This is documented as conflict-free.
         self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
 
     def _encode_record_as_historical(self, record: Record) -> Historical:
diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py
index 6dc76132e2..4922e8efd2 100644
--- a/src/databricks/labs/ucx/progress/tables.py
+++ b/src/databricks/labs/ucx/progress/tables.py
@@ -1,14 +1,21 @@
+import logging
+from collections.abc import Iterable
 from dataclasses import replace
 
 from databricks.labs.lsql.backends import SqlBackend
 
+from databricks.labs.ucx.framework.crawlers import CrawlerBase
+from databricks.labs.ucx.framework.utils import escape_sql_identifier
+from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus, TableMigrationIndex
 from databricks.labs.ucx.hive_metastore.tables import Table
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
 from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.labs.ucx.progress.install import Historical
 
+logger = logging.getLogger(__name__)
+
+
 class TableProgressEncoder(ProgressEncoder[Table]):
     """Encoder class:Table to class:History.
 
@@ -21,7 +28,7 @@ def __init__(
         self,
         sql_backend: SqlBackend,
         ownership: TableOwnership,
-        table_migration_index: TableMigrationIndex,
+        migration_status_refresher: CrawlerBase[TableMigrationStatus],
         run_id: int,
         workspace_id: int,
         catalog: str,
@@ -38,17 +45,24 @@ def __init__(
             schema,
             table,
         )
-        self._table_migration_index = table_migration_index
+        self._migration_status_refresher = migration_status_refresher
+
+    def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
+        migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
+        history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
+        logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
+        # The mode is 'append'. This is documented as conflict-free.
+        self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
 
-    def _encode_record_as_historical(self, record: Table) -> Historical:
-        """Encode record as historical.
+    def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
+        """Encode a table record, enriching with the migration status.
 
-        A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant
+        A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
         might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
         for tables that are migrated to UC with their relevant grants also being migrated.
         """
         historical = super()._encode_record_as_historical(record)
         failures = []
-        if not self._table_migration_index.is_migrated(record.database, record.name):
+        if not migration_index.is_migrated(record.database, record.name):
             failures.append("Pending migration")
         return replace(historical, failures=historical.failures + failures)
diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py
index a859bf5c03..baf08de26d 100644
--- a/tests/unit/progress/test_tables.py
+++ b/tests/unit/progress/test_tables.py
@@ -4,9 +4,11 @@
 
 from databricks.labs.ucx.framework.owners import Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.table_migration_status import (
+    TableMigrationStatusRefresher,
+    TableMigrationStatus,
+)
 from databricks.labs.ucx.hive_metastore.tables import Table
-from databricks.labs.ucx.progress.grants import GrantProgressEncoder
 from databricks.labs.ucx.progress.tables import TableProgressEncoder
 
 
@@ -19,21 +21,21 @@
 def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
-    table_migration_index = create_autospec(TableMigrationIndex)
-    table_migration_index.is_migrated.return_value = True
-    grant_progress_encoder = create_autospec(GrantProgressEncoder)
+    migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
+    migration_status_crawler.snapshot.return_value = (
+        TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
+    )
     encoder = TableProgressEncoder(
-        mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
+        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
     )
 
     encoder.append_inventory_snapshot([table])
 
     rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
-    assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
+    assert rows, f"No rows written for: {encoder.full_name}"
     assert len(rows[0].failures) == 0
     ownership.owner_of.assert_called_once()
-    table_migration_index.is_migrated.assert_called_with(table.database, table.name)
-    grant_progress_encoder.assert_not_called()
+    migration_status_crawler.snapshot.assert_called_once()
 
 
 @pytest.mark.parametrize(
@@ -45,11 +47,12 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None
 def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
-    table_migration_index = create_autospec(TableMigrationIndex)
-    table_migration_index.is_migrated.return_value = False
-    grant_progress_encoder = create_autospec(GrantProgressEncoder)
+    migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
+    migration_status_crawler.snapshot.return_value = (
+        TableMigrationStatus(table.database, table.name),  # No destination: therefore not yet migrated.
+    )
     encoder = TableProgressEncoder(
-        mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
+        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
     )
 
     encoder.append_inventory_snapshot([table])
@@ -58,5 +61,4 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T
     assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
     assert rows[0].failures == ["Pending migration"]
     ownership.owner_of.assert_called_once()
-    table_migration_index.is_migrated.assert_called_with(table.database, table.name)
-    grant_progress_encoder.assert_not_called()
+    migration_status_crawler.snapshot.assert_called_once()
diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py
index 5119587f15..1c0dd28f14 100644
--- a/tests/unit/progress/test_workflows.py
+++ b/tests/unit/progress/test_workflows.py
@@ -4,6 +4,7 @@
 import pytest
 
 from databricks.labs.ucx.hive_metastore import TablesCrawler
+from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
@@ -44,8 +45,8 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history
     mock_history_log.append_inventory_snapshot.assert_called_once()
 
 
-def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
-    """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process."""
+def test_migration_progress_runtime_tables_refresh_crawl_tables(run_workflow) -> None:
+    """Ensure that step 1 of the split crawl/update-history-log tasks performs its part of the refresh process."""
     mock_tables_crawler = create_autospec(TablesCrawler)
     mock_history_log = create_autospec(ProgressEncoder)
     context_replacements = {
@@ -54,13 +55,44 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
         "named_parameters": {"parent_run_id": 53},
     }
 
-    # The first part of a 2-step update: the crawl without updating the history log.
+    # The first part of a 3-step update: the table crawl without updating the history log.
     run_workflow(MigrationProgress.crawl_tables, **context_replacements)
     mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True)
     mock_history_log.append_inventory_snapshot.assert_not_called()
-    mock_tables_crawler.snapshot.reset_mock()
 
-    # The second part of the 2-step update: updating the history log (without a forced crawl).
+
+def test_migration_progress_runtime_tables_refresh_migration_status(run_workflow) -> None:
+    """Ensure that step 2 of the split crawl/update-history-log tasks performs its part of the refresh process."""
+    mock_migration_status_refresher = create_autospec(TableMigrationStatusRefresher)
+    mock_history_log = create_autospec(ProgressEncoder)
+    context_replacements = {
+        "migration_status_refresher": mock_migration_status_refresher,
+        "tables_progress": mock_history_log,
+        "named_parameters": {"parent_run_id": 53},
+    }
+
+    # The second part of a 3-step update: updating table migration status without updating the history log.
+    task_dependencies = getattr(MigrationProgress.refresh_table_migration_status, "__task__").depends_on
+    assert MigrationProgress.crawl_tables.__name__ in task_dependencies
+    run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements)
+    mock_migration_status_refresher.snapshot.assert_called_once_with(force_refresh=True)
+    mock_history_log.append_inventory_snapshot.assert_not_called()
+
+
+def test_migration_progress_runtime_tables_refresh_update_history_log(run_workflow) -> None:
+    """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process."""
+    mock_tables_crawler = create_autospec(TablesCrawler)
+    mock_history_log = create_autospec(ProgressEncoder)
+    context_replacements = {
+        "tables_crawler": mock_tables_crawler,
+        "tables_progress": mock_history_log,
+        "named_parameters": {"parent_run_id": 53},
+    }
+
+    # The final part of the 3-step update: updating the history log (without a forced crawl).
+    task_dependencies = getattr(MigrationProgress.update_tables_history_log, "__task__").depends_on
+    assert MigrationProgress.crawl_tables.__name__ in task_dependencies
+    assert MigrationProgress.refresh_table_migration_status.__name__ in task_dependencies
     run_workflow(MigrationProgress.update_tables_history_log, **context_replacements)
     mock_tables_crawler.snapshot.assert_called_once_with()
     mock_history_log.append_inventory_snapshot.assert_called_once()
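
Illustrative sketch (not part of the patch above): the behavioural core of this diff is that TableProgressEncoder no longer receives a pre-built TableMigrationIndex when the runtime context is constructed; instead it keeps the migration-status crawler and builds the index from migration_status_refresher.snapshot() inside append_inventory_snapshot(), so the "Pending migration" failure is computed against the status refreshed in the same run. The toy code below mirrors that pattern under stated assumptions; MigrationStatus, MigrationIndex, FakeRefresher and pending_migration_failures are hypothetical stand-ins, not ucx APIs.

from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass


@dataclass(frozen=True)
class MigrationStatus:
    """Stand-in for TableMigrationStatus: a destination table means 'already migrated'."""

    src_schema: str
    src_table: str
    dst_table: str | None = None


class MigrationIndex:
    """Stand-in for TableMigrationIndex; accepts any iterable, mirroring the list -> Iterable change."""

    def __init__(self, statuses: Iterable[MigrationStatus]) -> None:
        self._index = {(s.src_schema, s.src_table): s for s in statuses}

    def is_migrated(self, schema: str, table: str) -> bool:
        status = self._index.get((schema, table))
        return status is not None and status.dst_table is not None


class FakeRefresher:
    """Stand-in for the migration-status crawler: snapshot() returns the latest crawl results."""

    def __init__(self, statuses: list[MigrationStatus]) -> None:
        self._statuses = statuses

    def snapshot(self) -> Iterable[MigrationStatus]:
        return list(self._statuses)


def pending_migration_failures(refresher: FakeRefresher, tables: Iterable[tuple[str, str]]) -> list[str]:
    # Resolve the dependency at append time, like the reworked append_inventory_snapshot():
    # the index reflects whatever the refresher crawled most recently, not a snapshot taken
    # when the encoder was constructed.
    index = MigrationIndex(refresher.snapshot())
    failures = []
    for schema, table in tables:
        if not index.is_migrated(schema, table):
            failures.append(f"{schema}.{table}: Pending migration")
    return failures


if __name__ == "__main__":
    refresher = FakeRefresher([MigrationStatus("sales", "orders", dst_table="orders")])
    print(pending_migration_failures(refresher, [("sales", "orders"), ("sales", "leads")]))
    # Expected output: ['sales.leads: Pending migration']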
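
A second, equally non-authoritative sketch: the new tests in tests/unit/progress/test_workflows.py expect MigrationProgress to expose three chained tasks (crawl_tables, then refresh_table_migration_status, then update_tables_history_log) linked through their __task__.depends_on metadata. The workflow definition itself is not shown in this diff; assuming ucx's existing Workflow base class and job_task decorator from databricks.labs.ucx.framework.tasks, the split could be declared roughly as follows (the class name and workflow name are placeholders, not the real implementation).

from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task


class MigrationProgressSketch(Workflow):
    """Hypothetical three-step split; task names mirror those exercised in test_workflows.py."""

    def __init__(self):
        super().__init__("migration-progress-sketch")

    @job_task
    def crawl_tables(self, ctx: RuntimeContext) -> None:
        # Step 1: force-refresh the Hive metastore table inventory, without touching the history log.
        ctx.tables_crawler.snapshot(force_refresh=True)

    @job_task(depends_on=[crawl_tables])
    def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
        # Step 2: force-refresh the UC migration status for the tables crawled in step 1.
        ctx.migration_status_refresher.snapshot(force_refresh=True)

    @job_task(depends_on=[crawl_tables, refresh_table_migration_status])
    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
        # Step 3: append the cached table snapshot to the history log; the encoder builds its
        # TableMigrationIndex from the status refreshed in step 2 (see progress/tables.py above).
        ctx.tables_progress.append_inventory_snapshot(ctx.tables_crawler.snapshot())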