Skip to content

Commit 086c229

Browse files
committed
Remove update from queue
1 parent cc571c4 commit 086c229

File tree

1 file changed

+9
-16
lines changed

1 file changed

+9
-16
lines changed

src/databricks/labs/ucx/assessment/sequencing.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import heapq
4+
import itertools
45
from collections import defaultdict
56
from collections.abc import Iterable
67
from dataclasses import dataclass, field
@@ -94,30 +95,29 @@ class PriorityQueue:
9495
"""
9596

9697
_REMOVED = "<removed>" # Mark removed items
97-
_UPDATED = "<updated>" # Mark updated items
9898

9999
def __init__(self):
100100
self._entries: list[QueueEntry] = []
101101
self._entry_finder: dict[MigrationNode, QueueEntry] = {}
102-
self._counter = 0 # Tiebreaker with equal priorities, then "first in, first out"
102+
self._counter = itertools.count() # Tiebreaker with equal priorities, then "first in, first out"
103103

104104
def put(self, priority: int, task: MigrationNode) -> None:
105105
"""Put or update task in the queue.
106106
107107
The lowest priority is retrieved from the queue first.
108108
"""
109109
if task in self._entry_finder:
110-
raise KeyError(f"Use `:meth:update` to update existing task: {task}")
111-
entry: QueueEntry = [priority, self._counter, task]
110+
self._remove(task)
111+
count = next(self._counter)
112+
entry = [priority, count, task]
112113
self._entry_finder[task] = entry
113114
heapq.heappush(self._entries, entry)
114-
self._counter += 1
115115

116116
def get(self) -> MigrationNode | None:
117117
"""Gets the tasks with lowest priority."""
118118
while self._entries:
119119
_, _, task = heapq.heappop(self._entries)
120-
if task in (self._REMOVED, self._UPDATED):
120+
if task == self._REMOVED:
121121
continue
122122
assert isinstance(task, MigrationNode)
123123
self._remove(task)
@@ -130,15 +130,6 @@ def _remove(self, task: MigrationNode) -> None:
130130
entry = self._entry_finder.pop(task)
131131
entry[2] = self._REMOVED
132132

133-
def update(self, priority: int, task: MigrationNode) -> None:
134-
"""Update a task in the queue."""
135-
entry = self._entry_finder.pop(task)
136-
if entry is None:
137-
raise KeyError(f"Cannot update unknown task: {task}")
138-
if entry[2] != self._REMOVED: # Do not update REMOVED tasks
139-
entry[2] = self._UPDATED
140-
self.put(priority, task)
141-
142133

143134
class MigrationSequencer:
144135
"""Sequence the migration dependencies in order to execute the migration.
@@ -244,8 +235,10 @@ def generate_steps(self) -> Iterable[MigrationStep]:
244235
seen.add(node)
245236
# Update the queue priority as if the migration step was completed
246237
for dependency in self._outgoing[node.key]:
238+
if dependency in seen:
239+
continue
247240
priority = len(incoming[dependency.key] - seen)
248-
queue.update(priority, dependency)
241+
queue.put(priority, dependency)
249242
node = queue.get()
250243
return ordered_steps
251244

0 commit comments

Comments
 (0)