5
5
from collections import defaultdict
6
6
from collections .abc import Iterable
7
7
from dataclasses import dataclass , field
8
+ from pathlib import Path
8
9
9
10
from databricks .sdk import WorkspaceClient
10
11
from databricks .sdk .errors import DatabricksError
11
12
from databricks .sdk .service .jobs import Job , JobCluster , Task
12
13
13
14
from databricks .labs .ucx .assessment .clusters import ClusterOwnership , ClusterInfo
14
15
from databricks .labs .ucx .assessment .jobs import JobOwnership , JobInfo
15
- from databricks .labs .ucx .framework .owners import AdministratorLocator
16
- from databricks .labs .ucx .source_code .graph import DependencyProblem
16
+ from databricks .labs .ucx .framework .owners import AdministratorLocator , WorkspaceObjectOwnership
17
+ from databricks .labs .ucx .source_code .graph import DependencyGraph , DependencyProblem
18
+ from databricks .labs .ucx .source_code .path_lookup import PathLookup
17
19
18
20
19
21
@dataclass
@@ -39,6 +41,9 @@ class MigrationStep:
39
41
required_step_ids : list [int ]
40
42
"""The step ids that should be completed before this step is started."""
41
43
44
    @property
    def key(self) -> tuple[str, str]:
        """Stable identity of the migrated object as a ``(object_type, object_id)`` pair."""
        return self.object_type, self.object_id
42
47
43
48
MigrationNodeKey = tuple [str , str ]
44
49
@@ -148,8 +153,9 @@ class MigrationSequencer:
148
153
Analysing the graph in this case means: computing the migration sequence in `meth:generate_steps`.
149
154
"""
150
155
151
- def __init__ (self , ws : WorkspaceClient , administrator_locator : AdministratorLocator ):
156
+ def __init__ (self , ws : WorkspaceClient , path_lookup : PathLookup , administrator_locator : AdministratorLocator ):
152
157
self ._ws = ws
158
+ self ._path_lookup = path_lookup
153
159
self ._admin_locator = administrator_locator
154
160
self ._counter = itertools .count ()
155
161
self ._nodes : dict [MigrationNodeKey , MigrationNode ] = {}
@@ -220,7 +226,7 @@ def _register_job(self, job: Job) -> MaybeMigrationNode:
220
226
problems .append (problem )
221
227
return MaybeMigrationNode (job_node , problems )
222
228
223
- def _register_workflow_task (self , task : Task , parent : MigrationNode ) -> MaybeMigrationNode :
229
+ def _register_workflow_task (self , task : Task , parent : MigrationNode , graph : DependencyGraph ) -> MaybeMigrationNode :
224
230
"""Register a workflow task.
225
231
226
232
TODO:
@@ -262,8 +268,51 @@ def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMig
262
268
else :
263
269
problem = DependencyProblem ('cluster-not-found' , f"Could not find cluster: { task .job_cluster_key } " )
264
270
problems .append (problem )
271
+ graph .visit (self ._visit_dependency , None )
265
272
return MaybeMigrationNode (task_node , problems )
266
273
274
+ def _visit_dependency (self , graph : DependencyGraph ) -> bool | None :
275
+ lineage = graph .dependency .lineage [- 1 ]
276
+ parent_node = self ._nodes [(lineage .object_type , lineage .object_id )]
277
+ for dependency in graph .local_dependencies :
278
+ lineage = dependency .lineage [- 1 ]
279
+ self .register_dependency (parent_node , lineage .object_type , lineage .object_id )
280
+ # TODO tables and dfsas
281
+ return False
282
+
283
+ def register_dependency (self , parent_node : MigrationNode , object_type : str , object_id : str ) -> MigrationNode :
284
+ dependency_node = self ._nodes .get ((object_type , object_id ), None )
285
+ if not dependency_node :
286
+ dependency_node = self ._create_dependency_node (object_type , object_id )
287
+ if parent_node :
288
+ self ._incoming [parent_node .key ].add (dependency_node .key )
289
+ self ._outgoing [dependency_node .key ].add (parent_node .key )
290
+ return dependency_node
291
+
292
+ def _create_dependency_node (self , object_type : str , object_id : str ) -> MigrationNode :
293
+ object_name : str = "<ANONYMOUS>"
294
+ _object_owner : str = "<UNKNOWN>"
295
+ if object_type in {"NOTEBOOK" , "FILE" }:
296
+ path = Path (object_id )
297
+ for library_root in self ._path_lookup .library_roots :
298
+ if not path .is_relative_to (library_root ):
299
+ continue
300
+ object_name = path .relative_to (library_root ).as_posix ()
301
+ break
302
+ object_owner = WorkspaceObjectOwnership (self ._admin_locator ).owner_of ((object_type , object_id ))
303
+ else :
304
+ raise ValueError (f"{ object_type } not supported yet!" )
305
+ self ._last_node_id += 1
306
+ dependency_node = MigrationNode (
307
+ node_id = self ._last_node_id ,
308
+ object_type = object_type ,
309
+ object_id = object_id ,
310
+ object_name = object_name ,
311
+ object_owner = object_owner ,
312
+ )
313
+ self ._nodes [dependency_node .key ] = dependency_node
314
+ return dependency_node
315
+
267
316
def _register_job_cluster (self , cluster : JobCluster , parent : MigrationNode ) -> MaybeMigrationNode :
268
317
"""Register a job cluster.
269
318
@@ -322,6 +371,16 @@ def generate_steps(self) -> Iterable[MigrationStep]:
322
371
leaf during processing)
323
372
- We handle cyclic dependencies (implemented in PR #3009)
324
373
"""
374
+ """The below algo is adapted from Kahn's topological sort.
375
+ The main differences are as follows:
376
+ 1) we want the same step number for all nodes with same dependency depth
377
+ so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed
378
+ (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing)
379
+ 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG
380
+ so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order
381
+ to avoid an infinite loop. We also avoid side effects (such as negative counts).
382
+ This algo works correctly for simple cases, but is not tested on large trees.
383
+ """
325
384
ordered_steps : list [MigrationStep ] = []
326
385
# For updating the priority of steps that depend on other steps
327
386
incoming_references = self ._invert_outgoing_to_incoming_references ()
0 commit comments