4
4
from collections .abc import Iterable
5
5
from dataclasses import dataclass , field
6
6
7
+ from databricks .sdk import WorkspaceClient
7
8
from databricks .sdk .service import jobs
8
9
9
10
from databricks .labs .ucx .source_code .graph import DependencyGraph
@@ -66,7 +67,8 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None:
66
67
67
68
class MigrationSequencer :
68
69
69
- def __init__ (self ):
70
+ def __init__ (self , ws : WorkspaceClient ):
71
+ self ._ws = ws
70
72
self ._root = MigrationNode (
71
73
node_id = 0 , object_type = "ROOT" , object_id = "ROOT" , object_name = "ROOT" , object_owner = "NONE"
72
74
)
@@ -83,7 +85,7 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende
83
85
object_type = "TASK" ,
84
86
object_id = task_id ,
85
87
object_name = task .task_key ,
86
- object_owner = job_node .object_owner , # no task owner so use job one
88
+ object_owner = job_node .object_owner , # no task owner so use job one
87
89
)
88
90
job_node .required_steps .append (task_node )
89
91
if task .existing_cluster_id :
@@ -127,14 +129,17 @@ def register_cluster(self, cluster_key: str) -> MigrationNode:
127
129
cluster_node = self ._find_node (object_type = "CLUSTER" , object_id = cluster_key )
128
130
if cluster_node :
129
131
return cluster_node
132
+ details = self ._ws .clusters .get (cluster_key )
133
+ object_name = details .cluster_name if details and details .cluster_name else cluster_key
134
+ object_owner = details .creator_user_name if details and details .creator_user_name else "<UNKNOWN>"
130
135
MigrationNode .last_node_id += 1
131
136
cluster_node = MigrationNode (
132
137
node_id = MigrationNode .last_node_id ,
133
138
object_type = "CLUSTER" ,
134
139
object_id = cluster_key ,
135
- object_name = cluster_key ,
136
- object_owner = "NONE" ,
137
- ) # TODO object_owner
140
+ object_name = object_name ,
141
+ object_owner = object_owner ,
142
+ )
138
143
# TODO register warehouses and policies
139
144
self ._root .required_steps .append (cluster_node )
140
145
return cluster_node
@@ -155,6 +160,8 @@ def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep
155
160
for step in steps :
156
161
existing = best_steps .get (step .step_id , None )
157
162
# keep the step with the highest step number
163
+ # TODO this possibly affects the step_number of steps that depend on this one
164
+ # but it's probably OK to not be 100% accurate initially
158
165
if existing and existing .step_number >= step .step_number :
159
166
continue
160
167
best_steps [step .step_id ] = step
0 commit comments