File tree Expand file tree Collapse file tree 1 file changed +3
-7
lines changed
src/neo4j_graphrag/experimental/pipeline Expand file tree Collapse file tree 1 file changed +3
-7
lines changed Original file line number Diff line number Diff line change @@ -435,6 +435,7 @@ async def event_stream(event: Event) -> None:
435
435
# Add event streaming callback
436
436
self .callbacks .append (event_stream )
437
437
438
+ event_queue_getter_task = None
438
439
try :
439
440
# Start pipeline execution in background task
440
441
run_task = asyncio .create_task (self .run (data ))
@@ -459,19 +460,14 @@ async def event_stream(event: Event) -> None:
459
460
continue
460
461
yield event_future .result () # type: ignore
461
462
462
- # cancel remaining task
463
- event_queue_getter_task .cancel ()
464
-
465
- # # Drain any remaining events
466
- # while not event_queue.empty():
467
- # yield await event_queue.get()
468
- # Pipeline finished
469
463
if run_task .exception ():
470
464
raise run_task .exception () # type: ignore
471
465
472
466
finally :
473
467
# Restore original callback
474
468
self .callbacks .remove (event_stream )
469
+ if event_queue_getter_task and not event_queue_getter_task .done ():
470
+ event_queue_getter_task .cancel ()
475
471
476
472
async def run (self , data : dict [str , Any ]) -> PipelineResult :
477
473
logger .debug ("PIPELINE START" )
You can’t perform that action at this time.
0 commit comments