diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 4d2a29ac92c..e9d92b08857 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.26.3 +- Report failure when streams are at capacity. + See [PR 5417](/~https://github.com/libp2p/rust-libp2p/pull/5417). + - Report dial IO errors to the user. See [PR 5429](/~https://github.com/libp2p/rust-libp2p/pull/5429). diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 96b6217b12a..f0467593f85 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -159,6 +159,9 @@ where } }; + // Inbound connections are reported to the upper layer from within the above task, + // so by failing to schedule it, it means the upper layer will never know about the + // inbound request. Because of that we do not report any inbound failure. if self .worker_streams .try_push(RequestId::Inbound(request_id), recv.boxed()) @@ -204,7 +207,10 @@ where .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { - tracing::warn!("Dropping outbound stream because we are at capacity") + self.pending_events.push_back(Event::OutboundStreamFailed { + request_id: message.request_id, + error: io::Error::new(io::ErrorKind::Other, "max sub-streams reached"), + }); } } diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 2dc82b2e0c5..e78bba926e2 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -161,6 +161,58 @@ async fn report_outbound_timeout_on_read_response() { futures::future::select(server_task, client_task).await; } +#[async_std::test] +async fn report_outbound_failure_on_max_streams() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + // `swarm2` will be able to handle only 1 stream per time. + let swarm2_config = request_response::Config::default() + .with_request_timeout(Duration::from_millis(100)) + .with_max_concurrent_streams(1); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config); + + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + let swarm1_task = async move { + let _req_id = swarm1 + .behaviour_mut() + .send_request(&peer2_id, Action::FailOnMaxStreams); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::Io failure. + let swarm2_task = async move { + let (peer, _inbound_req_id, action, _resp_channel) = + wait_request(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(action, Action::FailOnMaxStreams); + + // A task for sending back a response is already scheduled so max concurrent + // streams is reached and no new tasks can be sheduled. + // + // We produce the failure by creating new request before we response. + let outbound_req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnMaxStreams); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, outbound_req_id); + assert!(matches!(error, OutboundFailure::Io(_))); + }; + + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; +} + #[async_std::test] async fn report_inbound_failure_on_read_request() { let _ = tracing_subscriber::fmt() @@ -332,6 +384,7 @@ enum Action { FailOnWriteRequest, FailOnWriteResponse, TimeoutOnWriteResponse, + FailOnMaxStreams, } impl From for u8 { @@ -343,6 +396,7 @@ impl From for u8 { Action::FailOnWriteRequest => 3, Action::FailOnWriteResponse => 4, Action::TimeoutOnWriteResponse => 5, + Action::FailOnMaxStreams => 6, } } } @@ -358,6 +412,7 @@ impl TryFrom for Action { 3 => Ok(Action::FailOnWriteRequest), 4 => Ok(Action::FailOnWriteResponse), 5 => Ok(Action::TimeoutOnWriteResponse), + 6 => Ok(Action::FailOnMaxStreams), _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), } } @@ -468,11 +523,10 @@ impl Codec for TestCodec { } } -fn new_swarm_with_timeout( - timeout: Duration, +fn new_swarm_with_config( + cfg: request_response::Config, ) -> (PeerId, Swarm>) { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default().with_request_timeout(timeout); let swarm = Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); @@ -481,6 +535,13 @@ fn new_swarm_with_timeout( (peed_id, swarm) } +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let cfg = request_response::Config::default().with_request_timeout(timeout); + new_swarm_with_config(cfg) +} + fn new_swarm() -> (PeerId, Swarm>) { new_swarm_with_timeout(Duration::from_millis(100)) }