@@ -52,9 +52,9 @@ def __init__(
 
     def start(self, services: InvocationServices, cancel_event: ThreadEvent, profiler: Optional[Profiler] = None):
         """Start the session runner"""
-        self.services = services
-        self.cancel_event = cancel_event
-        self.profiler = profiler
+        self._services = services
+        self._cancel_event = cancel_event
+        self._profiler = profiler
 
     def run(self, queue_item: SessionQueueItem):
         """Run the graph"""
@@ -64,33 +64,33 @@ def run(self, queue_item: SessionQueueItem):
 
         while True:
             invocation = queue_item.session.next()
-            if invocation is None or self.cancel_event.is_set():
+            if invocation is None or self._cancel_event.is_set():
                 break
             self.run_node(invocation, queue_item)
-            if queue_item.session.is_complete() or self.cancel_event.is_set():
+            if queue_item.session.is_complete() or self._cancel_event.is_set():
                 break
 
         self._on_after_run_session(queue_item=queue_item)
 
     def _on_before_run_session(self, queue_item: SessionQueueItem) -> None:
         # If profiling is enabled, start the profiler
-        if self.profiler is not None:
-            self.profiler.start(profile_id=queue_item.session_id)
+        if self._profiler is not None:
+            self._profiler.start(profile_id=queue_item.session_id)
 
         if self.on_before_run_session:
             self.on_before_run_session(queue_item=queue_item)
 
     def _on_after_run_session(self, queue_item: SessionQueueItem) -> None:
         # If we are profiling, stop the profiler and dump the profile & stats
-        if self.profiler is not None:
-            profile_path = self.profiler.stop()
+        if self._profiler is not None:
+            profile_path = self._profiler.stop()
             stats_path = profile_path.with_suffix(".json")
-            self.services.performance_statistics.dump_stats(
+            self._services.performance_statistics.dump_stats(
                 graph_execution_state_id=queue_item.session.id, output_path=stats_path
             )
 
         # Send complete event
-        self.services.events.emit_graph_execution_complete(
+        self._services.events.emit_graph_execution_complete(
             queue_batch_id=queue_item.batch_id,
             queue_item_id=queue_item.item_id,
             queue_id=queue_item.queue_id,
@@ -100,16 +100,16 @@ def _on_after_run_session(self, queue_item: SessionQueueItem) -> None:
         # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
         # we don't care about that - suppress the error.
         with suppress(GESStatsNotFoundError):
-            self.services.performance_statistics.log_stats(queue_item.session.id)
-            self.services.performance_statistics.reset_stats()
+            self._services.performance_statistics.log_stats(queue_item.session.id)
+            self._services.performance_statistics.reset_stats()
 
         if self.on_after_run_session:
             self.on_after_run_session(queue_item)
 
     def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
         """Run before a node is executed"""
         # Send starting event
-        self.services.events.emit_invocation_started(
+        self._services.events.emit_invocation_started(
             queue_batch_id=queue_item.batch_id,
             queue_item_id=queue_item.item_id,
             queue_id=queue_item.queue_id,
@@ -126,7 +126,7 @@ def _on_after_run_node(
     ):
         """Run after a node is executed"""
         # Send complete event on successful runs
-        self.services.events.emit_invocation_complete(
+        self._services.events.emit_invocation_complete(
             queue_batch_id=queue_item.batch_id,
             queue_item_id=queue_item.item_id,
             queue_id=queue_item.queue_id,
@@ -150,13 +150,13 @@ def _on_node_error(
         stacktrace = get_stacktrace(exc_type, exc_value, exc_traceback)
 
         queue_item.session.set_node_error(invocation.id, stacktrace)
-        self.services.logger.error(
+        self._services.logger.error(
             f"Error while invoking session {queue_item.session_id}, invocation {invocation.id} ({invocation.get_type()}):\n{exc_type}"
         )
-        self.services.logger.error(stacktrace)
+        self._services.logger.error(stacktrace)
 
         # Send error event
-        self.services.events.emit_invocation_error(
+        self._services.events.emit_invocation_error(
             queue_batch_id=queue_item.session_id,
             queue_item_id=queue_item.item_id,
             queue_id=queue_item.queue_id,
@@ -176,7 +176,7 @@ def run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
         """Run a single node in the graph"""
         try:
             # Any unhandled exception is an invocation error & will fail the graph
-            with self.services.performance_statistics.collect_stats(invocation, queue_item.session_id):
+            with self._services.performance_statistics.collect_stats(invocation, queue_item.session_id):
                 self._on_before_run_node(invocation, queue_item)
 
                 data = InvocationContextData(
@@ -186,12 +186,12 @@ def run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
                 )
                 context = build_invocation_context(
                     data=data,
-                    services=self.services,
-                    cancel_event=self.cancel_event,
+                    services=self._services,
+                    cancel_event=self._cancel_event,
                 )
 
                 # Invoke the node
-                outputs = invocation.invoke_internal(context=context, services=self.services)
+                outputs = invocation.invoke_internal(context=context, services=self._services)
                 # Save outputs and history
                 queue_item.session.complete(invocation.id, outputs)
 