Skip to content

Commit 288478c

Browse files
committed
Introduce WIP DirectFsAccessPyFixer for code replacement
1 parent 08db6cb commit 288478c

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed

src/databricks/labs/ucx/source_code/linters/directfs.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import logging
22
from abc import ABC
33
from collections.abc import Iterable
4+
from typing import Any
45

56
from astroid import Call, InferenceError, NodeNG # type: ignore
67
from sqlglot.expressions import Alter, Create, Delete, Drop, Expression, Identifier, Insert, Literal, Select
78

9+
from databricks.labs.ucx.hive_metastore import TablesCrawler
810
from databricks.labs.ucx.source_code.base import (
911
Advice,
1012
Deprecation,
@@ -14,6 +16,7 @@
1416
DirectFsAccess,
1517
)
1618
from databricks.labs.ucx.source_code.linters.base import SqlLinter, PythonLinter, DfsaPyCollector
19+
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
1720
from databricks.labs.ucx.source_code.python.python_ast import (
1821
Tree,
1922
TreeVisitor,
@@ -205,3 +208,77 @@ def _walk_up(cls, expression: Expression | None) -> Expression | None:
205208
if isinstance(expression, (Create, Alter, Drop, Insert, Delete, Select)):
206209
return expression
207210
return cls._walk_up(expression.parent)
211+
212+
class DirectFsAccessPyFixer(DirectFsAccessPyLinter):
    """Work-in-progress fixer that maps direct filesystem accesses to tables.

    The fixer reuses the linter's collection of direct filesystem access
    (DFSA) nodes and looks each one up in ``direct_fs_table_list``, a mapping
    from DFSA path to the details of the table whose location contains that
    path. Actual code replacement is not implemented yet; for now the intended
    replacement is only logged.
    """

    def __init__(
        self,
        session_state: CurrentSessionState,
        directfs_crawler: DirectFsAccessCrawler,
        tables_crawler: TablesCrawler,
        prevent_spark_duplicates=True,
    ):
        super().__init__(session_state, prevent_spark_duplicates)
        self.directfs_crawler = directfs_crawler
        self.tables_crawler = tables_crawler
        # Keyed by DFSA path so that _replace_read can look records up by
        # path. The previous value was a list holding typing objects, which
        # could not be indexed by a path string.
        self.direct_fs_table_list: dict[str, dict[str, Any]] = {}

    def fix_tree(self, tree: Tree) -> Tree:
        """Apply the (currently log-only) fix to every DFSA node in *tree*.

        Returns the same tree instance that was passed in.
        """
        for directfs_node in self.collect_dfsas_from_tree(tree):
            self._fix_node(directfs_node)
        return tree

    def _fix_node(self, directfs_node: DirectFsAccessNode) -> None:
        """Dispatch a DFSA node to the read or write replacement.

        Nodes that are neither a read nor a write are left untouched.
        """
        dfsa = directfs_node.dfsa
        if dfsa.is_read:
            self._replace_read(directfs_node)
        elif dfsa.is_write:
            self._replace_write(directfs_node)

    def _replace_read(self, directfs_node: DirectFsAccessNode) -> None:
        """Log the table that should replace a direct filesystem read.

        A path without a known table mapping is skipped with a warning so a
        single unmapped access does not abort fixing the whole tree.
        """
        dfsa = directfs_node.dfsa
        dfsa_details = self.direct_fs_table_list.get(dfsa.path)
        if dfsa_details is None:
            logger.warning(f"No table found for direct filesystem access to {dfsa.path}")
            return

        # TODO: Actual code replacement
        logger.info(
            f"Replacing read of {dfsa.path} with table {dfsa_details['dst_schema']}.{dfsa_details['dst_table']}"
        )

    def _replace_write(self, directfs_node: DirectFsAccessNode) -> None:
        """Log that a direct filesystem write should be replaced by a table."""
        dfsa = directfs_node.dfsa
        # TODO: Actual code replacement
        logger.info(f"Replacing write of {dfsa.path} with table")

    def populate_directfs_table_list(
        self,
        directfs_crawlers: list[DirectFsAccessCrawler],
        tables_crawler: TablesCrawler,
        workspace_name: str,
        catalog_name: str,
    ) -> None:
        """Build the DFSA-path-to-table mapping from crawler snapshots.

        Every DFSA record whose path is a substring of a table's location is
        registered in ``direct_fs_table_list`` under that path.

        Args:
            directfs_crawlers: crawlers whose snapshots provide DFSA records.
            tables_crawler: crawler whose snapshot provides the tables.
            workspace_name: stored verbatim in every mapping entry.
            catalog_name: stored verbatim in every mapping entry.

        Raises:
            ValueError: if the tables snapshot is empty, or if none of the
                crawlers yields a DFSA record.
        """
        directfs_snapshot = [
            directfs_access for crawler in directfs_crawlers for directfs_access in crawler.snapshot()
        ]
        tables_snapshot = list(tables_crawler.snapshot())
        if not tables_snapshot:
            msg = "No tables found. Please run: databricks labs ucx ensure-assessment-run"
            raise ValueError(msg)
        if not directfs_snapshot:
            msg = "No directfs references found in code"
            raise ValueError(msg)

        # TODO: very inefficient search, just for initial testing
        for table in tables_snapshot:
            # The location check does not depend on the DFSA record, so do it
            # once per table rather than once per (table, record) pair.
            if not table.location:
                continue
            for directfs_record in directfs_snapshot:
                if directfs_record.path not in table.location:
                    continue
                # First matching table wins when several locations contain
                # the same path. TODO: confirm the desired tie-break rule.
                self.direct_fs_table_list.setdefault(
                    directfs_record.path,
                    {
                        "workspace_name": workspace_name,
                        "is_read": directfs_record.is_read,
                        "is_write": directfs_record.is_write,
                        "catalog_name": catalog_name,
                        "dst_schema": table.database,
                        "dst_table": table.name,
                    },
                )

0 commit comments

Comments
 (0)