@@ -148,6 +148,10 @@ def run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
148
148
def _on_before_run_session (self , queue_item : SessionQueueItem ) -> None :
149
149
"""Run before a session is executed"""
150
150
151
+ self ._services .logger .debug (
152
+ f"On before run session: queue item { queue_item .item_id } , session { queue_item .session_id } "
153
+ )
154
+
151
155
# If profiling is enabled, start the profiler
152
156
if self ._profiler is not None :
153
157
self ._profiler .start (profile_id = queue_item .session_id )
@@ -158,6 +162,10 @@ def _on_before_run_session(self, queue_item: SessionQueueItem) -> None:
158
162
def _on_after_run_session (self , queue_item : SessionQueueItem ) -> None :
159
163
"""Run after a session is executed"""
160
164
165
+ self ._services .logger .debug (
166
+ f"On after run session: queue item { queue_item .item_id } , session { queue_item .session_id } "
167
+ )
168
+
161
169
# If we are profiling, stop the profiler and dump the profile & stats
162
170
if self ._profiler is not None :
163
171
profile_path = self ._profiler .stop ()
@@ -172,29 +180,33 @@ def _on_after_run_session(self, queue_item: SessionQueueItem) -> None:
172
180
# while the session is running.
173
181
queue_item = self ._services .session_queue .set_queue_item_session (queue_item .item_id , queue_item .session )
174
182
175
- # TODO(psyche): This feels jumbled - we should review separation of concerns here.
176
- # Send complete event. The events service will receive this and update the queue item's status.
177
- self ._services .events .emit_graph_execution_complete (
178
- queue_batch_id = queue_item .batch_id ,
179
- queue_item_id = queue_item .item_id ,
180
- queue_id = queue_item .queue_id ,
181
- graph_execution_state_id = queue_item .session .id ,
182
- )
183
+ # TODO(psyche): This feels jumbled - we should review separation of concerns here.
184
+ # Send complete event. The events service will receive this and update the queue item's status.
185
+ self ._services .events .emit_graph_execution_complete (
186
+ queue_batch_id = queue_item .batch_id ,
187
+ queue_item_id = queue_item .item_id ,
188
+ queue_id = queue_item .queue_id ,
189
+ graph_execution_state_id = queue_item .session .id ,
190
+ )
183
191
184
- # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
185
- # we don't care about that - suppress the error.
186
- with suppress (GESStatsNotFoundError ):
187
- self ._services .performance_statistics .log_stats (queue_item .session .id )
188
- self ._services .performance_statistics .reset_stats ()
192
+ # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
193
+ # we don't care about that - suppress the error.
194
+ with suppress (GESStatsNotFoundError ):
195
+ self ._services .performance_statistics .log_stats (queue_item .session .id )
196
+ self ._services .performance_statistics .reset_stats ()
189
197
190
- for callback in self ._on_after_run_session_callbacks :
191
- callback (queue_item = queue_item )
198
+ for callback in self ._on_after_run_session_callbacks :
199
+ callback (queue_item = queue_item )
192
200
except SessionQueueItemNotFoundError :
193
201
pass
194
202
195
203
def _on_before_run_node (self , invocation : BaseInvocation , queue_item : SessionQueueItem ):
196
204
"""Run before a node is executed"""
197
205
206
+ self ._services .logger .debug (
207
+ f"On before run node: queue item { queue_item .item_id } , session { queue_item .session_id } , node { invocation .id } ({ invocation .get_type ()} )"
208
+ )
209
+
198
210
# Send starting event
199
211
self ._services .events .emit_invocation_started (
200
212
queue_batch_id = queue_item .batch_id ,
@@ -213,6 +225,10 @@ def _on_after_run_node(
213
225
):
214
226
"""Run after a node is executed"""
215
227
228
+ self ._services .logger .debug (
229
+ f"On after run node: queue item { queue_item .item_id } , session { queue_item .session_id } , node { invocation .id } ({ invocation .get_type ()} )"
230
+ )
231
+
216
232
# Send complete event on successful runs
217
233
self ._services .events .emit_invocation_complete (
218
234
queue_batch_id = queue_item .batch_id ,
@@ -237,6 +253,10 @@ def _on_node_error(
237
253
):
238
254
"""Run when a node errors"""
239
255
256
+ self ._services .logger .debug (
257
+ f"On node error: queue item { queue_item .item_id } , session { queue_item .session_id } , node { invocation .id } ({ invocation .get_type ()} )"
258
+ )
259
+
240
260
# Node errors do not get the full traceback. Only the queue item gets the full traceback.
241
261
node_error = f"{ error_type } : { error_message } "
242
262
queue_item .session .set_node_error (invocation .id , node_error )
0 commit comments