From 3b496e0a9b59c64aba880a94a322795479045ed8 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 14:23:05 +0100 Subject: [PATCH 1/8] Use the interface type as the return type rather than the specialisation in use. --- src/databricks/labs/ucx/contexts/workflow_task.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index a3656b290b..18204314a9 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -22,10 +22,10 @@ from databricks.labs.ucx.contexts.application import GlobalContext from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler -from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler +from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder -from databricks.labs.ucx.progress.grants import GrantProgressEncoder +from databricks.labs.ucx.progress.grants import Grant, GrantProgressEncoder from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]: ) @cached_property - def grants_progress(self) -> GrantProgressEncoder: + def grants_progress(self) -> ProgressEncoder[Grant]: return GrantProgressEncoder( self.sql_backend, self.grant_ownership, @@ -221,7 +221,7 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]: ) @cached_property - def tables_progress(self) -> TableProgressEncoder: + def tables_progress(self) -> ProgressEncoder[Table]: return TableProgressEncoder( self.sql_backend, self.table_ownership, From c78d55296ed46cc14be8918f964aa05b01c0856d Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 14:23:50 +0100 Subject: [PATCH 2/8] Fix internal documentation. There are really multiple writers now, because we have multiple instances of this class. --- src/databricks/labs/ucx/progress/history.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index b1f2847807..4681b25614 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -284,7 +284,7 @@ def full_name(self) -> str: def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: history_records = [self._encode_record_as_historical(record) for record in snapshot] logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") - # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + # The mode is 'append'. This is documented as conflict-free. self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") def _encode_record_as_historical(self, record: Record) -> Historical: From 7b576063431d74a38cc9cf3c34badf1d66d6a853 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 14:24:52 +0100 Subject: [PATCH 3/8] Fix spelling mistake. --- src/databricks/labs/ucx/progress/tables.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6dc76132e2..548286d150 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -43,7 +43,7 @@ def __init__( def _encode_record_as_historical(self, record: Table) -> Historical: """Encode record as historical. - A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant + A table failure means that the table is pending migration. Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant grants also being migrated. """ From a8d0b79433aa834e0d271bcf24e73257485d9e3c Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 14:30:05 +0100 Subject: [PATCH 4/8] Update migration-progress-experimental unit test to also cover the migration-status-refresh task. --- tests/unit/progress/test_workflows.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 6ce5174847..182c7c5965 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -4,6 +4,7 @@ import pytest from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment @@ -47,22 +48,31 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process.""" mock_tables_crawler = create_autospec(TablesCrawler) + mock_migration_status_refresher = create_autospec(TableMigrationStatusRefresher) mock_history_log = create_autospec(ProgressEncoder) context_replacements = { "tables_crawler": mock_tables_crawler, + "migration_status_refresher": mock_migration_status_refresher, "tables_progress": mock_history_log, "named_parameters": {"parent_run_id": 53}, } - # The first part of a 2-step update: the crawl without updating the history log. + # The first part of a 3-step update: the table crawl without updating the history log. run_workflow(MigrationProgress.crawl_tables, **context_replacements) mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True) + mock_tables_crawler.snapshot.reset_mock() mock_history_log.append_inventory_snapshot.assert_not_called() - mock_tables_crawler.snapshot.reset_mock() - # The second part of the 2-step update: updating the history log (without a forced crawl). + # The second part of a 3-step update: updating table migration status without updating the history log. + run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements) + mock_migration_status_refresher.snapshot.assert_called_once_with(force_refresh=True) + mock_migration_status_refresher.snapshot.reset_mock() + mock_history_log.append_inventory_snapshot.assert_not_called() + + # The final part of the 3-step update: updating the history log (without a forced crawl). run_workflow(MigrationProgress.update_tables_history_log, **context_replacements) mock_tables_crawler.snapshot.assert_called_once_with() + # migration_status_refresher is not directly used within step 3, so interactions don't need to be checked. mock_history_log.append_inventory_snapshot.assert_called_once() From e7eadae65a12d03352f386f6cb415384e484ddf1 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 14:37:41 +0100 Subject: [PATCH 5/8] Refactor refreshing for migration status so that the runtime context doesn't automatically refresh on access. Currently access the history log for the table triggers a refresh of the migration status data. Refreshing is supposed to be controlled explicitly by the workflows, which this refactoring implements. Doing this eliminates a redundant refresh from the migration-progress-experimental workflow. --- .../labs/ucx/contexts/workflow_task.py | 2 +- .../hive_metastore/table_migration_status.py | 2 +- src/databricks/labs/ucx/progress/history.py | 3 +- src/databricks/labs/ucx/progress/tables.py | 26 +++++++++++---- tests/unit/progress/test_tables.py | 32 ++++++++++--------- 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 18204314a9..bb8db8cf32 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -225,7 +225,7 @@ def tables_progress(self) -> ProgressEncoder[Table]: return TableProgressEncoder( self.sql_backend, self.table_ownership, - self.migration_status_refresher.index(force_refresh=False), + self.migration_status_refresher, self.parent_run_id, self.workspace_id, self.config.ucx_catalog, diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index dde5f17790..16252a05a1 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -53,7 +53,7 @@ def key(self): class TableMigrationIndex: - def __init__(self, tables: list[TableMigrationStatus]): + def __init__(self, tables: Iterable[TableMigrationStatus]): self._index = {(ms.src_schema, ms.src_table): ms for ms in tables} def is_migrated(self, schema: str, table: str) -> bool: diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 4681b25614..d1c2ff22f7 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -6,7 +6,7 @@ import logging from enum import Enum, EnumMeta from collections.abc import Iterable, Sequence -from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final +from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints from databricks.labs.lsql.backends import SqlBackend @@ -280,7 +280,6 @@ def __init__( def full_name(self) -> str: return f"{self._catalog}.{self._schema}.{self._table}" - @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: history_records = [self._encode_record_as_historical(record) for record in snapshot] logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 548286d150..4922e8efd2 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -1,14 +1,21 @@ +import logging +from collections.abc import Iterable from dataclasses import replace from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus, TableMigrationIndex from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.ownership import TableOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical +logger = logging.getLogger(__name__) + + class TableProgressEncoder(ProgressEncoder[Table]): """Encoder class:Table to class:History. @@ -21,7 +28,7 @@ def __init__( self, sql_backend: SqlBackend, ownership: TableOwnership, - table_migration_index: TableMigrationIndex, + migration_status_refresher: CrawlerBase[TableMigrationStatus], run_id: int, workspace_id: int, catalog: str, @@ -38,10 +45,17 @@ def __init__( schema, table, ) - self._table_migration_index = table_migration_index + self._migration_status_refresher = migration_status_refresher + + def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None: + migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot()) + history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.") + # The mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - def _encode_record_as_historical(self, record: Table) -> Historical: - """Encode record as historical. + def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical: + """Encode a table record, enriching with the migration status. A table failure means that the table is pending migration. Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure @@ -49,6 +63,6 @@ def _encode_record_as_historical(self, record: Table) -> Historical: """ historical = super()._encode_record_as_historical(record) failures = [] - if not self._table_migration_index.is_migrated(record.database, record.name): + if not migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index a859bf5c03..baf08de26d 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -4,9 +4,11 @@ from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.hive_metastore.table_migration_status import ( + TableMigrationStatusRefresher, + TableMigrationStatus, +) from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -19,21 +21,21 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" - table_migration_index = create_autospec(TableMigrationIndex) - table_migration_index.is_migrated.return_value = True - grant_progress_encoder = create_autospec(GrantProgressEncoder) + migration_status_crawler = create_autospec(TableMigrationStatusRefresher) + migration_status_crawler.snapshot.return_value = ( + TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None), + ) encoder = TableProgressEncoder( - mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") - assert len(rows) > 0, f"No rows written for: {encoder.full_name}" + assert rows, f"No rows written for: {encoder.full_name}" assert len(rows[0].failures) == 0 ownership.owner_of.assert_called_once() - table_migration_index.is_migrated.assert_called_with(table.database, table.name) - grant_progress_encoder.assert_not_called() + migration_status_crawler.snapshot.assert_called_once() @pytest.mark.parametrize( @@ -45,11 +47,12 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" - table_migration_index = create_autospec(TableMigrationIndex) - table_migration_index.is_migrated.return_value = False - grant_progress_encoder = create_autospec(GrantProgressEncoder) + migration_status_crawler = create_autospec(TableMigrationStatusRefresher) + migration_status_crawler.snapshot.return_value = ( + TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated. + ) encoder = TableProgressEncoder( - mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) @@ -58,5 +61,4 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T assert len(rows) > 0, f"No rows written for: {encoder.full_name}" assert rows[0].failures == ["Pending migration"] ownership.owner_of.assert_called_once() - table_migration_index.is_migrated.assert_called_with(table.database, table.name) - grant_progress_encoder.assert_not_called() + migration_status_crawler.snapshot.assert_called_once() From f2aa76edf1022862c0800abc963f7fe58498be59 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 16:05:10 +0100 Subject: [PATCH 6/8] Consistent usage. --- src/databricks/labs/ucx/progress/history.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index d1c2ff22f7..e7b89306ff 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -1,7 +1,6 @@ from __future__ import annotations import dataclasses import datetime as dt -import typing import json import logging from enum import Enum, EnumMeta @@ -106,7 +105,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism # captures the type hints prior to resolution (which happens later in the class initialization process). # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly. - klass_type_hints = typing.get_type_hints(klass) + klass_type_hints = get_type_hints(klass) field_names = [field.name for field in dataclasses.fields(klass)] field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names} if "failures" not in field_names_with_types: From 0bfe3d653bc171b8559547e8006c40ca35966ded Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 16:50:23 +0100 Subject: [PATCH 7/8] Ensure the workflow tasks depend on each other as expected. --- tests/unit/progress/test_workflows.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 182c7c5965..4f04466854 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -64,12 +64,17 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: mock_history_log.append_inventory_snapshot.assert_not_called() # The second part of a 3-step update: updating table migration status without updating the history log. + task_dependencies = getattr(MigrationProgress.refresh_table_migration_status, "__task__").depends_on + assert MigrationProgress.crawl_tables.__name__ in task_dependencies run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements) mock_migration_status_refresher.snapshot.assert_called_once_with(force_refresh=True) mock_migration_status_refresher.snapshot.reset_mock() mock_history_log.append_inventory_snapshot.assert_not_called() # The final part of the 3-step update: updating the history log (without a forced crawl). + task_dependencies = getattr(MigrationProgress.update_tables_history_log, "__task__").depends_on + assert MigrationProgress.crawl_tables.__name__ in task_dependencies + assert MigrationProgress.refresh_table_migration_status.__name__ in task_dependencies run_workflow(MigrationProgress.update_tables_history_log, **context_replacements) mock_tables_crawler.snapshot.assert_called_once_with() # migration_status_refresher is not directly used within step 3, so interactions don't need to be checked. From 27662b585c845293d1c256cc27ced0e459305970 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 19 Nov 2024 16:08:19 +0100 Subject: [PATCH 8/8] Split multi-stage unit test up into individual tests. --- tests/unit/progress/test_workflows.py | 31 +++++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index b42a52b855..1c0dd28f14 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -45,14 +45,12 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history mock_history_log.append_inventory_snapshot.assert_called_once() -def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: - """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process.""" +def test_migration_progress_runtime_tables_refresh_crawl_tables(run_workflow) -> None: + """Ensure that step 1 of the split crawl/update-history-log tasks performs its part of the refresh process.""" mock_tables_crawler = create_autospec(TablesCrawler) - mock_migration_status_refresher = create_autospec(TableMigrationStatusRefresher) mock_history_log = create_autospec(ProgressEncoder) context_replacements = { "tables_crawler": mock_tables_crawler, - "migration_status_refresher": mock_migration_status_refresher, "tables_progress": mock_history_log, "named_parameters": {"parent_run_id": 53}, } @@ -60,24 +58,43 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: # The first part of a 3-step update: the table crawl without updating the history log. run_workflow(MigrationProgress.crawl_tables, **context_replacements) mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True) - mock_tables_crawler.snapshot.reset_mock() mock_history_log.append_inventory_snapshot.assert_not_called() + +def test_migration_progress_runtime_tables_refresh_migration_status(run_workflow) -> None: + """Ensure that step 2 of the split crawl/update-history-log tasks performs its part of the refresh process.""" + mock_migration_status_refresher = create_autospec(TableMigrationStatusRefresher) + mock_history_log = create_autospec(ProgressEncoder) + context_replacements = { + "migration_status_refresher": mock_migration_status_refresher, + "tables_progress": mock_history_log, + "named_parameters": {"parent_run_id": 53}, + } + # The second part of a 3-step update: updating table migration status without updating the history log. task_dependencies = getattr(MigrationProgress.refresh_table_migration_status, "__task__").depends_on assert MigrationProgress.crawl_tables.__name__ in task_dependencies run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements) mock_migration_status_refresher.snapshot.assert_called_once_with(force_refresh=True) - mock_migration_status_refresher.snapshot.reset_mock() mock_history_log.append_inventory_snapshot.assert_not_called() + +def test_migration_progress_runtime_tables_refresh_update_history_log(run_workflow) -> None: + """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process.""" + mock_tables_crawler = create_autospec(TablesCrawler) + mock_history_log = create_autospec(ProgressEncoder) + context_replacements = { + "tables_crawler": mock_tables_crawler, + "tables_progress": mock_history_log, + "named_parameters": {"parent_run_id": 53}, + } + # The final part of the 3-step update: updating the history log (without a forced crawl). task_dependencies = getattr(MigrationProgress.update_tables_history_log, "__task__").depends_on assert MigrationProgress.crawl_tables.__name__ in task_dependencies assert MigrationProgress.refresh_table_migration_status.__name__ in task_dependencies run_workflow(MigrationProgress.update_tables_history_log, **context_replacements) mock_tables_crawler.snapshot.assert_called_once_with() - # migration_status_refresher is not directly used within step 3, so interactions don't need to be checked. mock_history_log.append_inventory_snapshot.assert_called_once()