From fd64a1e7768ba6e8676cbbf25c4e821a901c0a7f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 22 Jan 2025 18:51:59 +0200 Subject: [PATCH] net/libp2p: Enforce outbound request-response timeout limits (#7222) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR enforces that outbound requests are finished within the specified protocol timeout. The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly: - https://github.com/libp2p/rust-libp2p/pull/5429 The issue has been detected while submitting libp2p -> litep2p requests in Kusama. This aims to check that pending outbound requests have not timed out. Although the issue has been fixed in libp2p, there might be other cases where this may happen. For example: - https://github.com/libp2p/rust-libp2p/pull/5417 For more context see: https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096 1. Ideally, the force-timeout mechanism in this PR should never be triggered in production. However, origin/stable2412 occasionally encounters this issue. When this happens, 2 warnings may be generated: - one warning introduced by this PR wrt force timeout terminating the request - possibly one warning when libp2p decides (if at all) to provide the response back to substrate (as mentioned by @alexggh [here](https://github.com/paritytech/polkadot-sdk/pull/7222/files#diff-052aeaf79fef3d9a18c2cfd67006aa306b8d52e848509d9077a6a0f2eb856af7L769) and [here](https://github.com/paritytech/polkadot-sdk/pull/7222/files#diff-052aeaf79fef3d9a18c2cfd67006aa306b8d52e848509d9077a6a0f2eb856af7L842)) 2. This implementation does not propagate to the substrate service the `RequestFinished { error: .. }`. That event is only used internally by substrate to increment metrics. 
However, we don't have the peer information available to propagate the event properly when we force-timeout the request. Considering this should most likely not happen in production (origin/master) and that we'll be able to extract information by warnings, I would say this is a good tradeoff for code simplicity: /~https://github.com/paritytech/polkadot-sdk/blob/06e3b5c6a7696048d65f1b8729f16b379a16f501/substrate/client/network/src/service.rs#L1543 ### Testing Added a new test to ensure the timeout is reached properly, even if libp2p does not produce a response in due time. I've also transitioned the tests to using `tokio::test` due to a limitation of [CI](/~https://github.com/paritytech/polkadot-sdk/actions/runs/12832055737/job/35784043867) ``` --- TRY 1 STDERR: sc-network request_responses::tests::max_response_size_exceeded --- thread 'request_responses::tests::max_response_size_exceeded' panicked at /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/time/interval.rs:139:26: there is no reactor running, must be called from the context of a Tokio 1.x runtime ``` cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile Co-authored-by: Bastian Köcher --- prdoc/pr_7222.prdoc | 19 + .../client/network/src/request_responses.rs | 1020 ++++++++++------- 2 files changed, 612 insertions(+), 427 deletions(-) create mode 100644 prdoc/pr_7222.prdoc diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc new file mode 100644 index 000000000000..40b89b0a1820 --- /dev/null +++ b/prdoc/pr_7222.prdoc @@ -0,0 +1,19 @@ +title: Enforce libp2p outbound request-response timeout limits + +doc: + - audience: Node Dev + description: | + This PR enforces that outbound requests are finished within the specified protocol timeout. + The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly + /~https://github.com/libp2p/rust-libp2p/pull/5429. 
+ + The issue has been detected while submitting libp2p to litep2p requests in Kusama. + This aims to check that pending outbound requests have not timed out. + Although the issue has been fixed in libp2p, there might be other cases where this may happen. + For example, /~https://github.com/libp2p/rust-libp2p/pull/5417. + + For more context see /~https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. + +crates: +- name: sc-network + bump: patch diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5fe34c781378..e21773632ed7 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -64,6 +64,9 @@ use std::{ pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId}; +/// Periodically check if requests are taking too long. +const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2); + /// Possible failures occurring in the context of sending an outbound request and receiving the /// response. #[derive(Debug, Clone, thiserror::Error)] @@ -251,8 +254,14 @@ pub struct OutgoingResponse { /// Information stored about a pending request. struct PendingRequest { + /// The time when the request was sent to the libp2p request-response protocol. started_at: Instant, - response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + /// The channel to send the response back to the caller. + /// + /// This is wrapped in an `Option` to allow for the channel to be taken out + /// on force-detected timeouts. + response_tx: Option, ProtocolName), RequestFailure>>>, + /// Fallback request to send if the primary request fails. fallback_request: Option<(Vec, ProtocolName)>, } @@ -336,16 +345,20 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId } } +/// Details of a request-response protocol. 
+struct ProtocolDetails { + behaviour: Behaviour, + inbound_queue: Option>, + request_timeout: Duration, +} + /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. /// /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: HashMap< - ProtocolName, - (Behaviour, Option>), - >, + protocols: HashMap, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: HashMap, PendingRequest>, @@ -365,6 +378,14 @@ pub struct RequestResponsesBehaviour { /// Primarily used to get a reputation of a node. peer_store: Arc, + + /// Interval to check that the requests are not taking too long. + /// + /// We had issues in the past where libp2p did not produce a timeout event in due time. + /// + /// For more details, see: + /// - + periodic_request_check: tokio::time::Interval, } /// Generated by the response builder and waiting to be processed. 
@@ -393,7 +414,7 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = Behaviour::with_codec( + let behaviour = Behaviour::with_codec( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, @@ -405,7 +426,11 @@ impl RequestResponsesBehaviour { ); match protocols.entry(protocol.name) { - Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), + Entry::Vacant(e) => e.insert(ProtocolDetails { + behaviour, + inbound_queue: protocol.inbound_queue, + request_timeout: protocol.request_timeout, + }), Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())), }; } @@ -417,6 +442,7 @@ impl RequestResponsesBehaviour { pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), peer_store, + periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK), }) } @@ -437,9 +463,11 @@ impl RequestResponsesBehaviour { ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + if let Some(ProtocolDetails { behaviour, .. 
}) = + self.protocols.get_mut(protocol_name.deref()) + { Self::send_request_inner( - protocol, + behaviour, &mut self.pending_requests, target, protocol_name, @@ -474,7 +502,7 @@ impl RequestResponsesBehaviour { (protocol_name.to_string().into(), request_id).into(), PendingRequest { started_at: Instant::now(), - response_tx: pending_response, + response_tx: Some(pending_response), fallback_request, }, ); @@ -521,18 +549,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -548,19 +577,20 @@ impl NetworkBehaviour for RequestResponsesBehaviour { role_override: Endpoint, port_use: PortUse, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_outbound_connection( - connection_id, - peer, - addr, - role_override, - port_use, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. 
})| { + if let Ok(handler) = behaviour.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -569,8 +599,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - for (protocol, _) in self.protocols.values_mut() { - protocol.on_swarm_event(event); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + behaviour.on_swarm_event(event); } } @@ -581,8 +611,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { event: THandlerOutEvent, ) { let p_name = event.0; - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - return proto.on_connection_handler_event(peer_id, connection_id, event.1) + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) { + return behaviour.on_connection_handler_event(peer_id, connection_id, event.1) } else { log::warn!( target: "sub-libp2p", @@ -594,6 +624,51 @@ impl NetworkBehaviour for RequestResponsesBehaviour { fn poll(&mut self, cx: &mut Context) -> Poll>> { 'poll_all: loop { + // Poll the periodic request check. + if self.periodic_request_check.poll_tick(cx).is_ready() { + self.pending_requests.retain(|id, req| { + let Some(ProtocolDetails { request_timeout, .. }) = + self.protocols.get(&id.protocol) + else { + log::warn!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered. 
At the same time local node is no longer interested in the result.", + ); + } + } + return false + }; + + let elapsed = req.started_at.elapsed(); + if elapsed > *request_timeout { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.", + ); + } + } + + false + } else { + true + } + }); + } + // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { @@ -610,10 +685,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }; if let Ok(payload) = result { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(&*protocol_name) + { log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len()); - if protocol.send_response(inner_channel, Ok(payload)).is_err() { + if behaviour.send_response(inner_channel, Ok(payload)).is_err() { // Note: Failure is handled further below when receiving // `InboundFailure` event from request-response [`Behaviour`]. log::debug!( @@ -641,7 +718,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let mut fallback_requests = vec![]; // Poll request-responses protocols. - for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { + for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols + { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { let ev = match ev { // Main events we are interested in. 
@@ -696,7 +774,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Submit the request to the "response builder" passed by the user at // initialization. - if let Some(resp_builder) = resp_builder { + if let Some(resp_builder) = inbound_queue { // If the response builder is too busy, silently drop `tx`. This // will be reported by the corresponding request-response // [`Behaviour`] through an `InboundFailure::Omission` event. @@ -744,7 +822,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some(PendingRequest { started_at, response_tx, .. }) => { + Some(PendingRequest { + started_at, + response_tx: Some(response_tx), + .. + }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", @@ -760,13 +842,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .map_err(|_| RequestFailure::Obsolete); (started_at, delivered) }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, + peer, ); - debug_assert!(false); continue }, }; @@ -795,7 +877,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { { Some(PendingRequest { started_at, - response_tx, + response_tx: Some(response_tx), fallback_request, }) => { // Try using the fallback request if the protocol was not @@ -833,13 +915,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } started_at }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", request_id, + error, + peer ); - debug_assert!(false); continue }, }; @@ -904,7 +987,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour 
{ // Send out fallback requests. for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { - if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) { Self::send_request_inner( behaviour, &mut self.pending_requests, @@ -1073,7 +1156,7 @@ mod tests { use crate::mock::MockPeerStore; use assert_matches::assert_matches; - use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; + use futures::channel::oneshot; use libp2p::{ core::{ transport::{MemoryTransport, Transport}, @@ -1086,10 +1169,10 @@ mod tests { }; use std::{iter, time::Duration}; - struct TokioExecutor(tokio::runtime::Runtime); + struct TokioExecutor; impl Executor for TokioExecutor { fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); + tokio::spawn(f); } } @@ -1106,13 +1189,11 @@ mod tests { let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap(); - let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::new( transport, behaviour, keypair.public().to_peer_id(), - SwarmConfig::with_executor(TokioExecutor(runtime)) + SwarmConfig::with_executor(TokioExecutor {}) // This is taken care of by notification protocols in non-test environment // It is very slow in test environment for some reason, hence larger timeout .with_idle_connection_timeout(Duration::from_secs(10)), @@ -1125,34 +1206,27 @@ mod tests { (swarm, listen_addr) } - #[test] - fn basic_request_response_works() { + #[tokio::test] + async fn basic_request_response_works() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. 
let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1176,84 +1250,69 @@ mod tests { let (mut swarm, _) = swarms.remove(0); // Running `swarm[0]` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. 
}) => { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { result.unwrap(); - break }, _ => {}, } } - - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name) - ); }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + let mut response_receiver = None; + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); } - #[test] - fn max_response_size_exceeded() { + #[tokio::test] + async fn max_response_size_exceeded() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. 
let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this response exceeds the limit".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1278,59 +1337,52 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. let (mut swarm, _) = swarms.remove(0); - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - assert!(result.is_ok()); - }, - SwarmEvent::ConnectionClosed { .. } => { - break; - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); + SwarmEvent::Behaviour(Event::InboundRequest { result, .. 
}) => { + assert!(result.is_ok()); }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert!(result.is_err()); - break + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + + let mut response_receiver = None; - match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::Io(_)) => {}, - request_failure => panic!("Unexpected failure: {request_failure:?}"), + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, } - }); + } + + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Io(_)) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } /// A `RequestId` is a unique identifier among either all inbound or all outbound requests for @@ -1343,11 +1395,10 @@ mod tests { /// without a `RequestId` collision. /// /// See [`ProtocolRequestId`] for additional information. - #[test] - fn request_id_collision() { + #[tokio::test] + async fn request_id_collision() { let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); - let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ @@ -1405,114 +1456,100 @@ mod tests { swarm_1.dial(listen_add_2).unwrap(); // Run swarm 2 in the background, receiving two requests. 
- pool.spawner() - .spawn_obj( - async move { - loop { - match swarm_2.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } + tokio::spawn(async move { + loop { + match swarm_2.select_next_some().await { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + result.unwrap(); + }, + _ => {}, } - .boxed() - .into(), - ) - .unwrap(); + } + }); // Handle both requests sent by swarm 1 to swarm 2 in the background. // // Make sure both requests overlap, by answering the first only after receiving the // second. - pool.spawner() - .spawn_obj( - async move { - let protocol_1_request = swarm_2_handler_1.next().await; - let protocol_2_request = swarm_2_handler_2.next().await; - - protocol_1_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - protocol_2_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + let protocol_1_request = swarm_2_handler_1.next().await; + let protocol_2_request = swarm_2_handler_2.next().await; + + protocol_1_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + protocol_2_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + }); // Have swarm 1 send two requests to swarm 2 and await responses. 
- pool.run_until(async move { - let mut response_receivers = None; - let mut num_responses = 0; - loop { - match swarm_1.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender_1, receiver_1) = oneshot::channel(); - let (sender_2, receiver_2) = oneshot::channel(); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"this is a request".to_vec(), - None, - sender_1, - IfDisconnected::ImmediateError, - ); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_2.clone(), - b"this is a request".to_vec(), - None, - sender_2, - IfDisconnected::ImmediateError, - ); - assert!(response_receivers.is_none()); - response_receivers = Some((receiver_1, receiver_2)); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - num_responses += 1; - result.unwrap(); - if num_responses == 2 { - break - } - }, - _ => {}, - } + let mut response_receivers = None; + let mut num_responses = 0; + + loop { + match swarm_1.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender_1, receiver_1) = oneshot::channel(); + let (sender_2, receiver_2) = oneshot::channel(); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"this is a request".to_vec(), + None, + sender_1, + IfDisconnected::ImmediateError, + ); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_2.clone(), + b"this is a request".to_vec(), + None, + sender_2, + IfDisconnected::ImmediateError, + ); + assert!(response_receivers.is_none()); + response_receivers = Some((receiver_1, receiver_2)); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. 
}) => { + num_responses += 1; + result.unwrap(); + if num_responses == 2 { + break + } + }, + _ => {}, } - let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!( - response_receiver_1.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_1) - ); - assert_eq!( - response_receiver_2.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_2) - ); - }); + } + let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); } - #[test] - fn request_fallback() { + #[tokio::test] + async fn request_fallback() { let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); let protocol_name_2 = ProtocolName::from("/test/another"); - let mut pool = LocalPool::new(); let protocol_config_1 = ProtocolConfig { name: protocol_name_1.clone(), @@ -1550,39 +1587,31 @@ mod tests { let mut protocol_config_2 = protocol_config_2.clone(); protocol_config_2.inbound_queue = Some(tx_2); - pool.spawner() - .spawn_obj( - async move { - for _ in 0..2 { - if let Some(rq) = rx_1.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok( - b"this is a response on protocol /test/req-resp/1".to_vec() - ), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - - if let Some(rq) = rx_2.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/other"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response on protocol 
/test/other".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } + tokio::spawn(async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); } - .boxed() - .into(), - ) - .unwrap(); + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) }; @@ -1603,132 +1632,269 @@ mod tests { } // Running `older_swarm`` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - _ = older_swarm.0.select_next_some().await; - } - } - .boxed() - .into() - }) - .unwrap(); + tokio::spawn(async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + }); // Run the newer swarm. Attempt to make requests on all protocols. let (mut swarm, _) = new_swarm; let mut older_peer_id = None; - pool.run_until(async move { - let mut response_receiver = None; - // Try the new protocol with a fallback. - loop { - match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. 
} => { - older_peer_id = Some(peer_id); - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"request on protocol /test/req-resp/2".to_vec(), - Some(( - b"request on protocol /test/req-resp/1".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the old protocol with a useless fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1_fallback.clone(), - b"request on protocol /test/req-resp/1".to_vec(), - Some(( - b"dummy request, will fail if processed".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. 
}) => { - result.unwrap(); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the new protocol with no fallback. Should fail. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1.clone(), - b"request on protocol /test/req-resp-2".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert_matches!( - result.unwrap_err(), - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) - ); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. 
+ let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, } - assert!(response_receiver.await.unwrap().is_err()); - // Try the other protocol with no fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_2.clone(), - b"request on protocol /test/other".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); + } + + /// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error + /// even if the libp2p component hangs. + /// + /// For testing purposes, the communication happens on the `/test/req-resp/1` protocol. + /// + /// This is achieved by: + /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10 + /// seconds. 
The second swarm is configured with a timeout of 10 seconds in libp2p, however in + /// substrate this is set to 1 second. + /// + /// - The first swarm introduces a delay of 2 seconds before responding to the request. + /// + /// - The second swarm must enforce the 1 second timeout. + #[tokio::test] + async fn enforce_outbound_timeouts() { + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1); + + // These swarms only speaks protocol_name. + let protocol_name = ProtocolName::from("/test/req-resp/1"); + + let protocol_config = ProtocolConfig { + name: protocol_name.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: REQUEST_TIMEOUT, // <-- important for the test + inbound_queue: None, + }; + + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. + let (mut first_swarm, _) = { + let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64); + + tokio::spawn(async move { + if let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + + // Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than + // `REQUEST_TIMEOUT`. + tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await; + + // By the time the response is sent back, the second swarm + // received Timeout.
+ let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"Second swarm already timedout".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config)) + }; + + let (mut second_swarm, second_address) = { + let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64); + + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"This is the response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config.clone())) + }; + // Modify the second swarm to have a shorter timeout. + second_swarm + .behaviour_mut() + .protocols + .get_mut(&protocol_name) + .unwrap() + .request_timeout = REQUEST_TIMEOUT_SHORT; + + // Ask first swarm to dial the second swarm. + { + Swarm::dial(&mut first_swarm, second_address).unwrap(); + } + + // Running the first swarm in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + tokio::spawn(async move { loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break + let event = first_swarm.select_next_some().await; + match event { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + break; + }, + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) - ); }); + + // Run the second swarm.
+ // - on connection established send the request to the first swarm + // - expect to receive a timeout + let mut response_receiver = None; + loop { + let event = second_swarm.select_next_some().await; + + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + second_swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::ConnectionClosed { .. } => { + break; + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, + } + } + + // Expect the timeout. + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Timeout) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } }