Skip to content

Commit bc70e56

Browse files
committed
Fix "Task destroyed but is pending" warning
1 parent 6a7c168 commit bc70e56

File tree

1 file changed

+3
-7
lines changed

1 file changed

+3
-7
lines changed

src/neo4j_graphrag/experimental/pipeline/pipeline.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ async def event_stream(event: Event) -> None:
435435
# Add event streaming callback
436436
self.callbacks.append(event_stream)
437437

438+
event_queue_getter_task = None
438439
try:
439440
# Start pipeline execution in background task
440441
run_task = asyncio.create_task(self.run(data))
@@ -459,19 +460,14 @@ async def event_stream(event: Event) -> None:
459460
continue
460461
yield event_future.result() # type: ignore
461462

462-
# cancel remaining task
463-
event_queue_getter_task.cancel()
464-
465-
# # Drain any remaining events
466-
# while not event_queue.empty():
467-
# yield await event_queue.get()
468-
# Pipeline finished
469463
if run_task.exception():
470464
raise run_task.exception() # type: ignore
471465

472466
finally:
473467
# Restore original callback
474468
self.callbacks.remove(event_stream)
469+
if event_queue_getter_task and not event_queue_getter_task.done():
470+
event_queue_getter_task.cancel()
475471

476472
async def run(self, data: dict[str, Any]) -> PipelineResult:
477473
logger.debug("PIPELINE START")

0 commit comments

Comments
 (0)