From 770c1cac3695ab67570c823e6e709f9ea57e2606 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 5 Jun 2024 10:29:27 +0200 Subject: [PATCH] Make Future implementation on Connection unconditional on executor being Send + Sync. --- src/client/conn/http2.rs | 170 ++++++++++++++++++++++++++++++++++++++- src/proto/h2/client.rs | 2 +- 2 files changed, 170 insertions(+), 2 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 6d967713d8..e034dabfef 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -230,7 +230,7 @@ where B::Data: Send, E: Unpin, B::Error: Into>, - E: Http2ClientConnExec + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec + Unpin { type Output = crate::Result<()>; @@ -457,3 +457,171 @@ where } } } + +#[cfg(test)] +mod tests { + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_non_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor; + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(LocalTokioExecutor, io).await.unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData> + } + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(LocalTokioExecutor { _x: Default::default() }, io).await.unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData> + } + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(LocalTokioExecutor { _x: Default::default() }, io).await.unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor; + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(TokioExecutor, io).await.unwrap(); + + tokio::task::spawn(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { // !Send, !Sync + _x: std::marker::PhantomData> + } + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(TokioExecutor { _x: Default::default() }, io).await.unwrap(); + + tokio::task::spawn_local(async move { // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { // !Sync + _x: std::marker::PhantomData> + } + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>(TokioExecutor { _x: Default::default() }, io).await.unwrap(); + + tokio::task::spawn_local(async move { // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } +} \ No newline at end of file diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 7cb6c6ed5b..c735b614cd 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -611,7 +611,7 @@ where B: Body + 'static + Unpin, B::Data: Send, B::Error: Into>, - E: Http2ClientConnExec + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec + Unpin, T: Read + Write + Unpin, { type Output = crate::Result;