@@ -62,7 +62,6 @@ def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator:
62
62
self ._admin_locator = admin_locator
63
63
self ._last_node_id = 0
64
64
self ._nodes : dict [tuple [str , str ], MigrationNode ] = {}
65
- self ._incoming : dict [tuple [str , str ], set [tuple [str , str ]]] = defaultdict (set )
66
65
self ._outgoing : dict [tuple [str , str ], set [tuple [str , str ]]] = defaultdict (set )
67
66
68
67
def register_workflow_task (self , task : jobs .Task , job : jobs .Job , graph : DependencyGraph ) -> MigrationNode :
@@ -80,15 +79,12 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: Dependen
80
79
object_owner = job_node .object_owner , # no task owner so use job one
81
80
)
82
81
self ._nodes [task_node .key ] = task_node
83
- self ._incoming [job_node .key ].add (task_node .key )
84
82
self ._outgoing [task_node .key ].add (job_node .key )
85
83
if task .existing_cluster_id :
86
84
cluster_node = self .register_cluster (task .existing_cluster_id )
87
85
if cluster_node :
88
- self ._incoming [cluster_node .key ].add (task_node .key )
89
86
self ._outgoing [task_node .key ].add (cluster_node .key )
90
87
# also make the cluster dependent on the job
91
- self ._incoming [cluster_node .key ].add (job_node .key )
92
88
self ._outgoing [job_node .key ].add (cluster_node .key )
93
89
graph .visit (self ._visit_dependency , None )
94
90
return task_node
@@ -107,7 +103,6 @@ def register_dependency(self, parent_node: MigrationNode, object_type: str, obje
107
103
if not dependency_node :
108
104
dependency_node = self ._create_dependency_node (object_type , object_id )
109
105
if parent_node :
110
- self ._incoming [parent_node .key ].add (dependency_node .key )
111
106
self ._outgoing [dependency_node .key ].add (parent_node .key )
112
107
return dependency_node
113
108
@@ -153,7 +148,6 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
153
148
for job_cluster in job .settings .job_clusters :
154
149
cluster_node = self .register_job_cluster (job_cluster )
155
150
if cluster_node :
156
- self ._incoming [cluster_node .key ].add (job_node .key )
157
151
self ._outgoing [job_node .key ].add (cluster_node .key )
158
152
return job_node
159
153
@@ -182,7 +176,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode:
182
176
183
177
def generate_steps (self ) -> Iterable [MigrationStep ]:
184
178
"""The below algo is adapted from Kahn's topological sort.
185
- The main differences are as follows:
179
+ The differences are as follows:
186
180
1) we want the same step number for all nodes with same dependency depth
187
181
so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed
188
182
(these are transient 'leaf' nodes i.e. they only become 'leaf' during processing)
@@ -191,14 +185,17 @@ def generate_steps(self) -> Iterable[MigrationStep]:
191
185
to avoid an infinite loop. We also avoid side effects (such as negative counts).
192
186
This algo works correctly for simple cases, but is not tested on large trees.
193
187
"""
194
- incoming_counts = self ._populate_incoming_counts ()
188
+ incoming_keys = self ._collect_incoming_keys ()
189
+ incoming_counts = self ._compute_incoming_counts (incoming_keys )
195
190
step_number = 1
196
191
sorted_steps : list [MigrationStep ] = []
197
192
while len (incoming_counts ) > 0 :
198
193
leaf_keys = self ._get_leaf_keys (incoming_counts )
199
194
for leaf_key in leaf_keys :
200
195
del incoming_counts [leaf_key ]
201
- sorted_steps .append (self ._nodes [leaf_key ].as_step (step_number , list (self ._required_step_ids (leaf_key ))))
196
+ sorted_steps .append (
197
+ self ._nodes [leaf_key ].as_step (step_number , list (self ._required_step_ids (incoming_keys [leaf_key ])))
198
+ )
202
199
self ._on_leaf_key_processed (leaf_key , incoming_counts )
203
200
step_number += 1
204
201
return sorted_steps
@@ -212,14 +209,23 @@ def _on_leaf_key_processed(self, leaf_key: tuple[str, str], incoming_counts: dic
212
209
if incoming_counts [dependency_key ] > 0 :
213
210
incoming_counts [dependency_key ] -= 1
214
211
215
- def _required_step_ids (self , node_key : tuple [str , str ]) -> Iterable [int ]:
216
- for leaf_key in self ._incoming [node_key ]:
217
- yield self ._nodes [leaf_key ].node_id
212
def _collect_incoming_keys(self) -> dict[tuple[str, str], set[tuple[str, str]]]:
    """Build the reverse of the ``_outgoing`` adjacency map.

    For every ``source -> target`` edge recorded in ``self._outgoing``, the
    returned mapping holds ``source`` in the set stored under ``target``.

    Returns:
        A ``defaultdict(set)``, so looking up a key that has no incoming
        edge yields an empty set instead of raising ``KeyError``.
    """
    inverted: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set)
    # Flatten the adjacency map into (origin, target) pairs, then regroup by target.
    edges = (
        (origin_key, target_key)
        for origin_key, target_keys in self._outgoing.items()
        for target_key in target_keys
    )
    for origin_key, target_key in edges:
        inverted[target_key].add(origin_key)
    return inverted
218
+
219
def _required_step_ids(self, required_step_keys: set[tuple[str, str]]) -> Iterable[int]:
    """Yield the ``node_id`` of every node referenced by ``required_step_keys``.

    The ids are yielded in ascending order. Iterating the key set directly
    would produce an order that depends on string hashing, which can differ
    between interpreter runs and would make any list built from this
    generator unstable.

    Args:
        required_step_keys: keys into ``self._nodes``.

    Yields:
        The ``node_id`` of each referenced node, smallest first.

    Raises:
        KeyError: when a key is not present in ``self._nodes``; raised on
            first iteration, since this is a generator.
    """
    yield from sorted(self._nodes[node_key].node_id for node_key in required_step_keys)
218
222
219
- def _populate_incoming_counts (self ) -> dict [tuple [str , str ], int ]:
223
def _compute_incoming_counts(
    self, incoming: dict[tuple[str, str], set[tuple[str, str]]]
) -> dict[tuple[str, str], int]:
    """Count the incoming edges of every registered node.

    Args:
        incoming: mapping from a node key to the set of keys pointing at it.
            Nodes without an entry are counted as having no incoming edge.
            The mapping is only read: ``.get`` is used instead of indexing so
            that a ``defaultdict`` argument does not gain empty entries and a
            plain ``dict`` does not raise ``KeyError``.

    Returns:
        A ``defaultdict(int)`` with one entry per key of ``self._nodes``;
        looking up any other key yields ``0``.
    """
    return defaultdict(
        int,
        {node_key: len(incoming.get(node_key, ())) for node_key in self._nodes},
    )
224
230
225
231
@classmethod
0 commit comments