|
3 | 3 | from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
|
4 | 4 | from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom
|
5 | 5 | from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter
|
| 6 | +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom, CurrentSessionState |
| 7 | +from databricks.labs.ucx.source_code.jobs import WorkflowLinter |
| 8 | +from databricks.labs.ucx.source_code.linters.directfs import DirectFsAccessPyFixer |
| 9 | +from databricks.labs.ucx.source_code.python.python_ast import Tree |
| 10 | +from integration.conftest import runtime_ctx |
| 11 | +from unit.source_code.linters.test_spark_connect import session_state |
6 | 12 |
|
7 | 13 |
|
8 | 14 | def test_legacy_query_dfsa_ownership(runtime_ctx) -> None:
|
@@ -110,3 +116,48 @@ def test_path_dfsa_ownership(
|
110 | 116 | # Verify ownership can be made.
|
111 | 117 | owner = runtime_ctx.directfs_access_ownership.owner_of(path_record)
|
112 | 118 | assert owner == runtime_ctx.workspace_client.current_user.me().user_name
|
| 119 | + |
def test_path_dfsa_replacement(
    runtime_ctx,
    make_directory,
    make_mounted_location,
    inventory_schema,
    sql_backend,
) -> None:
    """Run the direct-fs access fixer over a Python notebook that reads a mounted path.

    Steps:
      1. Create an external CSV table at a mounted location.
      2. Create a notebook reading that same location, and a job running the notebook.
      3. Lint the job so that a DFSA record is produced for it.
      4. Snapshot the tables and DFSA crawlers.
      5. Build a ``DirectFsAccessPyFixer`` and apply it to the parsed notebook source.

    NOTE(review): the outcome of ``fix_tree`` is not inspected yet, so this test
    currently only proves that the pipeline runs without raising.
    """
    # NOTE(review): `make_mounted_location` is requested but never called here; it is
    # kept in the signature so any fixture set-up still happens - confirm it is needed.

    mounted_location = '/mnt/things/e/f/g'
    # Called for its side effect only; the returned table object is not used below.
    # Presumably this gives the fixer a table to map the mounted path onto - TODO confirm.
    runtime_ctx.make_table(external_csv=mounted_location)

    notebook_content = f"display(spark.read.csv('{mounted_location}'))"
    notebook = runtime_ctx.make_notebook(
        path=f"{make_directory()}/notebook.py",
        content=notebook_content.encode("ASCII"),
    )
    job = runtime_ctx.make_job(notebook_path=notebook)

    # Produce a DFSA record for the job.
    linter = WorkflowLinter(
        runtime_ctx.workspace_client,
        runtime_ctx.dependency_resolver,
        runtime_ctx.path_lookup,
        TableMigrationIndex([]),
        runtime_ctx.directfs_access_crawler_for_paths,
        runtime_ctx.used_tables_crawler_for_paths,
        include_job_ids=[job.job_id],
    )
    linter.refresh_report(sql_backend, inventory_schema)

    runtime_ctx.tables_crawler.snapshot()
    runtime_ctx.directfs_access_crawler_for_paths.snapshot()

    # Named differently from the module-level `session_state` import to avoid shadowing it.
    current_session_state = CurrentSessionState()
    directfs_py_fixer = DirectFsAccessPyFixer(
        current_session_state,
        runtime_ctx.directfs_access_crawler_for_paths,
        runtime_ctx.tables_crawler,
    )
    directfs_py_fixer.populate_directfs_table_list(
        [runtime_ctx.directfs_access_crawler_for_paths],
        runtime_ctx.tables_crawler,
        "workspace_name",
        "catalog_name",
    )

    # Fail with a clear message if the notebook source did not parse, instead of
    # handing a missing tree to the fixer.
    maybe_tree = Tree.maybe_normalized_parse(notebook_content)
    assert maybe_tree.tree is not None, "notebook source could not be parsed"

    directfs_py_fixer.fix_tree(maybe_tree.tree)
    # TODO: assert on the rewritten tree (the direct-fs read should be replaced by a
    # Unity Catalog table reference) once the fixer's output contract is settled.
0 commit comments