@@ -181,20 +181,31 @@ def register_cluster(self, cluster_id: str) -> MigrationNode:
181
181
return cluster_node
182
182
183
183
def generate_steps (self ) -> Iterable [MigrationStep ]:
184
- # algo adapted from Kahn topological sort. The main differences is that
185
- # we want the same step number for all nodes with same dependency depth
186
- # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed
187
- # (these are transient leaf nodes i.e. they only become leaf during processing)
184
+ """ The below algo is adapted from Kahn's topological sort.
185
+ The main differences are as follows:
186
+ 1) we want the same step number for all nodes with same dependency depth
187
+ so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed
188
+ (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing)
189
+ 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG
190
+ so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order
191
+ to avoid an infinite loop. We also avoid side effects (such as negative counts).
192
+ This algo works correctly for simple cases, but is not tested on large trees.
193
+ """
188
194
incoming_counts = self ._populate_incoming_counts ()
189
195
step_number = 1
190
196
sorted_steps : list [MigrationStep ] = []
191
197
while len (incoming_counts ) > 0 :
192
- leaf_keys = list ( self ._get_leaf_keys (incoming_counts ) )
198
+ leaf_keys = self ._get_leaf_keys (incoming_counts )
193
199
for leaf_key in leaf_keys :
194
200
del incoming_counts [leaf_key ]
195
201
sorted_steps .append (self ._nodes [leaf_key ].as_step (step_number , list (self ._required_step_ids (leaf_key ))))
196
202
for dependency_key in self ._outgoing [leaf_key ]:
197
- incoming_counts [dependency_key ] -= 1
203
+ # prevent re-instantiation of already deleted keys
204
+ if dependency_key not in incoming_counts :
205
+ continue
206
+ # prevent negative count with cyclic dependencies
207
+ if incoming_counts [dependency_key ] > 0 :
208
+ incoming_counts [dependency_key ] -= 1
198
209
step_number += 1
199
210
return sorted_steps
200
211
@@ -208,9 +219,20 @@ def _populate_incoming_counts(self) -> dict[tuple[str, str], int]:
208
219
result [node_key ] = len (self ._incoming [node_key ])
209
220
return result
210
221
211
- @staticmethod
212
- def _get_leaf_keys (incoming_counts : dict [tuple [str , str ], int ]) -> Iterable [tuple [str , str ]]:
222
+ @classmethod
223
+ def _get_leaf_keys (cls , incoming_counts : dict [tuple [str , str ], int ]) -> Iterable [tuple [str , str ]]:
224
+ count = 0
225
+ leaf_keys = list (cls ._yield_leaf_keys (incoming_counts , count ))
226
+ # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies
227
+ # in which case it's safe to process nodes with a higher incoming count
228
+ while not leaf_keys :
229
+ count += 1
230
+ leaf_keys = list (cls ._yield_leaf_keys (incoming_counts , count ))
231
+ return leaf_keys
232
+
233
+ @classmethod
234
+ def _yield_leaf_keys (cls , incoming_counts : dict [tuple [str , str ], int ], level : int ) -> Iterable [tuple [str , str ]]:
213
235
for node_key , incoming_count in incoming_counts .items ():
214
- if incoming_count > 0 :
236
+ if incoming_count > level :
215
237
continue
216
238
yield node_key
0 commit comments