Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(request-response): Report failure when streams are at capacity #5417

Merged
merged 10 commits into from
Jun 4, 2024
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.26.3

- Report failure when streams are at capacity.
See [PR 5417](/~https://github.com/libp2p/rust-libp2p/pull/5417).

- Report dial IO errors to the user.
See [PR 5429](/~https://github.com/libp2p/rust-libp2p/pull/5429).

Expand Down
8 changes: 7 additions & 1 deletion protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ where
}
};

// Inbound connections are reported to the upper layer from within the above task,
// so by failing to schedule it, it means the upper layer will never know about the
// inbound request. Because of that we do not report any inbound failure.
if self
.worker_streams
.try_push(RequestId::Inbound(request_id), recv.boxed())
Expand Down Expand Up @@ -204,7 +207,10 @@ where
.try_push(RequestId::Outbound(request_id), send.boxed())
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
self.pending_events.push_back(Event::OutboundStreamFailed {
request_id: message.request_id,
error: io::Error::new(io::ErrorKind::Other, "max sub-streams reached"),
});
}
}

Expand Down
67 changes: 64 additions & 3 deletions protocols/request-response/tests/error_reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,58 @@ async fn report_outbound_timeout_on_read_response() {
futures::future::select(server_task, client_task).await;
}

#[async_std::test]
async fn report_outbound_failure_on_max_streams() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

// `swarm2` will be able to handle only 1 stream per time.
let swarm2_config = request_response::Config::default()
.with_request_timeout(Duration::from_millis(100))
.with_max_concurrent_streams(1);

let (peer1_id, mut swarm1) = new_swarm();
let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config);

swarm1.listen().with_memory_addr_external().await;
swarm2.connect(&mut swarm1).await;

let swarm1_task = async move {
let _req_id = swarm1
.behaviour_mut()
.send_request(&peer2_id, Action::FailOnMaxStreams);

// Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead.
wait_no_events(&mut swarm1).await;
};

// Expects OutboundFailure::Io failure.
let swarm2_task = async move {
let (peer, _inbound_req_id, action, _resp_channel) =
wait_request(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(action, Action::FailOnMaxStreams);

// A task for sending back a response is already scheduled so max concurrent
// streams is reached and no new tasks can be sheduled.
//
// We produce the failure by creating new request before we response.
let outbound_req_id = swarm2
.behaviour_mut()
.send_request(&peer1_id, Action::FailOnMaxStreams);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(req_id_done, outbound_req_id);
assert!(matches!(error, OutboundFailure::Io(_)));
};

let swarm1_task = pin!(swarm1_task);
let swarm2_task = pin!(swarm2_task);
futures::future::select(swarm1_task, swarm2_task).await;
}

#[async_std::test]
async fn report_inbound_failure_on_read_request() {
let _ = tracing_subscriber::fmt()
Expand Down Expand Up @@ -332,6 +384,7 @@ enum Action {
FailOnWriteRequest,
FailOnWriteResponse,
TimeoutOnWriteResponse,
FailOnMaxStreams,
}

impl From<Action> for u8 {
Expand All @@ -343,6 +396,7 @@ impl From<Action> for u8 {
Action::FailOnWriteRequest => 3,
Action::FailOnWriteResponse => 4,
Action::TimeoutOnWriteResponse => 5,
Action::FailOnMaxStreams => 6,
}
}
}
Expand All @@ -358,6 +412,7 @@ impl TryFrom<u8> for Action {
3 => Ok(Action::FailOnWriteRequest),
4 => Ok(Action::FailOnWriteResponse),
5 => Ok(Action::TimeoutOnWriteResponse),
6 => Ok(Action::FailOnMaxStreams),
_ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")),
}
}
Expand Down Expand Up @@ -468,11 +523,10 @@ impl Codec for TestCodec {
}
}

fn new_swarm_with_timeout(
timeout: Duration,
fn new_swarm_with_config(
cfg: request_response::Config,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full));
let cfg = request_response::Config::default().with_request_timeout(timeout);

let swarm =
Swarm::new_ephemeral(|_| request_response::Behaviour::<TestCodec>::new(protocols, cfg));
Expand All @@ -481,6 +535,13 @@ fn new_swarm_with_timeout(
(peed_id, swarm)
}

fn new_swarm_with_timeout(
timeout: Duration,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let cfg = request_response::Config::default().with_request_timeout(timeout);
new_swarm_with_config(cfg)
}

fn new_swarm() -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
new_swarm_with_timeout(Duration::from_millis(100))
}
Expand Down
Loading