Skip to content

Commit af42122

Browse files
authored
fix(request-response): Report failure when streams are at capacity
Fixes potential hanging issue if use relies on response or failures to make progress Pull-Request: #5417.
1 parent 68301b8 commit af42122

File tree

3 files changed

+74
-4
lines changed

3 files changed

+74
-4
lines changed

protocols/request-response/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 0.26.3
22

3+
- Report failure when streams are at capacity.
4+
See [PR 5417](https://github.com/libp2p/rust-libp2p/pull/5417).
5+
36
- Report dial IO errors to the user.
47
See [PR 5429](https://github.com/libp2p/rust-libp2p/pull/5429).
58

protocols/request-response/src/handler.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ where
159159
}
160160
};
161161

162+
// Inbound connections are reported to the upper layer from within the above task,
163+
// so by failing to schedule it, it means the upper layer will never know about the
164+
// inbound request. Because of that we do not report any inbound failure.
162165
if self
163166
.worker_streams
164167
.try_push(RequestId::Inbound(request_id), recv.boxed())
@@ -204,7 +207,10 @@ where
204207
.try_push(RequestId::Outbound(request_id), send.boxed())
205208
.is_err()
206209
{
207-
tracing::warn!("Dropping outbound stream because we are at capacity")
210+
self.pending_events.push_back(Event::OutboundStreamFailed {
211+
request_id: message.request_id,
212+
error: io::Error::new(io::ErrorKind::Other, "max sub-streams reached"),
213+
});
208214
}
209215
}
210216

protocols/request-response/tests/error_reporting.rs

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,58 @@ async fn report_outbound_timeout_on_read_response() {
161161
futures::future::select(server_task, client_task).await;
162162
}
163163

164+
#[async_std::test]
165+
async fn report_outbound_failure_on_max_streams() {
166+
let _ = tracing_subscriber::fmt()
167+
.with_env_filter(EnvFilter::from_default_env())
168+
.try_init();
169+
170+
// `swarm2` will be able to handle only 1 stream per time.
171+
let swarm2_config = request_response::Config::default()
172+
.with_request_timeout(Duration::from_millis(100))
173+
.with_max_concurrent_streams(1);
174+
175+
let (peer1_id, mut swarm1) = new_swarm();
176+
let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config);
177+
178+
swarm1.listen().with_memory_addr_external().await;
179+
swarm2.connect(&mut swarm1).await;
180+
181+
let swarm1_task = async move {
182+
let _req_id = swarm1
183+
.behaviour_mut()
184+
.send_request(&peer2_id, Action::FailOnMaxStreams);
185+
186+
// Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead.
187+
wait_no_events(&mut swarm1).await;
188+
};
189+
190+
// Expects OutboundFailure::Io failure.
191+
let swarm2_task = async move {
192+
let (peer, _inbound_req_id, action, _resp_channel) =
193+
wait_request(&mut swarm2).await.unwrap();
194+
assert_eq!(peer, peer1_id);
195+
assert_eq!(action, Action::FailOnMaxStreams);
196+
197+
// A task for sending back a response is already scheduled so max concurrent
198+
// streams is reached and no new tasks can be sheduled.
199+
//
200+
// We produce the failure by creating new request before we response.
201+
let outbound_req_id = swarm2
202+
.behaviour_mut()
203+
.send_request(&peer1_id, Action::FailOnMaxStreams);
204+
205+
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
206+
assert_eq!(peer, peer1_id);
207+
assert_eq!(req_id_done, outbound_req_id);
208+
assert!(matches!(error, OutboundFailure::Io(_)));
209+
};
210+
211+
let swarm1_task = pin!(swarm1_task);
212+
let swarm2_task = pin!(swarm2_task);
213+
futures::future::select(swarm1_task, swarm2_task).await;
214+
}
215+
164216
#[async_std::test]
165217
async fn report_inbound_failure_on_read_request() {
166218
let _ = tracing_subscriber::fmt()
@@ -332,6 +384,7 @@ enum Action {
332384
FailOnWriteRequest,
333385
FailOnWriteResponse,
334386
TimeoutOnWriteResponse,
387+
FailOnMaxStreams,
335388
}
336389

337390
impl From<Action> for u8 {
@@ -343,6 +396,7 @@ impl From<Action> for u8 {
343396
Action::FailOnWriteRequest => 3,
344397
Action::FailOnWriteResponse => 4,
345398
Action::TimeoutOnWriteResponse => 5,
399+
Action::FailOnMaxStreams => 6,
346400
}
347401
}
348402
}
@@ -358,6 +412,7 @@ impl TryFrom<u8> for Action {
358412
3 => Ok(Action::FailOnWriteRequest),
359413
4 => Ok(Action::FailOnWriteResponse),
360414
5 => Ok(Action::TimeoutOnWriteResponse),
415+
6 => Ok(Action::FailOnMaxStreams),
361416
_ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")),
362417
}
363418
}
@@ -468,11 +523,10 @@ impl Codec for TestCodec {
468523
}
469524
}
470525

471-
fn new_swarm_with_timeout(
472-
timeout: Duration,
526+
fn new_swarm_with_config(
527+
cfg: request_response::Config,
473528
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
474529
let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full));
475-
let cfg = request_response::Config::default().with_request_timeout(timeout);
476530

477531
let swarm =
478532
Swarm::new_ephemeral(|_| request_response::Behaviour::<TestCodec>::new(protocols, cfg));
@@ -481,6 +535,13 @@ fn new_swarm_with_timeout(
481535
(peed_id, swarm)
482536
}
483537

538+
fn new_swarm_with_timeout(
539+
timeout: Duration,
540+
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
541+
let cfg = request_response::Config::default().with_request_timeout(timeout);
542+
new_swarm_with_config(cfg)
543+
}
544+
484545
fn new_swarm() -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
485546
new_swarm_with_timeout(Duration::from_millis(100))
486547
}

0 commit comments

Comments
 (0)