Streaming deadlocks with async tools - sync_send_to_stream blocks event loop #8934

@AndreRatzenberger

Streaming deadlocks when using async tools/callbacks - sync_send_to_stream blocks event loop

🐛 Bug Description

When DSPy's streaming feature (dspy.streamify()) is used with async tools (such as MCP tools or ReAct tool calls), the stream freezes indefinitely after a tool finishes executing. It never advances past the "Tool calling finished!" status message.

💥 Error Message

WARNING dspy.utils.callback: Error when applying callback <dspy.streaming.messages.StatusStreamingCallback object>'s end handler on function acall: Not running inside an AnyIO worker thread, and no event loop token was provided.

🔍 Root Cause

The sync_send_to_stream() function in dspy/streaming/messages.py blocks the event loop in two ways:

  1. When an event loop IS running (lines 45-47):
     with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
         future = executor.submit(run_in_new_loop)
         return future.result()  # ← BLOCKS the main event loop!
  2. When an event loop is NOT running (line 50):
     return syncify(_send)()  # ← Raises AnyIO error without proper context

The blocking future.result() call prevents the stream from continuing, causing a deadlock when StatusStreamingCallback.on_tool_end() fires after async tool execution.
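
To illustrate the first case in isolation, here is a minimal standard-library sketch. It does not use DSPy, and the names slow_work, ticker, blocking_call, and count_ticks_during are hypothetical. It only demonstrates the stall: while the loop thread waits synchronously on future.result(), no other coroutine gets to run. In the scenario reported here, the stall would presumably become permanent if the helper thread's send waits on a consumer that lives on the blocked loop, but that is an assumption about the stream's behavior rather than something this sketch shows.

```python
import asyncio
import concurrent.futures
import contextlib
import time


def slow_work() -> str:
    """Stand-in for run_in_new_loop: any work handed to a helper thread."""
    time.sleep(0.2)
    return "done"


async def ticker(ticks: list) -> None:
    """Record a timestamp every 10 ms whenever the event loop is free to run us."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def blocking_call() -> str:
    # Same shape as the code path quoted above: submit, then wait synchronously.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(slow_work).result()


async def count_ticks_during(blocking: bool) -> int:
    """Return how many ticks the ticker managed while slow_work was running."""
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0.05)  # let the ticker start
    start = time.monotonic()
    if blocking:
        blocking_call()  # synchronous wait on the event loop thread
    else:
        # Awaiting the executor future yields control back to the loop.
        await asyncio.get_running_loop().run_in_executor(None, slow_work)
    end = time.monotonic()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return sum(1 for t in ticks if start < t < end)


if __name__ == "__main__":
    print("ticks while blocked:", asyncio.run(count_ticks_during(True)))
    print("ticks while awaiting:", asyncio.run(count_ticks_during(False)))
```

The blocking variant records no ticks during the wait, while the awaiting variant keeps ticking, which matches the symptom of a stream that stops producing values.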

📋 Reproduction Steps

import asyncio
import dspy
from dspy.streaming import StreamListener

# 1. Set up streaming with async tool
lm = dspy.LM(model="openai/gpt-4o")

async def my_async_tool(query: str) -> str:
    """An async tool that simulates work."""
    await asyncio.sleep(0.1)  # Simulate async work without blocking the loop
    return f"Result for: {query}"

signature = dspy.Signature(
    {"input": (str, dspy.InputField()), "output": (str, dspy.OutputField())}
)

# 2. Create ReAct program with tools
program = dspy.ReAct(signature, tools=[my_async_tool], max_iters=2)

# 3. Wrap with streamify
streaming_task = dspy.streamify(
    program,
    is_async_program=True,
    stream_listeners=[StreamListener(signature_field_name="output")]
)

# 4. Execute - this will freeze after tool execution
async def run():
    with dspy.context(lm=lm):
        stream = streaming_task(input="test query")
        async for value in stream:
            print(value)

asyncio.run(run())
# ⚠️ Stream freezes at "Tool calling finished!" - never completes

🔧 Proposed Fix

Replace the blocking implementation with a non-blocking, fire-and-forget version:

import asyncio

def sync_send_to_stream(stream, message):
    """Send message to stream without blocking the event loop."""

    async def _send():
        try:
            await stream.send(message)
        except Exception:
            pass  # Gracefully handle send failures

    try:
        loop = asyncio.get_running_loop()
        # Non-blocking: schedule as background task (fire-and-forget)
        loop.create_task(_send())
    except RuntimeError:
        # No event loop running - safe to create new one
        try:
            asyncio.run(_send())
        except Exception:
            pass  # Gracefully handle failures

Key changes:

  • Remove ThreadPoolExecutor + blocking future.result()
  • Use loop.create_task() for fire-and-forget pattern
  • Never block the calling thread
  • Gracefully handle errors

✅ Testing

We've tested this fix extensively with:

  • Multiple concurrent streams
  • MCP tool integrations
  • WebSocket broadcasting
  • Long-running tool executions

The fix eliminates the deadlock while maintaining all streaming functionality.

🌍 Impact

This bug affects any use case involving:

  • Async tools with streaming
  • ReAct agents with tool calls
  • Real-time status updates during tool execution
  • Multi-agent systems with parallel streaming

📦 Version Info

  • DSPy Version: 3.0.4b1 (also present in 3.0.0)
  • Python: 3.10+
  • Environment: Async event loop context

🤝 Contribution

We have a working implementation and can submit a PR with tests if this approach looks good to the maintainers.

Let me know if you need any additional details or reproduction examples!
