From 52214f391c0a18dc66d1ccff9c0c004c5da85002 Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Wed, 21 Jul 2021 02:06:09 +0200 Subject: [PATCH] fix(client): retry when pool checkout returns closed HTTP2 connection (#2585) When http2_only is true, we never try to open a new connection if there is one open already, which means that if the existing connection that gets checked out of the pool is closed, then the request won't happen. --- src/client/client.rs | 41 ++++++++++++++++++++++++++++++++++------- src/client/pool.rs | 23 ++++++++++++++++++----- src/error.rs | 2 +- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src/client/client.rs b/src/client/client.rs index a5d8dcfaf7..f94d4154b8 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -11,7 +11,9 @@ use http::{Method, Request, Response, Uri, Version}; use super::conn; use super::connect::{self, sealed::Connect, Alpn, Connected, Connection}; -use super::pool::{self, Key as PoolKey, Pool, Poolable, Pooled, Reservation}; +use super::pool::{ + self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation, +}; #[cfg(feature = "tcp")] use super::HttpConnector; use crate::body::{Body, HttpBody}; @@ -223,7 +225,17 @@ where mut req: Request, pool_key: PoolKey, ) -> Result, ClientError> { - let mut pooled = self.connection_for(pool_key).await?; + let mut pooled = match self.connection_for(pool_key).await { + Ok(pooled) => pooled, + Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)), + Err(ClientConnectError::H2CheckoutIsClosed(reason)) => { + return Err(ClientError::Canceled { + connection_reused: true, + req, + reason, + }) + } + }; if pooled.is_http1() { if req.version() == Version::HTTP_2 { @@ -321,7 +333,7 @@ where async fn connection_for( &self, pool_key: PoolKey, - ) -> Result>, ClientError> { + ) -> Result>, ClientConnectError> { // This actually races 2 different futures to try to get a ready // connection the fastest, and to reduce connection churn. // @@ -337,6 +349,7 @@ where // and then be inserted into the pool as an idle connection. let checkout = self.pool.checkout(pool_key.clone()); let connect = self.connect_to(pool_key); + let is_ver_h2 = self.config.ver == Ver::Http2; // The order of the `select` is depended on below... @@ -380,16 +393,25 @@ where // In both cases, we should just wait for the other future. Either::Left((Err(err), connecting)) => { if err.is_canceled() { - connecting.await.map_err(ClientError::Normal) + connecting.await.map_err(ClientConnectError::Normal) } else { - Err(ClientError::Normal(err)) + Err(ClientConnectError::Normal(err)) } } Either::Right((Err(err), checkout)) => { if err.is_canceled() { - checkout.await.map_err(ClientError::Normal) + checkout.await.map_err(move |err| { + if is_ver_h2 + && err.is_canceled() + && err.find_source::().is_some() + { + ClientConnectError::H2CheckoutIsClosed(err) + } else { + ClientConnectError::Normal(err) + } + }) } else { - Err(ClientError::Normal(err)) + Err(ClientConnectError::Normal(err)) } } } @@ -722,6 +744,11 @@ impl ClientError { } } +enum ClientConnectError { + Normal(crate::Error), + H2CheckoutIsClosed(crate::Error), +} + /// A marker to identify what version a pooled connection is. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(super) enum Ver { diff --git a/src/client/pool.rs b/src/client/pool.rs index 94f73f6afd..9beca9f472 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; +use std::error::Error as StdError; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, Weak}; @@ -560,28 +561,40 @@ pub(super) struct Checkout { waiter: Option>, } +#[derive(Debug)] +pub(super) struct CheckoutIsClosedError; + +impl StdError for CheckoutIsClosedError {} + +impl fmt::Display for CheckoutIsClosedError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("checked out connection was closed") + } +} + impl Checkout { fn poll_waiter( &mut self, cx: &mut task::Context<'_>, ) -> Poll>>> { - static CANCELED: &str = "pool checkout failed"; if let Some(mut rx) = self.waiter.take() { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Ok(value)) => { if value.is_open() { Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) } else { - Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED)))) + Poll::Ready(Some(Err( + crate::Error::new_canceled().with(CheckoutIsClosedError) + ))) } } Poll::Pending => { self.waiter = Some(rx); Poll::Pending } - Poll::Ready(Err(_canceled)) => { - Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED)))) - } + Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( + crate::Error::new_canceled().with("request has been canceled") + ))), } } else { Poll::Ready(None) diff --git a/src/error.rs b/src/error.rs index 3eb6243701..cc601ef9d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -214,7 +214,7 @@ impl Error { &self.inner.kind } - fn find_source(&self) -> Option<&E> { + pub(crate) fn find_source(&self) -> Option<&E> { let mut cause = self.source(); while let Some(err) = cause { if let Some(ref typed) = err.downcast_ref() {