Skip to content

Commit 1054e35

Browse files
authored
Fixed snapshot loading for DFSA and used-table crawlers (#3046)
## Changes

This PR fixes an issue with the DFSA and used-table crawlers that could prevent loading of the snapshots. When loading, they convert the rows to dictionaries using `.as_dict()`, which isn't available on rows provided by the Spark-based lsql backend. Instead, `.asDict()` needs to be used.

Incidental changes:
- An existing integration test was updated to also test snapshot loading for these crawlers.
- Another test was renamed to fix a typo in its name.

### Linked issues

Relates to #3036, #3039.

### Tests

- existing unit tests
- existing integration tests
1 parent 6165454 commit 1054e35

File tree

4 files changed

+11
-5
lines changed

4 files changed

+11
-5
lines changed

src/databricks/labs/ucx/source_code/directfs_access.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None:
4848
def _try_fetch(self) -> Iterable[DirectFsAccess]:
4949
sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}"
5050
for row in self._backend.fetch(sql):
51-
yield self._klass.from_dict(row.as_dict())
51+
yield self._klass.from_dict(row.asDict())
5252

5353
def _crawl(self) -> Iterable[DirectFsAccess]:
5454
return []

src/databricks/labs/ucx/source_code/used_table.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ def dump_all(self, tables: Sequence[UsedTable]) -> None:
4747
def _try_fetch(self) -> Iterable[UsedTable]:
4848
sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}"
4949
for row in self._backend.fetch(sql):
50-
yield self._klass.from_dict(row.as_dict())
50+
yield self._klass.from_dict(row.asDict())
5151

5252
def _crawl(self) -> Iterable[UsedTable]:
5353
return []

tests/integration/source_code/test_jobs.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,17 +36,23 @@
3636
def test_linter_from_context(simple_ctx, make_job) -> None:
3737
# This code is similar to test_running_real_workflow_linter_job, but it's executed on the caller side and is easier
3838
# to debug.
39-
# Ensure we have at least 1 job that fails
40-
job = make_job(content="import xyz")
39+
# Ensure we have at least 1 job that fails: "Deprecated file system path in call to: /mnt/things/e/f/g"
40+
job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n")
4141
simple_ctx.config.include_job_ids = [job.job_id]
4242
simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database)
4343

44+
# Verify that the 'problems' table has content.
4445
cursor = simple_ctx.sql_backend.fetch(
4546
f"SELECT COUNT(*) AS count FROM {simple_ctx.inventory_database}.workflow_problems"
4647
)
4748
result = next(cursor)
4849
assert result['count'] > 0
4950

51+
# Verify that the other data produced snapshot can be loaded.
52+
dfsa_records = simple_ctx.directfs_access_crawler_for_paths.snapshot()
53+
used_table_records = simple_ctx.used_tables_crawler_for_paths.snapshot()
54+
assert dfsa_records and used_table_records
55+
5056

5157
def test_job_linter_no_problems(simple_ctx, make_job) -> None:
5258
j = make_job()

tests/unit/source_code/test_queries.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ def test_query_linter_collects_dfsas_from_queries(name, query, dfsa_paths, is_re
3939
assert all(dfsa.is_write == is_write for dfsa in dfsas)
4040

4141

42-
def test_query_liner_refresh_report_writes_query_problems(migration_index, mock_backend) -> None:
42+
def test_query_linter_refresh_report_writes_query_problems(migration_index, mock_backend) -> None:
4343
ws = create_autospec(WorkspaceClient)
4444
dfsa_crawler = create_autospec(DirectFsAccessCrawler)
4545
used_tables_crawler = create_autospec(UsedTablesCrawler)

0 commit comments

Comments (0)