Skip to content

Commit de8c60e

Browse files
asnare and nfx authored
Refactor refreshing of migration-status information for tables, eliminate another redundant refresh. (#3270)
## Changes

This PR updates the way table records are enriched with migration-status information during encoding for the history log. Changes include:

- Ensuring refresh of the migration-status information is explicit and under control of the workflow, an intent expressed in a comment on #2743 when the current design was laid out.
- A redundant refresh of the migration-status information is eliminated. (This took place in the `update_tables_history_log` task, even though it had just occurred in the `refresh_table_migration_status` task immediately prior to it.)
- Some additional (unit) test coverage of the `migration-progress-experimental` workflow.

### Linked issues

Splits #3239.

### Functionality

- modified existing workflow: `migration-progress-experimental`

### Tests

- added/updated unit tests
- existing integration tests

---

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
1 parent b971bb8 commit de8c60e

File tree

6 files changed

+84
-38
lines changed

6 files changed

+84
-38
lines changed

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
from databricks.labs.ucx.contexts.application import GlobalContext
2323
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
2424
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
25-
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
25+
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
2626
from databricks.labs.ucx.hive_metastore.udfs import Udf
2727
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
28-
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
28+
from databricks.labs.ucx.progress.grants import Grant, GrantProgressEncoder
2929
from databricks.labs.ucx.progress.history import ProgressEncoder
3030
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
3131
from databricks.labs.ucx.progress.tables import TableProgressEncoder
@@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
189189
)
190190

191191
@cached_property
192-
def grants_progress(self) -> GrantProgressEncoder:
192+
def grants_progress(self) -> ProgressEncoder[Grant]:
193193
return GrantProgressEncoder(
194194
self.sql_backend,
195195
self.grant_ownership,
@@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
221221
)
222222

223223
@cached_property
224-
def tables_progress(self) -> TableProgressEncoder:
224+
def tables_progress(self) -> ProgressEncoder[Table]:
225225
return TableProgressEncoder(
226226
self.sql_backend,
227227
self.table_ownership,
228-
self.migration_status_refresher.index(force_refresh=False),
228+
self.migration_status_refresher,
229229
self.parent_run_id,
230230
self.workspace_id,
231231
self.config.ucx_catalog,

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def key(self):
5353

5454

5555
class TableMigrationIndex:
56-
def __init__(self, tables: list[TableMigrationStatus]):
56+
def __init__(self, tables: Iterable[TableMigrationStatus]):
5757
self._index = {(ms.src_schema, ms.src_table): ms for ms in tables}
5858

5959
def is_migrated(self, schema: str, table: str) -> bool:

src/databricks/labs/ucx/progress/history.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
from __future__ import annotations
22
import dataclasses
33
import datetime as dt
4-
import typing
54
import json
65
import logging
76
from enum import Enum, EnumMeta
87
from collections.abc import Iterable, Sequence
9-
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final
8+
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints
109

1110
from databricks.labs.lsql.backends import SqlBackend
1211

@@ -106,7 +105,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ
106105
# are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism
107106
# captures the type hints prior to resolution (which happens later in the class initialization process).
108107
# As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly.
109-
klass_type_hints = typing.get_type_hints(klass)
108+
klass_type_hints = get_type_hints(klass)
110109
field_names = [field.name for field in dataclasses.fields(klass)]
111110
field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names}
112111
if "failures" not in field_names_with_types:
@@ -280,11 +279,10 @@ def __init__(
280279
def full_name(self) -> str:
281280
return f"{self._catalog}.{self._schema}.{self._table}"
282281

283-
@final
284282
def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None:
285283
history_records = [self._encode_record_as_historical(record) for record in snapshot]
286284
logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
287-
# This is the only writer, and the mode is 'append'. This is documented as conflict-free.
285+
# The mode is 'append'. This is documented as conflict-free.
288286
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
289287

290288
def _encode_record_as_historical(self, record: Record) -> Historical:

src/databricks/labs/ucx/progress/tables.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import logging
2+
from collections.abc import Iterable
13
from dataclasses import replace
24

35
from databricks.labs.lsql.backends import SqlBackend
46

7+
from databricks.labs.ucx.framework.crawlers import CrawlerBase
8+
from databricks.labs.ucx.framework.utils import escape_sql_identifier
9+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus, TableMigrationIndex
510
from databricks.labs.ucx.hive_metastore.tables import Table
6-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
711
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
812
from databricks.labs.ucx.progress.history import ProgressEncoder
913
from databricks.labs.ucx.progress.install import Historical
1014

1115

16+
logger = logging.getLogger(__name__)
17+
18+
1219
class TableProgressEncoder(ProgressEncoder[Table]):
1320
"""Encoder class:Table to class:History.
1421
@@ -21,7 +28,7 @@ def __init__(
2128
self,
2229
sql_backend: SqlBackend,
2330
ownership: TableOwnership,
24-
table_migration_index: TableMigrationIndex,
31+
migration_status_refresher: CrawlerBase[TableMigrationStatus],
2532
run_id: int,
2633
workspace_id: int,
2734
catalog: str,
@@ -38,17 +45,24 @@ def __init__(
3845
schema,
3946
table,
4047
)
41-
self._table_migration_index = table_migration_index
48+
self._migration_status_refresher = migration_status_refresher
49+
50+
def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
51+
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
52+
history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
53+
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
54+
# The mode is 'append'. This is documented as conflict-free.
55+
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
4256

43-
def _encode_record_as_historical(self, record: Table) -> Historical:
44-
"""Encode record as historical.
57+
def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
58+
"""Encode a table record, enriching with the migration status.
4559
46-
A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant
60+
A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
4761
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
4862
for tables that are migrated to UC with their relevant grants also being migrated.
4963
"""
5064
historical = super()._encode_record_as_historical(record)
5165
failures = []
52-
if not self._table_migration_index.is_migrated(record.database, record.name):
66+
if not migration_index.is_migrated(record.database, record.name):
5367
failures.append("Pending migration")
5468
return replace(historical, failures=historical.failures + failures)

tests/unit/progress/test_tables.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
from databricks.labs.ucx.framework.owners import Ownership
66
from databricks.labs.ucx.framework.utils import escape_sql_identifier
7-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
7+
from databricks.labs.ucx.hive_metastore.table_migration_status import (
8+
TableMigrationStatusRefresher,
9+
TableMigrationStatus,
10+
)
811
from databricks.labs.ucx.hive_metastore.tables import Table
9-
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
1012
from databricks.labs.ucx.progress.tables import TableProgressEncoder
1113

1214

@@ -19,21 +21,21 @@
1921
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
2022
ownership = create_autospec(Ownership)
2123
ownership.owner_of.return_value = "user"
22-
table_migration_index = create_autospec(TableMigrationIndex)
23-
table_migration_index.is_migrated.return_value = True
24-
grant_progress_encoder = create_autospec(GrantProgressEncoder)
24+
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
25+
migration_status_crawler.snapshot.return_value = (
26+
TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
27+
)
2528
encoder = TableProgressEncoder(
26-
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
29+
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
2730
)
2831

2932
encoder.append_inventory_snapshot([table])
3033

3134
rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
32-
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
35+
assert rows, f"No rows written for: {encoder.full_name}"
3336
assert len(rows[0].failures) == 0
3437
ownership.owner_of.assert_called_once()
35-
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
36-
grant_progress_encoder.assert_not_called()
38+
migration_status_crawler.snapshot.assert_called_once()
3739

3840

3941
@pytest.mark.parametrize(
@@ -45,11 +47,12 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
4547
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
4648
ownership = create_autospec(Ownership)
4749
ownership.owner_of.return_value = "user"
48-
table_migration_index = create_autospec(TableMigrationIndex)
49-
table_migration_index.is_migrated.return_value = False
50-
grant_progress_encoder = create_autospec(GrantProgressEncoder)
50+
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
51+
migration_status_crawler.snapshot.return_value = (
52+
TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated.
53+
)
5154
encoder = TableProgressEncoder(
52-
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
55+
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
5356
)
5457

5558
encoder.append_inventory_snapshot([table])
@@ -58,5 +61,4 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T
5861
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
5962
assert rows[0].failures == ["Pending migration"]
6063
ownership.owner_of.assert_called_once()
61-
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
62-
grant_progress_encoder.assert_not_called()
64+
migration_status_crawler.snapshot.assert_called_once()

tests/unit/progress/test_workflows.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66
from databricks.labs.ucx.hive_metastore import TablesCrawler
7+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
78
from databricks.labs.ucx.progress.history import ProgressEncoder
89
from databricks.sdk import WorkspaceClient
910
from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
@@ -44,8 +45,8 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history
4445
mock_history_log.append_inventory_snapshot.assert_called_once()
4546

4647

47-
def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
48-
"""Ensure that the split crawl and update-history-log tasks perform their part of the refresh process."""
48+
def test_migration_progress_runtime_tables_refresh_crawl_tables(run_workflow) -> None:
49+
"""Ensure that step 1 of the split crawl/update-history-log tasks performs its part of the refresh process."""
4950
mock_tables_crawler = create_autospec(TablesCrawler)
5051
mock_history_log = create_autospec(ProgressEncoder)
5152
context_replacements = {
@@ -54,13 +55,44 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
5455
"named_parameters": {"parent_run_id": 53},
5556
}
5657

57-
# The first part of a 2-step update: the crawl without updating the history log.
58+
# The first part of a 3-step update: the table crawl without updating the history log.
5859
run_workflow(MigrationProgress.crawl_tables, **context_replacements)
5960
mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True)
6061
mock_history_log.append_inventory_snapshot.assert_not_called()
6162

62-
mock_tables_crawler.snapshot.reset_mock()
63-
# The second part of the 2-step update: updating the history log (without a forced crawl).
63+
64+
def test_migration_progress_runtime_tables_refresh_migration_status(run_workflow) -> None:
65+
"""Ensure that step 2 of the split crawl/update-history-log tasks performs its part of the refresh process."""
66+
mock_migration_status_refresher = create_autospec(TableMigrationStatusRefresher)
67+
mock_history_log = create_autospec(ProgressEncoder)
68+
context_replacements = {
69+
"migration_status_refresher": mock_migration_status_refresher,
70+
"tables_progress": mock_history_log,
71+
"named_parameters": {"parent_run_id": 53},
72+
}
73+
74+
# The second part of a 3-step update: updating table migration status without updating the history log.
75+
task_dependencies = getattr(MigrationProgress.refresh_table_migration_status, "__task__").depends_on
76+
assert MigrationProgress.crawl_tables.__name__ in task_dependencies
77+
run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements)
78+
mock_migration_status_refresher.snapshot.assert_called_once_with(force_refresh=True)
79+
mock_history_log.append_inventory_snapshot.assert_not_called()
80+
81+
82+
def test_migration_progress_runtime_tables_refresh_update_history_log(run_workflow) -> None:
83+
"""Ensure that the split crawl and update-history-log tasks perform their part of the refresh process."""
84+
mock_tables_crawler = create_autospec(TablesCrawler)
85+
mock_history_log = create_autospec(ProgressEncoder)
86+
context_replacements = {
87+
"tables_crawler": mock_tables_crawler,
88+
"tables_progress": mock_history_log,
89+
"named_parameters": {"parent_run_id": 53},
90+
}
91+
92+
# The final part of the 3-step update: updating the history log (without a forced crawl).
93+
task_dependencies = getattr(MigrationProgress.update_tables_history_log, "__task__").depends_on
94+
assert MigrationProgress.crawl_tables.__name__ in task_dependencies
95+
assert MigrationProgress.refresh_table_migration_status.__name__ in task_dependencies
6496
run_workflow(MigrationProgress.update_tables_history_log, **context_replacements)
6597
mock_tables_crawler.snapshot.assert_called_once_with()
6698
mock_history_log.append_inventory_snapshot.assert_called_once()

0 commit comments

Comments
 (0)