Skip to content

Commit efb069d

Browse files
feat(app): iterate on processor split
- Add `OnNodeError` and `OnNonFatalProcessorError` callbacks - Move all session/node callbacks to `SessionRunner` - this ensures we dump perf stats before resetting them and generally makes sense to me - Remove `complete` event from `SessionRunner`, it's essentially the same as `OnAfterRunSession` - Remove extraneous `next_invocation` block, which would treat a processor error as a node error - Simplify loops - Add some callbacks for testing, to be removed before merge
1 parent 8edc25d commit efb069d

File tree

3 files changed

+188
-142
lines changed

3 files changed

+188
-142
lines changed

invokeai/app/api/dependencies.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from ..services.model_manager.model_manager_default import ModelManagerService
3030
from ..services.model_records import ModelRecordServiceSQL
3131
from ..services.names.names_default import SimpleNameService
32-
from ..services.session_processor.session_processor_default import DefaultSessionProcessor
32+
from ..services.session_processor.session_processor_default import DefaultSessionProcessor, DefaultSessionRunner
3333
from ..services.session_queue.session_queue_sqlite import SqliteSessionQueue
3434
from ..services.urls.urls_default import LocalUrlService
3535
from ..services.workflow_records.workflow_records_sqlite import SqliteWorkflowRecordsStorage
@@ -103,7 +103,41 @@ def initialize(config: InvokeAIAppConfig, event_handler_id: int, logger: Logger
103103
)
104104
names = SimpleNameService()
105105
performance_statistics = InvocationStatsService()
106-
session_processor = DefaultSessionProcessor()
106+
107+
def on_before_run_session(queue_item):
108+
print("BEFORE RUN SESSION", queue_item.item_id)
109+
return True
110+
111+
def on_before_run_node(invocation, queue_item):
112+
print("BEFORE RUN NODE", invocation.id)
113+
return True
114+
115+
def on_after_run_node(invocation, queue_item, outputs):
116+
print("AFTER RUN NODE", invocation.id)
117+
return True
118+
119+
def on_node_error(invocation, queue_item, exc_type, exc_value, exc_traceback):
120+
print("NODE ERROR", invocation.id)
121+
return True
122+
123+
def on_after_run_session(queue_item):
124+
print("AFTER RUN SESSION", queue_item.item_id)
125+
return True
126+
127+
def on_non_fatal_processor_error(queue_item, exc_type, exc_value, exc_traceback):
128+
print("NON FATAL PROCESSOR ERROR", exc_value)
129+
return True
130+
131+
session_processor = DefaultSessionProcessor(
132+
DefaultSessionRunner(
133+
on_before_run_session,
134+
on_before_run_node,
135+
on_after_run_node,
136+
on_node_error,
137+
on_after_run_session,
138+
),
139+
on_non_fatal_processor_error,
140+
)
107141
session_queue = SqliteSessionQueue(db=db)
108142
urls = LocalUrlService()
109143
workflow_records = SqliteWorkflowRecordsStorage(db=db)

invokeai/app/services/session_processor/session_processor_base.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from abc import ABC, abstractmethod
22
from threading import Event
33

4+
from invokeai.app.invocations.baseinvocation import BaseInvocation
45
from invokeai.app.services.invocation_services import InvocationServices
56
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
67
from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem
@@ -22,12 +23,7 @@ def run(self, queue_item: SessionQueueItem) -> None:
2223
pass
2324

2425
@abstractmethod
25-
def complete(self, queue_item: SessionQueueItem) -> None:
26-
"""Completes the session"""
27-
pass
28-
29-
@abstractmethod
30-
def run_node(self, node_id: str, queue_item: SessionQueueItem) -> None:
26+
def run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem) -> None:
3127
"""Runs an already prepared node on the session"""
3228
pass
3329

0 commit comments

Comments
 (0)