Skip to content

Commit 80faeac

Browse files
fix(processor): fix race condition related to clearing the queue
1 parent 418c932 commit 80faeac

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

invokeai/app/services/session_processor/session_processor_default.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
OnNonFatalProcessorError,
2020
)
2121
from invokeai.app.services.session_processor.session_processor_common import CanceledException
22-
from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem
22+
from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem, SessionQueueItemNotFoundError
2323
from invokeai.app.services.shared.graph import NodeInputError
2424
from invokeai.app.services.shared.invocation_context import InvocationContextData, build_invocation_context
2525
from invokeai.app.util.profiler import Profiler
@@ -166,8 +166,11 @@ def _on_after_run_session(self, queue_item: SessionQueueItem) -> None:
166166
graph_execution_state_id=queue_item.session.id, output_path=stats_path
167167
)
168168

169-
# Update the queue item with the completed session
170-
self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session)
169+
try:
170+
# Update the queue item with the completed session. If the queue item has been removed from the queue,
171+
# we'll get a SessionQueueItemNotFoundError and we can ignore it. This can happen if the queue is cleared
172+
# while the session is running.
173+
queue_item = self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session)
171174

172175
# TODO(psyche): This feels jumbled - we should review separation of concerns here.
173176
# Send complete event. The events service will receive this and update the queue item's status.
@@ -186,6 +189,8 @@ def _on_after_run_session(self, queue_item: SessionQueueItem) -> None:
186189

187190
for callback in self._on_after_run_session_callbacks:
188191
callback(queue_item=queue_item)
192+
except SessionQueueItemNotFoundError:
193+
pass
189194

190195
def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
191196
"""Run before a node is executed"""
@@ -349,6 +354,7 @@ async def _on_queue_event(self, event: FastAPIEvent) -> None:
349354
"failed",
350355
"canceled",
351356
]:
357+
self._cancel_event.set()
352358
self._poll_now()
353359

354360
def resume(self) -> SessionProcessorStatus:

0 commit comments

Comments
 (0)