diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 34d0e2e629..89c3caf4e2 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -316,11 +316,10 @@ impl Opts { let mut client = rt.block_on(async { if self.http2 { let io = tokio::net::TcpStream::connect(&addr).await.unwrap(); - let (tx, conn) = hyper::client::conn::http2::Builder::new() + let (tx, conn) = hyper::client::conn::http2::Builder::new(support::TokioExecutor) .initial_stream_window_size(self.http2_stream_window) .initial_connection_window_size(self.http2_conn_window) .adaptive_window(self.http2_adaptive_window) - .executor(support::TokioExecutor) .handshake(io) .await .unwrap(); diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 5b9e05470c..f54b7a90ca 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -57,16 +57,18 @@ pub struct Builder { /// /// This is a shortcut for `Builder::new().handshake(io)`. /// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake( +pub async fn handshake( + exec: E, io: T, ) -> crate::Result<(SendRequest, Connection)> where + E: Executor + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, { - Builder::new().handshake(io).await + Builder::new(exec).handshake(io).await } // ===== impl SendRequest @@ -244,23 +246,17 @@ where impl Builder { /// Creates a new connection builder. #[inline] - pub fn new() -> Builder { + pub fn new(exec: E) -> Builder + where + E: Executor + Send + Sync + 'static, + { Builder { - exec: Exec::Default, + exec: Exec::new(exec), timer: Time::Empty, h2_builder: Default::default(), } } - /// Provide an executor to execute background HTTP2 tasks. - pub fn executor(&mut self, exec: E) -> &mut Builder - where - E: Executor + Send + Sync + 'static, - { - self.exec = Exec::Executor(Arc::new(exec)); - self - } - /// Provide a timer to execute background HTTP2 tasks. pub fn timer(&mut self, timer: M) -> &mut Builder where diff --git a/src/common/exec.rs b/src/common/exec.rs index b7e3e9d7f7..ebe87e8279 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -16,30 +16,25 @@ pub trait ConnStreamExec: Clone { pub(crate) type BoxSendFuture = Pin + Send>>; -// Either the user provides an executor for background tasks, or we panic. -// TODO: with the `runtime`feature, `Exec::Default` used `tokio::spawn`. With the -// removal of the opt-in default runtime, this should be refactored. +// Executor must be provided by the user #[derive(Clone)] -pub(crate) enum Exec { - Default, - Executor(Arc + Send + Sync>), -} +pub(crate) struct Exec(Arc + Send + Sync>); // ===== impl Exec ===== impl Exec { + pub(crate) fn new(exec: E) -> Self + where + E: Executor + Send + Sync + 'static, + { + Self(Arc::new(exec)) + } + pub(crate) fn execute(&self, fut: F) where F: Future + Send + 'static, { - match *self { - Exec::Default => { - panic!("executor must be set"); - } - Exec::Executor(ref e) => { - e.execute(Box::pin(fut)); - } - } + self.0.execute(Box::pin(fut)) } } diff --git a/src/ffi/client.rs b/src/ffi/client.rs index d5cbd06519..a300076d8f 100644 --- a/src/ffi/client.rs +++ b/src/ffi/client.rs @@ -55,8 +55,7 @@ ffi_fn! { #[cfg(feature = "http2")] { if options.http2 { - return conn::http2::Builder::new() - .executor(options.exec.clone()) + return conn::http2::Builder::new(options.exec.clone()) .handshake::<_, crate::body::Incoming>(io) .await .map(|(tx, conn)| { diff --git a/tests/client.rs b/tests/client.rs index aa88917215..739f223b16 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1922,8 +1922,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake(io) .await .expect("http handshake"); @@ -1979,8 +1978,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (_client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (_client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) .keep_alive_timeout(Duration::from_secs(1)) @@ -2008,8 +2006,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) .keep_alive_timeout(Duration::from_secs(1)) @@ -2040,8 +2037,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) .keep_alive_timeout(Duration::from_secs(1)) @@ -2100,8 +2096,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) .keep_alive_timeout(Duration::from_secs(1)) @@ -2156,8 +2151,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake(io) .await .expect("http handshake"); @@ -2207,8 +2201,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake::<_, Empty>(io) .await .expect("http handshake"); diff --git a/tests/server.rs b/tests/server.rs index 5fe08848f7..68c53ce5c0 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -2389,8 +2389,7 @@ async fn http2_keep_alive_with_responsive_client() { }); let tcp = connect_async(addr).await; - let (mut client, conn) = hyper::client::conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut client, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor) .handshake(tcp) .await .expect("http handshake"); @@ -3017,8 +3016,7 @@ impl TestClient { .unwrap(); if self.http2_only { - let (mut sender, conn) = hyper::client::conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor) .handshake(stream) .await .unwrap(); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index a8eb37dc35..e7e1e8c6bd 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -427,8 +427,7 @@ async fn async_test(cfg: __TestConfig) { let stream = TcpStream::connect(addr).await.unwrap(); let res = if http2_only { - let (mut sender, conn) = hyper::client::conn::http2::Builder::new() - .executor(TokioExecutor) + let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor) .handshake(stream) .await .unwrap(); @@ -526,11 +525,11 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) .unwrap(); let resp = if http2_only { - let (mut sender, conn) = hyper::client::conn::http2::Builder::new() - .executor(TokioExecutor) - .handshake(stream) - .await - .unwrap(); + let (mut sender, conn) = + hyper::client::conn::http2::Builder::new(TokioExecutor) + .handshake(stream) + .await + .unwrap(); tokio::task::spawn(async move { if let Err(err) = conn.await {