Skip to content

Commit 98f415d

Browse files
committed
Fix implementation of custom queue
1 parent 8cdde13 commit 98f415d

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

src/databricks/labs/ucx/assessment/sequencing.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class PriorityQueue:
6767
"""
6868

6969
_REMOVED = "<removed>" # Mark removed items
70+
_UPDATED = "<updated>" # Mark updated items
7071

7172
def __init__(self):
7273
self._entries: list[QueueEntry] = []
@@ -78,9 +79,8 @@ def put(self, priority: int, task: QueueTask) -> None:
7879
7980
The lowest priority is retrieved from the queue first.
8081
"""
81-
# Update a task by checking if it already exists, then removing it
8282
if task in self._entry_finder:
83-
self._remove(task)
83+
raise KeyError(f"Use `:meth:update` to update existing task: {task}")
8484
entry = [priority, self._counter, task]
8585
self._entry_finder[task] = entry
8686
heapq.heappush(self._entries, entry)
@@ -90,17 +90,27 @@ def get(self) -> QueueTask | None:
9090
"""Gets the tasks with lowest priority."""
9191
while self._entries:
9292
_, _, task = heapq.heappop(self._entries)
93-
if task != self._REMOVED:
94-
self._remove(task)
95-
# Ignore type because heappop returns Any, while we know it is an QueueEntry
96-
return task # type: ignore
93+
if task in (self._REMOVED, self._UPDATED):
94+
continue
95+
self._remove(task)
96+
# Ignore type because heappop returns Any, while we know it is an QueueEntry
97+
return task # type: ignore
9798
return None
9899

99100
def _remove(self, task: QueueTask) -> None:
100101
"""Remove a task from the queue."""
101102
entry = self._entry_finder.pop(task)
102103
entry[2] = self._REMOVED
103104

105+
def update(self, priority: int, task: QueueTask) -> None:
106+
"""Update a task in the queue."""
107+
entry = self._entry_finder.pop(task)
108+
if entry is None:
109+
raise KeyError(f"Cannot update unknown task: {task}")
110+
if entry[2] != self._REMOVED: # Do not update REMOVED tasks
111+
entry[2] = self._UPDATED
112+
self.put(priority, task)
113+
104114

105115
class MigrationSequencer:
106116
"""Sequence the migration dependencies in order to execute the migration.
@@ -205,12 +215,11 @@ def generate_steps(self) -> Iterable[MigrationStep]:
205215
required_step_ids = sorted(self._get_required_step_ids(incoming_keys[key]))
206216
step = self._nodes[key].as_step(step_number, required_step_ids)
207217
sorted_steps.append(step)
208-
# Update the
218+
# Update queue priorities
209219
for dependency_key in self._outgoing[key]:
210220
incoming_counts[dependency_key] -= 1
211-
key_queue.put(incoming_counts[dependency_key], dependency_key)
221+
key_queue.update(incoming_counts[dependency_key], dependency_key)
212222
step_number += 1
213-
key_queue = self._create_key_queue(incoming_counts) # Reprioritize queue given new incoming counts
214223
key = key_queue.get()
215224
return sorted_steps
216225

0 commit comments

Comments
 (0)