Skip to content

Commit da3b15b

Browse files
committed
Back out changes relating to the way the migration-status information is passed into the table encoder.
This has been backed out to reduce clutter on the PR; they have been moved to PR #3270.
1 parent f9bf219 commit da3b15b

File tree

4 files changed

+28
-42
lines changed

4 files changed

+28
-42
lines changed

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
from databricks.labs.ucx.contexts.application import GlobalContext
2323
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
2424
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
25-
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
25+
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
2626
from databricks.labs.ucx.hive_metastore.udfs import Udf
2727
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
28-
from databricks.labs.ucx.progress.grants import GrantProgressEncoder, Grant
28+
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
2929
from databricks.labs.ucx.progress.history import ProgressEncoder
3030
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
3131
from databricks.labs.ucx.progress.tables import TableProgressEncoder
@@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
189189
)
190190

191191
@cached_property
192-
def grants_progress(self) -> ProgressEncoder[Grant]:
192+
def grants_progress(self) -> GrantProgressEncoder:
193193
return GrantProgressEncoder(
194194
self.sql_backend,
195195
self.grant_ownership,
@@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
221221
)
222222

223223
@cached_property
224-
def tables_progress(self) -> ProgressEncoder[Table]:
224+
def tables_progress(self) -> TableProgressEncoder:
225225
return TableProgressEncoder(
226226
self.sql_backend,
227227
self.table_ownership,
228-
self.migration_status_refresher,
228+
self.migration_status_refresher.index(force_refresh=False),
229229
self.parent_run_id,
230230
self.workspace_id,
231231
self.config.ucx_catalog,

src/databricks/labs/ucx/progress/history.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
from enum import Enum, EnumMeta
77
from collections.abc import Iterable, Sequence
8-
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints
8+
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final
99

1010
from databricks.labs.lsql.backends import SqlBackend
1111

@@ -279,6 +279,7 @@ def __init__(
279279
def full_name(self) -> str:
280280
return f"{self._catalog}.{self._schema}.{self._table}"
281281

282+
@final
282283
def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None:
283284
history_records = [self._encode_record_as_historical(record) for record in snapshot]
284285
logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
1-
import logging
2-
from collections.abc import Iterable
31
from dataclasses import replace
42

53
from databricks.labs.lsql.backends import SqlBackend
6-
from databricks.labs.ucx.framework.crawlers import CrawlerBase
7-
from databricks.labs.ucx.framework.utils import escape_sql_identifier
84

95
from databricks.labs.ucx.hive_metastore.tables import Table
10-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus
6+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
117
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
128
from databricks.labs.ucx.progress.history import ProgressEncoder
139
from databricks.labs.ucx.progress.install import Historical
1410

15-
logger = logging.getLogger(__name__)
16-
1711

1812
class TableProgressEncoder(ProgressEncoder[Table]):
1913
"""Encoder class:Table to class:History.
@@ -27,7 +21,7 @@ def __init__(
2721
self,
2822
sql_backend: SqlBackend,
2923
ownership: TableOwnership,
30-
migration_status_refresher: CrawlerBase[TableMigrationStatus],
24+
table_migration_index: TableMigrationIndex,
3125
run_id: int,
3226
workspace_id: int,
3327
catalog: str,
@@ -44,24 +38,17 @@ def __init__(
4438
schema,
4539
table,
4640
)
47-
self._migration_status_refresher = migration_status_refresher
48-
49-
def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
50-
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
51-
history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
52-
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
53-
# This is the only writer, and the mode is 'append'. This is documented as conflict-free.
54-
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
41+
self._table_migration_index = table_migration_index
5542

56-
def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
57-
"""Encode a table record, enriching with the migration status.
43+
def _encode_record_as_historical(self, record: Table) -> Historical:
44+
"""Encode record as historical.
5845
5946
A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
6047
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
6148
for tables that are migrated to UC with their relevant grants also being migrated.
6249
"""
6350
historical = super()._encode_record_as_historical(record)
6451
failures = []
65-
if not migration_index.is_migrated(record.database, record.name):
52+
if not self._table_migration_index.is_migrated(record.database, record.name):
6653
failures.append("Pending migration")
6754
return replace(historical, failures=historical.failures + failures)

tests/unit/progress/test_tables.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@
44

55
from databricks.labs.ucx.framework.owners import Ownership
66
from databricks.labs.ucx.framework.utils import escape_sql_identifier
7-
from databricks.labs.ucx.hive_metastore.table_migration_status import (
8-
TableMigrationStatusRefresher,
9-
TableMigrationStatus,
10-
)
7+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
118
from databricks.labs.ucx.hive_metastore.tables import Table
9+
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
1210
from databricks.labs.ucx.progress.tables import TableProgressEncoder
1311

1412

@@ -21,21 +19,21 @@
2119
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
2220
ownership = create_autospec(Ownership)
2321
ownership.owner_of.return_value = "user"
24-
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
25-
migration_status_crawler.snapshot.return_value = (
26-
TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
27-
)
22+
table_migration_index = create_autospec(TableMigrationIndex)
23+
table_migration_index.is_migrated.return_value = True
24+
grant_progress_encoder = create_autospec(GrantProgressEncoder)
2825
encoder = TableProgressEncoder(
29-
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
26+
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
3027
)
3128

3229
encoder.append_inventory_snapshot([table])
3330

3431
rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
35-
assert rows, f"No rows written for: {encoder.full_name}"
32+
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
3633
assert len(rows[0].failures) == 0
3734
ownership.owner_of.assert_called_once()
38-
migration_status_crawler.snapshot.assert_called_once()
35+
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
36+
grant_progress_encoder.assert_not_called()
3937

4038

4139
@pytest.mark.parametrize(
@@ -47,12 +45,11 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
4745
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
4846
ownership = create_autospec(Ownership)
4947
ownership.owner_of.return_value = "user"
50-
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
51-
migration_status_crawler.snapshot.return_value = (
52-
TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated.
53-
)
48+
table_migration_index = create_autospec(TableMigrationIndex)
49+
table_migration_index.is_migrated.return_value = False
50+
grant_progress_encoder = create_autospec(GrantProgressEncoder)
5451
encoder = TableProgressEncoder(
55-
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
52+
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
5653
)
5754

5855
encoder.append_inventory_snapshot([table])
@@ -61,4 +58,5 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T
6158
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
6259
assert rows[0].failures == ["Pending migration"]
6360
ownership.owner_of.assert_called_once()
64-
migration_status_crawler.snapshot.assert_called_once()
61+
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
62+
grant_progress_encoder.assert_not_called()

0 commit comments

Comments
 (0)