Background: https://sysid.github.io/server-sent-events/
Production-ready Server-Sent Events implementation for Starlette and FastAPI, following the W3C SSE specification.
pip install sse-starlette
uv add sse-starlette
# To run the examples and demonstrations
uv add sse-starlette[examples]
# Recommended ASGI server
uv add sse-starlette[uvicorn,granian,daphne]
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse

async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)

async def sse_endpoint(request):
    return EventSourceResponse(generate_events())

app = Starlette(routes=[Route("/events", sse_endpoint)])
- Standards Compliant: Full SSE specification implementation
- Framework Integration: Native Starlette and FastAPI support
- Async/Await: Built on modern Python async patterns
- Connection Management: Automatic client disconnect detection
- Graceful Shutdown: Proper cleanup on server termination
- Thread Safety: Context-local event management for multi-threaded applications
- Multi-Loop Support: Works correctly with multiple asyncio event loops
EventSourceResponse is the main response class that handles SSE streaming:
from sse_starlette import EventSourceResponse

# Basic usage
async def stream_data():
    # `data` stands for any iterable of items that expose an `id` attribute
    for item in data:
        yield {"data": item, "event": "update", "id": str(item.id)}

async def endpoint(request):
    return EventSourceResponse(stream_data())
For structured event creation:
from sse_starlette import ServerSentEvent

event = ServerSentEvent(
    data="Custom message",
    event="notification",
    id="msg-123",
    retry=5000
)
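To make the four fields concrete, here is a minimal stdlib-only sketch of how such an event could be laid out on the wire under the SSE specification. `format_sse` is a hypothetical helper written for illustration; it is not part of sse-starlette, whose actual encoder may differ in detail.

```python
def format_sse(data=None, event=None, id=None, retry=None, comment=None, sep="\r\n"):
    """Illustrative encoder (hypothetical, not the library's): build one SSE message."""
    lines = []
    if comment is not None:
        # Comment lines start with a colon and are ignored by clients.
        lines.extend(f": {part}" for part in str(comment).splitlines() or [""])
    if id is not None:
        lines.append(f"id: {id}")
    if event is not None:
        lines.append(f"event: {event}")
    if data is not None:
        # Multi-line payloads become one "data:" line per line of text.
        lines.extend(f"data: {part}" for part in str(data).splitlines() or [""])
    if retry is not None:
        lines.append(f"retry: {int(retry)}")  # reconnection delay in milliseconds
    # A blank line terminates the message.
    return sep.join(lines) + sep + sep


message = format_sse(data="Custom message", event="notification", id="msg-123", retry=5000)
print(message)
```

The blank line at the end is what tells the client that the message is complete; everything before it is a sequence of `field: value` lines.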
To send JSON data as SSE events, use JSONServerSentEvent:
from sse_starlette import JSONServerSentEvent

event = JSONServerSentEvent(
    data={"field": "value"},  # Anything serializable with json.dumps
)
from sse_starlette import EventSourceResponse, ServerSentEvent

def custom_ping():
    return ServerSentEvent(comment="Custom ping message")

async def ping_endpoint(request):
    return EventSourceResponse(
        generate_events(),
        ping=10,  # Ping every 10 seconds
        ping_message_factory=custom_ping
    )
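The idea behind the ping interval can be illustrated without the library: whenever the event source stays silent for longer than the interval, a comment line is emitted so that proxies and clients still see traffic. The sketch below is stdlib-only; `with_keepalive` and `slow_events` are hypothetical names, and this is not necessarily how sse-starlette implements pings internally.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the source


async def _next_or_done(iterator):
    """Return the next item, or _DONE once the async iterator is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def with_keepalive(source, interval):
    """Yield items from `source`, adding a ping comment after each silent `interval` seconds."""
    iterator = source.__aiter__()
    pending = asyncio.ensure_future(_next_or_done(iterator))
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                # Source was silent for `interval` seconds: emit a comment, keep waiting.
                yield ": ping\r\n\r\n"
                continue
            item = pending.result()
            if item is _DONE:
                return
            yield item
            pending = asyncio.ensure_future(_next_or_done(iterator))
    finally:
        pending.cancel()  # no-op if the task already finished


async def slow_events():
    for i in range(2):
        await asyncio.sleep(0.2)
        yield f"data: Event {i}\r\n\r\n"


async def collect():
    return [chunk async for chunk in with_keepalive(slow_events(), interval=0.05)]


chunks = asyncio.run(collect())
```

Because the pending read is kept alive across timeouts rather than cancelled, no event from the source is lost while pings are being sent.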
sse-starlette now supports usage in multi-threaded applications and with multiple asyncio event loops:
import threading
import asyncio
from sse_starlette import EventSourceResponse

def run_sse_in_thread():
    """SSE streaming works correctly in separate threads"""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def thread_events():
        for i in range(5):
            yield {"data": f"Thread event {i}"}
            await asyncio.sleep(1)

    # This works without "Event bound to different loop" errors
    response = EventSourceResponse(thread_events())
    loop.close()

# Start SSE in multiple threads
for i in range(3):
    thread = threading.Thread(target=run_sse_in_thread)
    thread.start()
async def stream_database_results(request):
    # CORRECT: Create session within generator context
    async with AsyncSession() as session:
        results = await session.execute(select(User))
        for row in results:
            if await request.is_disconnected():
                break
            yield {"data": row.name, "id": str(row.id)}

async def database_endpoint(request):
    return EventSourceResponse(stream_database_results(request))
async def robust_stream(request):
    try:
        for i in range(100):
            if await request.is_disconnected():
                break
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Client disconnected - perform cleanup
        raise

async def robust_endpoint(request):
    return EventSourceResponse(
        robust_stream(request),
        send_timeout=30,  # Timeout hanging sends
        headers={"Cache-Control": "no-cache"}
    )
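The cleanup path is worth seeing in isolation. In the stdlib-only sketch below, a plain consumer stands in for the server: closing the generator early plays the role of a client disconnect, and the `finally` block runs either way. The names `stream_with_cleanup` and `released` are hypothetical and exist only for this illustration.

```python
import asyncio

released = []  # records which resources were cleaned up


async def stream_with_cleanup(name):
    try:
        for i in range(100):
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0)
    finally:
        # Runs on normal exhaustion, on aclose(), and on cancellation.
        released.append(name)


async def main():
    stream = stream_with_cleanup("db-session")
    received = []
    async for event in stream:
        received.append(event)
        if len(received) == 3:
            break  # simulate the client going away
    await stream.aclose()  # deterministic cleanup instead of waiting for GC
    return received


received = asyncio.run(main())
```

Putting cleanup in `finally` rather than only in an `except asyncio.CancelledError` branch means the same code also covers the case where the loop ends normally or the generator is closed.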
For complex data flows, use memory channels instead of generators:
import anyio
from functools import partial

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel)
    )
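For readers who have not used anyio memory object streams, the same producer/consumer decoupling can be sketched with the standard library's `asyncio.Queue`. This is an analogy only: it uses neither anyio nor sse-starlette, and `producer`, `consumer`, and the `None` end-of-stream sentinel are illustrative choices.

```python
import asyncio


async def producer(queue, count):
    for i in range(count):
        await queue.put({"data": f"Item {i}"})  # blocks while the buffer is full
    await queue.put(None)  # sentinel: no more events


async def consumer(queue):
    while True:
        event = await queue.get()
        if event is None:
            return
        yield event


async def main():
    queue = asyncio.Queue(maxsize=10)  # bounded buffer, like the stream size above
    task = asyncio.create_task(producer(queue, 5))
    events = [event async for event in consumer(queue)]
    await task
    return events


events = asyncio.run(main())
```

The bounded buffer gives backpressure: a producer that runs ahead of a slow client waits instead of accumulating events in memory.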
Parameter | Type | Default | Description |
---|---|---|---|
content | ContentStream | Required | Async generator or iterable |
ping | int | 15 | Ping interval in seconds |
sep | str | "\r\n" | Line separator (\r\n, \r, \n) |
send_timeout | float | None | Send operation timeout |
headers | dict | None | Additional HTTP headers |
ping_message_factory | Callable | None | Custom ping message creator |
async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break
            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Stream cancelled")
        raise
sse-starlette now includes comprehensive test isolation without manual setup. The library automatically handles event loop contexts, eliminating the need for manual state resets:
# this is deprecated and not needed since version 3.0.0
import pytest
from sse_starlette.sse import AppStatus

@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None
- Memory: Each connection maintains a buffer. Monitor memory usage.
- Connections: Limited by system file descriptors and application design.
- Network: High-frequency events can saturate bandwidth.
Implement client-side reconnection logic:
function createEventSource(url) {
  const eventSource = new EventSource(url);
  eventSource.onerror = function() {
    eventSource.close(); // Stop the built-in retry so only one connection is reopened
    setTimeout(() => {
      createEventSource(url); // Reconnect after delay
    }, 5000);
  };
  return eventSource;
}
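Clients that are not browsers have to do the parsing that `EventSource` does natively. As a rough illustration, the following stdlib-only sketch parses a block of SSE text into events according to the field rules of the specification. `parse_sse` is a hypothetical helper, not part of sse-starlette, and it skips details such as BOM handling and incremental decoding.

```python
import re


def parse_sse(text):
    """Illustrative parser (hypothetical): split SSE text into a list of event dicts."""
    events = []
    data_lines, event_type, last_id, retry = [], None, None, None
    for line in re.split(r"\r\n|\r|\n", text):
        if line == "":
            # Blank line: dispatch the event if any data was collected.
            if data_lines:
                events.append({
                    "event": event_type or "message",
                    "data": "\n".join(data_lines),
                    "id": last_id,
                    "retry": retry,
                })
            data_lines, event_type = [], None
            continue
        if line.startswith(":"):
            continue  # comment, e.g. a ping
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # only a single leading space is stripped
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            last_id = value  # persists across events until changed
        elif field == "retry" and value.isdigit():
            retry = int(value)  # most recent reconnection delay in milliseconds
    return events


stream = ": ping\r\n\r\nid: 1\r\nevent: update\r\ndata: a\r\ndata: b\r\nretry: 5000\r\n\r\ndata: c\n\n"
parsed = parse_sse(stream)
```

A reconnecting client would keep the last seen `id` to send as the `Last-Event-ID` request header and use the last `retry` value as its delay before reconnecting.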
The examples/ directory contains production-ready patterns:
- 01_basic_sse.py: Fundamental SSE concepts
- 02_message_broadcasting.py: Multi-client message distribution
- 03_database_streaming.py: Thread-safe database integration
- 04_advanced_features.py: Custom protocols and error handling
The examples/demonstrations/ directory provides educational scenarios:
Basic Patterns (basic_patterns/):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
Production Scenarios (production_scenarios/):
- Load testing with concurrent clients
- Network interruption handling
Advanced Patterns (advanced_patterns/):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
Database session errors with async generators
- Create database sessions inside generators, not as dependencies
Hanging connections after client disconnect
- Always check await request.is_disconnected() in loops
- Use the send_timeout parameter to detect dead connections
If you are using Postman, please see: #47 (comment)
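# Connection limits
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)

    async def _limited_events(self):
        # Hold the slot for the whole stream, not just until the response object is returned
        async with self.semaphore:
            async for event in generate_events():
                yield event

    async def limited_endpoint(self, request):
        return EventSourceResponse(self._limited_events())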
See examples and demonstrations for implementation patterns. Run tests with:
make test-unit # Unit tests only
make test # All tests including integration