-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Description
Body timeout errors incorrectly reported as "error decoding response body"
Summary
When a read_timeout expires while streaming a response body, reqwest reports it as "error decoding response body" instead of a timeout error. This is misleading because it suggests data corruption or format issues when the actual problem is a timeout.
Current Behavior
When using streaming responses (e.g., Server-Sent Events from OpenAI API) with read_timeout configured, if the timeout expires while waiting for data, the error message is:
error decoding response body
Users see "error decoding response body" and assume:
- Malformed JSON/data
- Encoding/UTF-8 issues
- Corrupted response
When it's actually: "Timeout waiting for next chunk"
Root Cause Analysis
Error transformation chain:
-
async_impl/body.rs:361-ReadTimeoutBody::poll_frame()if let Poll::Ready(()) = sleep_pinned.poll(cx) { return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); }
Creates:
Error { kind: Kind::Body, source: TimedOut } -
async_impl/decoder.rs:369-Decoder::poll_frame()forPlainTextInner::PlainText(ref mut body) => match ready!(Pin::new(body).poll_frame(cx)) { Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))), // ← HERE None => Poll::Ready(None), },
Blindly wraps ANY body error as decode error
-
error.rs:325pub(crate) fn decode<E: Into<BoxError>>(e: E) -> Error { Error::new(Kind::Decode, Some(e)) }
Expected Behavior
The error should be reported as a timeout, not a decode error. Options:
Option 1: Check if the error is a timeout and preserve it:
Some(Err(err)) => {
if err.is_timeout() || err.is_body() {
Poll::Ready(Some(Err(err))) // Don't wrap timeouts/body errors
} else {
Poll::Ready(Some(Err(crate::error::decode(err))))
}
}Option 2: Only use decode for actual decoding errors (gzip/brotli/etc):
Inner::PlainText(ref mut body) => match ready!(Pin::new(body).poll_frame(cx)) {
Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))),
Some(Err(err)) => Poll::Ready(Some(Err(err))), // Pass through as-is
None => Poll::Ready(None),
},Reproduction
Complete reproduction with SSE server and client demonstrating the bug:
File structure:
sse_timeout_demo/
├── Cargo.toml
├── server.rs
├── client.rs
└── run.sh
Cargo.toml:
[package]
name = "sse_timeout_demo"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "server"
path = "server.rs"
[[bin]]
name = "client"
path = "client.rs"
[dependencies]
tokio = { version = "1.41", features = ["full"] }
reqwest = { version = "0.12", features = ["stream"] }
futures-util = "0.3"
warp = "0.3"
async-stream = "0.3"server.rs:
/// Slow SSE server that sends events with long gaps to trigger read timeouts
use std::time::Duration;
use tokio::time::sleep;
use warp::Filter;
#[tokio::main]
async fn main() {
println!("Starting slow SSE server on http://127.0.0.1:3030/stream");
println!("Server will send chunks with 10 second gaps between them");
let stream_route = warp::path("stream")
.and(warp::get())
.map(|| {
let event_stream = async_stream::stream! {
println!("[Server] Client connected");
// Send first event immediately
let event1 = warp::sse::Event::default()
.data(r#"{"event": 1, "message": "First event"}"#);
yield Ok::<_, warp::Error>(event1);
println!("[Server] Sent event 1");
// Wait 10 seconds (will trigger 5 second read timeout on client)
sleep(Duration::from_secs(10)).await;
// Send second event
let event2 = warp::sse::Event::default()
.data(r#"{"event": 2, "message": "Second event after 10s delay"}"#);
yield Ok::<_, warp::Error>(event2);
println!("[Server] Sent event 2");
// Wait another 10 seconds
sleep(Duration::from_secs(10)).await;
// Send third event
let event3 = warp::sse::Event::default()
.data(r#"{"event": 3, "message": "Third event after another 10s delay"}"#);
yield Ok::<_, warp::Error>(event3);
println!("[Server] Sent event 3");
// End stream
println!("[Server] Stream complete");
};
warp::sse::reply(warp::sse::keep_alive().stream(event_stream))
});
warp::serve(stream_route)
.run(([127, 0, 0, 1], 3030))
.await;
}client.rs:
/// Reqwest client that demonstrates timeout being reported as decode error
use reqwest::Client;
use std::time::Duration;
use futures_util::StreamExt;
#[tokio::main]
async fn main() {
println!("=== Reqwest SSE Client with 5 second read timeout ===\n");
// Build client with 5 second read timeout
let client = Client::builder()
.read_timeout(Duration::from_secs(5))
.build()
.unwrap();
println!("Connecting to http://127.0.0.1:3030/stream");
println!("Read timeout: 5 seconds");
println!("Expected: Server sends events with 10 second gaps, causing timeout\n");
let response = match client
.get("http://127.0.0.1:3030/stream")
.send()
.await
{
Ok(resp) => {
println!("✓ Connected successfully");
println!("Status: {}", resp.status());
println!("Headers: {:?}\n", resp.headers());
resp
}
Err(e) => {
eprintln!("Failed to connect: {}", e);
return;
}
};
println!("Starting to read stream...\n");
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(bytes) => {
let data = String::from_utf8_lossy(&bytes);
println!(" Data: {}", data.trim());
println!();
}
Err(e) => {
println!("\n=== ERROR OCCURRED ===");
println!("Error message: {}", e);
println!();
println!("\n=== DEMONSTRATION COMPLETE ===");
break;
}
}
}
}run.sh:
#!/bin/bash
set -e
echo "=== SSE Timeout Bug Demonstration ==="
echo ""
echo "This demo shows how reqwest incorrectly reports read timeouts"
echo "as 'error decoding response body' when streaming SSE events."
echo ""
echo "Setup:"
echo " • Server: Sends SSE events with 10 second gaps"
echo " • Client: Has 5 second read timeout"
echo " • Expected: Timeout after first event"
echo ""
# Change to demo directory
cd "$(dirname "$0")"
# Build both binaries
echo "Building server and client..."
cargo build --release --bins
echo ""
# Start server in background
echo "Starting server..."
cargo run --release --bin server &
SERVER_PID=$!
# Give server time to start
sleep 2
# Run client
echo ""
echo "Running client..."
echo "=================================="
cargo run --release --bin client
# Cleanup
echo ""
echo "Stopping server..."
kill $SERVER_PID 2>/dev/null || trueTo run the reproduction:
chmod +x run.sh
./run.shExpected output:
Starting to read stream...
Data: data:{"event": 1, "message": "First event"}
=== ERROR OCCURRED ===
Error message: error decoding response body
=== DEMONSTRATION COMPLETE ===
Stopping server...
The demonstration shows that both is_timeout() and is_decode() return true, proving the timeout error is incorrectly wrapped as a decode error.
Impact
- Misleading error messages cause users to debug the wrong thing (data format instead of timeout config)
- Error handling - Code checking
is_decode()will incorrectly match timeout errors - Observability - Monitoring systems categorize these as "decode failures" instead of "timeout failures"
- User confusion - "error decoding response body" implies the response data is corrupted when it's actually a timing issue
Environment
- reqwest version: 0.12.23 (also affects 0.12.15+)
- Rust version: 1.81+
- OS: Linux, Windows, macOS (all affected)
Related Issues
This affects any streaming response with read_timeout:
- Server-Sent Events (SSE)
- Chunked transfer encoding
- Long-running streams (video, audio, data feeds)
- OpenAI/Anthropic streaming APIs
Suggested Fix Location
File: src/async_impl/decoder.rs
Line: 369
Function: Decoder::poll_frame() - Inner::PlainText match arm
Check if error is a body/timeout error before wrapping as decode error.