Skip to content

Fixed snapshot loading for DFSA and used-table crawlers #3046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
4 commits were merged into the base branch on Oct 23, 2024
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/directfs_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None:
def _try_fetch(self) -> Iterable[DirectFsAccess]:
    """Yield the persisted DFSA snapshot rows, deserialized into DirectFsAccess records.

    Reads every row from this crawler's inventory table and converts each one
    back into the record class via its ``from_dict`` constructor.
    """
    sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}"
    for row in self._backend.fetch(sql):
        # Row exposes the Spark-style asDict() accessor, not as_dict();
        # calling the wrong name broke snapshot loading (same fix applied
        # to the used-table crawler in used_table.py).
        yield self._klass.from_dict(row.asDict())

def _crawl(self) -> Iterable[DirectFsAccess]:
    """Return no records: this crawler does not crawl.

    Snapshot content is written externally through ``dump_all`` rather than
    discovered here, so crawling always yields an empty result.
    """
    no_records: list[DirectFsAccess] = []
    return no_records
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/used_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def dump_all(self, tables: Sequence[UsedTable]) -> None:
def _try_fetch(self) -> Iterable[UsedTable]:
    """Yield the persisted used-table snapshot rows, deserialized into UsedTable records.

    Reads every row from this crawler's inventory table and converts each one
    back into the record class via its ``from_dict`` constructor.
    """
    sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}"
    for row in self._backend.fetch(sql):
        # Row exposes the Spark-style asDict() accessor, not as_dict();
        # calling the wrong name broke snapshot loading (same fix applied
        # to the DFSA crawler in directfs_access.py).
        yield self._klass.from_dict(row.asDict())

def _crawl(self) -> Iterable[UsedTable]:
    """Return no records: this crawler does not crawl.

    Snapshot content is written externally through ``dump_all`` rather than
    discovered here, so crawling always yields an empty result.
    """
    no_records: list[UsedTable] = []
    return no_records
Expand Down
10 changes: 8 additions & 2 deletions tests/integration/source_code/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,23 @@ def test_running_real_workflow_linter_job(installation_ctx, make_job) -> None:
def test_linter_from_context(simple_ctx, make_job) -> None:
    """Lint a single deliberately-problematic job and verify all three outputs land.

    This code is similar to test_running_real_workflow_linter_job, but it's
    executed on the caller side and is easier to debug. The job body both
    reads a table and writes to a /mnt path, so the linter should record a
    workflow problem, a direct-filesystem-access record, and a used-table
    record — exercising the snapshot round-trip for all crawlers.
    """
    # Ensure we have at least 1 job that fails: "Deprecated file system path in call to: /mnt/things/e/f/g"
    job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n")
    # Restrict linting to just this job so the assertions below are attributable.
    simple_ctx.config.include_job_ids = [job.job_id]
    simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database)

    # Verify that the 'problems' table has content.
    cursor = simple_ctx.sql_backend.fetch(
        f"SELECT COUNT(*) AS count FROM {simple_ctx.inventory_database}.workflow_problems"
    )
    result = next(cursor)
    assert result['count'] > 0

    # Verify that the other data produced snapshot can be loaded.
    # (Regression check for the asDict() fix in the DFSA / used-table crawlers.)
    dfsa_records = simple_ctx.directfs_access_crawler_for_paths.snapshot()
    used_table_records = simple_ctx.used_tables_crawler_for_paths.snapshot()
    assert dfsa_records and used_table_records


def test_job_linter_no_problems(simple_ctx, make_job) -> None:
j = make_job()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/source_code/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_query_linter_collects_dfsas_from_queries(name, query, dfsa_paths, is_re
assert all(dfsa.is_write == is_write for dfsa in dfsas)


def test_query_liner_refresh_report_writes_query_problems(migration_index, mock_backend) -> None:
def test_query_linter_refresh_report_writes_query_problems(migration_index, mock_backend) -> None:
ws = create_autospec(WorkspaceClient)
dfsa_crawler = create_autospec(DirectFsAccessCrawler)
used_tables_crawler = create_autospec(UsedTablesCrawler)
Expand Down