From 9e8fc8fca195f470a82be5bfb2fd8019c044b97a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 24 Aug 2022 13:01:26 -0700 Subject: [PATCH] feat(body): remove "full" constructors from `Body` (#2958) The constructors of `hyper::Body` from a full bytes are removed. Along with `Body::empty()`. BREAKING CHANGE: Use the types from `http-body-util`. Co-authored-by: Xuanwo --- Cargo.toml | 5 -- benches/pipeline.rs | 9 ++- benches/server.rs | 7 +- examples/client.rs | 6 +- examples/client_json.rs | 5 +- examples/echo.rs | 26 +++++-- examples/hello.rs | 6 +- examples/http_proxy.rs | 23 ++++-- examples/multi_server.rs | 10 +-- examples/params.rs | 33 ++++++--- examples/send_file.rs | 12 ++-- examples/service_struct_impl.rs | 8 ++- examples/state.rs | 11 ++- examples/tower_server.rs | 60 ---------------- examples/upgrades.rs | 12 ++-- examples/web_api.rs | 32 +++++---- src/body/body.rs | 121 +++++--------------------------- src/body/mod.rs | 27 ------- src/body/to_bytes.rs | 5 +- src/client/conn/http1.rs | 25 ------- src/client/conn/http2.rs | 25 ------- src/client/conn/mod.rs | 40 +++-------- src/client/dispatch.rs | 2 +- src/ffi/body.rs | 2 +- src/ffi/http_types.rs | 2 +- src/proto/h1/conn.rs | 18 ----- src/proto/h1/dispatch.rs | 23 +++--- src/proto/h1/encode.rs | 33 --------- src/proto/h2/server.rs | 1 - src/server/conn.rs | 7 +- src/service/util.rs | 4 +- tests/client.rs | 45 ++++++------ tests/server.rs | 46 ++++++------ tests/support/mod.rs | 8 ++- 34 files changed, 236 insertions(+), 463 deletions(-) delete mode 100644 examples/tower_server.rs diff --git a/Cargo.toml b/Cargo.toml index e91f9080ce..7e6646461b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,11 +181,6 @@ name = "state" path = "examples/state.rs" required-features = ["full"] -[[example]] -name = "tower_server" -path = "examples/tower_server.rs" -required-features = ["full"] - [[example]] name = "upgrades" path = "examples/upgrades.rs" diff --git a/benches/pipeline.rs b/benches/pipeline.rs index 772e5480cc..29625edfa0 100644 --- a/benches/pipeline.rs +++ b/benches/pipeline.rs @@ -3,17 +3,20 @@ extern crate test; +use std::convert::Infallible; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::mpsc; use std::time::Duration; +use bytes::Bytes; +use http_body_util::Full; use tokio::net::TcpListener; use tokio::sync::oneshot; use hyper::server::conn::Http; use hyper::service::service_fn; -use hyper::{Body, Response}; +use hyper::Response; const PIPELINED_REQUESTS: usize = 16; @@ -43,7 +46,9 @@ fn hello_world_16(b: &mut test::Bencher) { .serve_connection( stream, service_fn(|_| async { - Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) + Ok::<_, Infallible>(Response::new(Full::new(Bytes::from( + "Hello, World!", + )))) }), ) .await diff --git a/benches/server.rs b/benches/server.rs index 9b87c85d5c..1b7f050fac 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -8,8 +8,9 @@ use std::net::{SocketAddr, TcpListener, TcpStream}; use std::sync::mpsc; use std::time::Duration; +use bytes::Bytes; use futures_util::{stream, StreamExt}; -use http_body_util::{BodyExt, StreamBody}; +use http_body_util::{BodyExt, Full, StreamBody}; use tokio::sync::oneshot; use hyper::server::conn::Http; @@ -87,8 +88,8 @@ macro_rules! bench_server { }}; } -fn body(b: &'static [u8]) -> hyper::Body { - b.into() +fn body(b: &'static [u8]) -> Full { + Full::new(b.into()) } #[bench] diff --git a/examples/client.rs b/examples/client.rs index 774b3cd47d..aa400ea616 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -2,7 +2,9 @@ #![warn(rust_2018_idioms)] use std::env; -use hyper::{body::HttpBody as _, Body, Request}; +use bytes::Bytes; +use http_body_util::Empty; +use hyper::{body::HttpBody as _, Request}; use tokio::io::{self, AsyncWriteExt as _}; use tokio::net::TcpStream; @@ -51,7 +53,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> { let req = Request::builder() .uri(url) .header(hyper::header::HOST, authority.as_str()) - .body(Body::empty())?; + .body(Empty::::new())?; let mut res = sender.send_request(req).await?; diff --git a/examples/client_json.rs b/examples/client_json.rs index 31337283ba..3c1a7ffd39 100644 --- a/examples/client_json.rs +++ b/examples/client_json.rs @@ -1,7 +1,8 @@ #![deny(warnings)] #![warn(rust_2018_idioms)] -use hyper::Body; +use bytes::Bytes; +use http_body_util::Empty; use hyper::{body::Buf, Request}; use serde::Deserialize; use tokio::net::TcpStream; @@ -42,7 +43,7 @@ async fn fetch_json(url: hyper::Uri) -> Result> { let req = Request::builder() .uri(url) .header(hyper::header::HOST, authority.as_str()) - .body(Body::empty())?; + .body(Empty::::new())?; let res = sender.send_request(req).await?; diff --git a/examples/echo.rs b/examples/echo.rs index e3a2170061..47f88a0fa0 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -2,6 +2,8 @@ use std::net::SocketAddr; +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::body::HttpBody as _; use hyper::server::conn::Http; use hyper::service::service_fn; @@ -10,15 +12,15 @@ use tokio::net::TcpListener; /// This is our service handler. It receives a Request, routes on its /// path, and returns a Future of a Response. -async fn echo(req: Request) -> Result, hyper::Error> { +async fn echo(req: Request) -> Result>, hyper::Error> { match (req.method(), req.uri().path()) { // Serve some instructions at / - (&Method::GET, "/") => Ok(Response::new(Body::from( + (&Method::GET, "/") => Ok(Response::new(full( "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`", ))), // Simply echo the body back to the client. - (&Method::POST, "/echo") => Ok(Response::new(req.into_body())), + (&Method::POST, "/echo") => Ok(Response::new(req.into_body().boxed())), // TODO: Fix this, broken in PR #2896 // Convert to uppercase before sending back to client using a stream. @@ -43,7 +45,7 @@ async fn echo(req: Request) -> Result, hyper::Error> { // 64kbs of data. let max = req.body().size_hint().upper().unwrap_or(u64::MAX); if max > 1024 * 64 { - let mut resp = Response::new(Body::from("Body too big")); + let mut resp = Response::new(full("Body too big")); *resp.status_mut() = hyper::StatusCode::PAYLOAD_TOO_LARGE; return Ok(resp); } @@ -51,18 +53,30 @@ async fn echo(req: Request) -> Result, hyper::Error> { let whole_body = hyper::body::to_bytes(req.into_body()).await?; let reversed_body = whole_body.iter().rev().cloned().collect::>(); - Ok(Response::new(Body::from(reversed_body))) + Ok(Response::new(full(reversed_body))) } // Return the 404 Not Found for other routes. _ => { - let mut not_found = Response::default(); + let mut not_found = Response::new(empty()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } } } +fn empty() -> BoxBody { + Empty::::new() + .map_err(|never| match never {}) + .boxed() +} + +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + #[tokio::main] async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); diff --git a/examples/hello.rs b/examples/hello.rs index 528b0c6527..6acf47d228 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -3,13 +3,15 @@ use std::convert::Infallible; use std::net::SocketAddr; +use bytes::Bytes; +use http_body_util::Full; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{Body, Request, Response}; use tokio::net::TcpListener; -async fn hello(_: Request) -> Result, Infallible> { - Ok(Response::new(Body::from("Hello World!"))) +async fn hello(_: Request) -> Result>, Infallible> { + Ok(Response::new(Full::new(Bytes::from("Hello World!")))) } #[tokio::main] diff --git a/examples/http_proxy.rs b/examples/http_proxy.rs index b072bbe34c..bddae68484 100644 --- a/examples/http_proxy.rs +++ b/examples/http_proxy.rs @@ -2,6 +2,8 @@ use std::net::SocketAddr; +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::client::conn::Builder; use hyper::server::conn::Http; use hyper::service::service_fn; @@ -41,7 +43,7 @@ async fn main() -> Result<(), Box> { } } -async fn proxy(req: Request) -> Result, hyper::Error> { +async fn proxy(req: Request) -> Result>, hyper::Error> { println!("req: {:?}", req); if Method::CONNECT == req.method() { @@ -70,10 +72,10 @@ async fn proxy(req: Request) -> Result, hyper::Error> { } }); - Ok(Response::new(Body::empty())) + Ok(Response::new(empty())) } else { eprintln!("CONNECT host is not socket addr: {:?}", req.uri()); - let mut resp = Response::new(Body::from("CONNECT must be to a socket address")); + let mut resp = Response::new(full("CONNECT must be to a socket address")); *resp.status_mut() = http::StatusCode::BAD_REQUEST; Ok(resp) @@ -96,7 +98,8 @@ async fn proxy(req: Request) -> Result, hyper::Error> { } }); - sender.send_request(req).await + let resp = sender.send_request(req).await?; + Ok(resp.map(|b| b.boxed())) } } @@ -104,6 +107,18 @@ fn host_addr(uri: &http::Uri) -> Option { uri.authority().and_then(|auth| Some(auth.to_string())) } +fn empty() -> BoxBody { + Empty::::new() + .map_err(|never| match never {}) + .boxed() +} + +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + // Create a TCP connection to host:port, build a tunnel between the connection and // the upgraded connection async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> { diff --git a/examples/multi_server.rs b/examples/multi_server.rs index d69f65fb9b..be084e04e7 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -3,7 +3,9 @@ use std::net::SocketAddr; +use bytes::Bytes; use futures_util::future::join; +use http_body_util::Full; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{Body, Request, Response}; @@ -12,12 +14,12 @@ use tokio::net::TcpListener; static INDEX1: &[u8] = b"The 1st service!"; static INDEX2: &[u8] = b"The 2nd service!"; -async fn index1(_: Request) -> Result, hyper::Error> { - Ok(Response::new(Body::from(INDEX1))) +async fn index1(_: Request) -> Result>, hyper::Error> { + Ok(Response::new(Full::new(Bytes::from(INDEX1)))) } -async fn index2(_: Request) -> Result, hyper::Error> { - Ok(Response::new(Body::from(INDEX2))) +async fn index2(_: Request) -> Result>, hyper::Error> { + Ok(Response::new(Full::new(Bytes::from(INDEX2)))) } #[tokio::main] diff --git a/examples/params.rs b/examples/params.rs index a0ca3e1b76..02a1b7a5ad 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -1,12 +1,15 @@ // #![deny(warnings)] // FIXME: /~https://github.com/rust-lang/rust/issues/62411 #![warn(rust_2018_idioms)] +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{Body, Method, Request, Response, StatusCode}; use tokio::net::TcpListener; use std::collections::HashMap; +use std::convert::Infallible; use std::net::SocketAddr; use url::form_urlencoded; @@ -15,9 +18,11 @@ static MISSING: &[u8] = b"Missing field"; static NOTNUMERIC: &[u8] = b"Number field is not numeric"; // Using service_fn, we can turn this function into a `Service`. -async fn param_example(req: Request) -> Result, hyper::Error> { +async fn param_example( + req: Request, +) -> Result>, hyper::Error> { match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())), + (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))), (&Method::POST, "/post") => { // Concatenate the body... let b = hyper::body::to_bytes(req).await?; @@ -43,7 +48,7 @@ async fn param_example(req: Request) -> Result, hyper::Erro } else { return Ok(Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) + .body(full(MISSING)) .unwrap()); }; let number = if let Some(n) = params.get("number") { @@ -52,13 +57,13 @@ async fn param_example(req: Request) -> Result, hyper::Erro } else { return Ok(Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(NOTNUMERIC.into()) + .body(full(NOTNUMERIC)) .unwrap()); } } else { return Ok(Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) + .body(full(MISSING)) .unwrap()); }; @@ -69,7 +74,7 @@ async fn param_example(req: Request) -> Result, hyper::Erro // responses such as InternalServiceError may be // needed here, too. let body = format!("Hello {}, your number is {}", name, number); - Ok(Response::new(body.into())) + Ok(Response::new(full(body))) } (&Method::GET, "/get") => { let query = if let Some(q) = req.uri().query() { @@ -77,7 +82,7 @@ async fn param_example(req: Request) -> Result, hyper::Erro } else { return Ok(Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) + .body(full(MISSING)) .unwrap()); }; let params = form_urlencoded::parse(query.as_bytes()) @@ -88,19 +93,27 @@ async fn param_example(req: Request) -> Result, hyper::Erro } else { return Ok(Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) + .body(full(MISSING)) .unwrap()); }; let body = format!("You requested {}", page); - Ok(Response::new(body.into())) + Ok(Response::new(full(body))) } _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::empty()) + .body(empty()) .unwrap()), } } +fn empty() -> BoxBody { + Empty::::new().boxed() +} + +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()).boxed() +} + #[tokio::main] async fn main() -> Result<(), Box> { pretty_env_logger::init(); diff --git a/examples/send_file.rs b/examples/send_file.rs index 5bc90d57f5..699456b3fe 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -5,6 +5,8 @@ use std::net::SocketAddr; use hyper::server::conn::Http; use tokio::net::TcpListener; +use bytes::Bytes; +use http_body_util::Full; use hyper::service::service_fn; use hyper::{Body, Method, Request, Response, Result, StatusCode}; @@ -34,7 +36,7 @@ async fn main() -> std::result::Result<(), Box> { } } -async fn response_examples(req: Request) -> Result> { +async fn response_examples(req: Request) -> Result>> { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/index.html") => simple_file_send(INDEX).await, (&Method::GET, "/no_file.html") => { @@ -46,17 +48,17 @@ async fn response_examples(req: Request) -> Result> { } /// HTTP status code 404 -fn not_found() -> Response { +fn not_found() -> Response> { Response::builder() .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) + .body(Full::new(NOTFOUND.into())) .unwrap() } -async fn simple_file_send(filename: &str) -> Result> { +async fn simple_file_send(filename: &str) -> Result>> { if let Ok(contents) = tokio::fs::read(filename).await { let body = contents.into(); - return Ok(Response::new(body)); + return Ok(Response::new(Full::new(body))); } Ok(not_found()) diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index 0f1ca81ce1..f2a3aba8b6 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -1,3 +1,5 @@ +use bytes::Bytes; +use http_body_util::Full; use hyper::server::conn::Http; use hyper::service::Service; use hyper::{Body, Request, Response}; @@ -36,7 +38,7 @@ struct Svc { } impl Service> for Svc { - type Response = Response; + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; @@ -45,8 +47,8 @@ impl Service> for Svc { } fn call(&mut self, req: Request) -> Self::Future { - fn mk_response(s: String) -> Result, hyper::Error> { - Ok(Response::builder().body(Body::from(s)).unwrap()) + fn mk_response(s: String) -> Result>, hyper::Error> { + Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap()) } let res = match req.uri().path() { diff --git a/examples/state.rs b/examples/state.rs index 3590e6c981..ed3974c1e4 100644 --- a/examples/state.rs +++ b/examples/state.rs @@ -6,8 +6,10 @@ use std::sync::{ Arc, }; +use bytes::Bytes; +use http_body_util::Full; use hyper::{server::conn::Http, service::service_fn}; -use hyper::{Body, Error, Response}; +use hyper::{Error, Response}; use tokio::net::TcpListener; #[tokio::main] @@ -36,7 +38,12 @@ async fn main() -> Result<(), Box> { // Get the current count, and also increment by 1, in a single // atomic operation. let count = counter.fetch_add(1, Ordering::AcqRel); - async move { Ok::<_, Error>(Response::new(Body::from(format!("Request #{}", count)))) } + async move { + Ok::<_, Error>(Response::new(Full::new(Bytes::from(format!( + "Request #{}", + count + ))))) + } }); if let Err(err) = Http::new().serve_connection(stream, service).await { diff --git a/examples/tower_server.rs b/examples/tower_server.rs deleted file mode 100644 index feaa3de09c..0000000000 --- a/examples/tower_server.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![deny(warnings)] - -use std::net::SocketAddr; -use std::task::{Context, Poll}; - -use futures_util::future; -use hyper::server::conn::Http; -use hyper::service::Service; -use hyper::{Body, Request, Response}; -use tokio::net::TcpListener; - -const ROOT: &str = "/"; - -#[derive(Debug)] -pub struct Svc; - -impl Service> for Svc { - type Response = Response; - type Error = hyper::Error; - type Future = future::Ready>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - - fn call(&mut self, req: Request) -> Self::Future { - let rsp = Response::builder(); - - let uri = req.uri(); - if uri.path() != ROOT { - let body = Body::from(Vec::new()); - let rsp = rsp.status(404).body(body).unwrap(); - return future::ok(rsp); - } - - let body = Body::from(Vec::from(&b"heyo!"[..])); - let rsp = rsp.status(200).body(body).unwrap(); - future::ok(rsp) - } -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - - let addr: SocketAddr = "127.0.0.1:1337".parse().unwrap(); - - let listener = TcpListener::bind(addr).await?; - println!("Listening on http://{}", addr); - - loop { - let (stream, _) = listener.accept().await?; - - tokio::task::spawn(async move { - if let Err(err) = Http::new().serve_connection(stream, Svc).await { - println!("Failed to serve connection: {:?}", err); - } - }); - } -} diff --git a/examples/upgrades.rs b/examples/upgrades.rs index de78eea76b..08082af89a 100644 --- a/examples/upgrades.rs +++ b/examples/upgrades.rs @@ -1,18 +1,20 @@ #![deny(warnings)] // Note: `hyper::upgrade` docs link to this upgrade. +use std::net::SocketAddr; use std::str; -use hyper::server::conn::Http; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::watch; +use bytes::Bytes; +use http_body_util::Empty; use hyper::header::{HeaderValue, UPGRADE}; +use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::upgrade::Upgraded; use hyper::{Body, Request, Response, StatusCode}; -use std::net::SocketAddr; // A simple type alias so as to DRY. type Result = std::result::Result>; @@ -36,8 +38,8 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> { } /// Our server HTTP handler to initiate HTTP upgrades. -async fn server_upgrade(mut req: Request) -> Result> { - let mut res = Response::new(Body::empty()); +async fn server_upgrade(mut req: Request) -> Result>> { + let mut res = Response::new(Empty::new()); // Send a 400 to any request that doesn't have // an `Upgrade` header. @@ -91,7 +93,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> { let req = Request::builder() .uri(format!("http://{}/", addr)) .header(UPGRADE, "foobar") - .body(Body::empty()) + .body(Empty::::new()) .unwrap(); let stream = TcpStream::connect(addr).await?; diff --git a/examples/web_api.rs b/examples/web_api.rs index 7db23681c2..d2ac47dbf4 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -2,7 +2,8 @@ use std::net::SocketAddr; -use bytes::Buf; +use bytes::{Buf, Bytes}; +use http_body_util::{BodyExt, Full}; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{header, Body, Method, Request, Response, StatusCode}; @@ -10,6 +11,7 @@ use tokio::net::{TcpListener, TcpStream}; type GenericError = Box; type Result = std::result::Result; +type BoxBody = http_body_util::combinators::BoxBody; static INDEX: &[u8] = b"test.html"; static INTERNAL_SERVER_ERROR: &[u8] = b"Internal Server Error"; @@ -17,12 +19,12 @@ static NOTFOUND: &[u8] = b"Not Found"; static POST_DATA: &str = r#"{"original": "data"}"#; static URL: &str = "http://127.0.0.1:1337/json_api"; -async fn client_request_response() -> Result> { +async fn client_request_response() -> Result> { let req = Request::builder() .method(Method::POST) .uri(URL) .header(header::CONTENT_TYPE, "application/json") - .body(POST_DATA.into()) + .body(Full::new(Bytes::from(POST_DATA))) .unwrap(); let host = req.uri().host().expect("uri has no host"); @@ -39,12 +41,12 @@ async fn client_request_response() -> Result> { let web_res = sender.send_request(req).await?; - let res_body = web_res.into_body(); + let res_body = web_res.into_body().boxed(); Ok(Response::new(res_body)) } -async fn api_post_response(req: Request) -> Result> { +async fn api_post_response(req: Request) -> Result> { // Aggregate the body... let whole_body = hyper::body::aggregate(req).await?; // Decode as JSON... @@ -56,28 +58,28 @@ async fn api_post_response(req: Request) -> Result> { let response = Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(json))?; + .body(full(json))?; Ok(response) } -async fn api_get_response() -> Result> { +async fn api_get_response() -> Result> { let data = vec!["foo", "bar"]; let res = match serde_json::to_string(&data) { Ok(json) => Response::builder() .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(json)) + .body(full(json)) .unwrap(), Err(_) => Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(INTERNAL_SERVER_ERROR.into()) + .body(full(INTERNAL_SERVER_ERROR)) .unwrap(), }; Ok(res) } -async fn response_examples(req: Request) -> Result> { +async fn response_examples(req: Request) -> Result> { match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::GET, "/index.html") => Ok(Response::new(INDEX.into())), + (&Method::GET, "/") | (&Method::GET, "/index.html") => Ok(Response::new(full(INDEX))), (&Method::GET, "/test.html") => client_request_response().await, (&Method::POST, "/json_api") => api_post_response(req).await, (&Method::GET, "/json_api") => api_get_response().await, @@ -85,12 +87,18 @@ async fn response_examples(req: Request) -> Result> { // Return 404 not found response. Ok(Response::builder() .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) + .body(full(NOTFOUND)) .unwrap()) } } } +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + #[tokio::main] async fn main() -> Result<()> { pretty_env_logger::init(); diff --git a/src/body/body.rs b/src/body/body.rs index 008c37404f..ac6318a3d3 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::fmt; use bytes::Bytes; @@ -30,7 +29,8 @@ pub struct Body { } enum Kind { - Once(Option), + #[allow(dead_code)] + Empty, Chan { content_length: DecodedLength, want_tx: watch::Sender, @@ -71,21 +71,6 @@ const WANT_PENDING: usize = 1; const WANT_READY: usize = 2; impl Body { - /// Create an empty `Body` stream. - /// - /// # Example - /// - /// ``` - /// use hyper::{Body, Request}; - /// - /// // create a `GET /` request - /// let get = Request::new(Body::empty()); - /// ``` - #[inline] - pub fn empty() -> Body { - Body::new(Kind::Once(None)) - } - /// Create a `Body` stream with an associated sender half. /// /// Useful when wanting to stream chunks from another thread. @@ -123,6 +108,16 @@ impl Body { Body { kind } } + #[allow(dead_code)] + pub(crate) fn empty() -> Body { + Body::new(Kind::Empty) + } + + #[cfg(feature = "ffi")] + pub(crate) fn ffi() -> Body { + Body::new(Kind::Ffi(crate::ffi::UserBody::new())) + } + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] pub(crate) fn h2( recv: h2::RecvStream, @@ -160,7 +155,7 @@ impl Body { fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { match self.kind { - Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), + Kind::Empty => Poll::Ready(None), Kind::Chan { content_length: ref mut len, ref mut data_rx, @@ -197,23 +192,6 @@ impl Body { Kind::Ffi(ref mut body) => body.poll_data(cx), } } - - #[cfg(feature = "http1")] - pub(super) fn take_full_data(&mut self) -> Option { - if let Kind::Once(ref mut chunk) = self.kind { - chunk.take() - } else { - None - } - } -} - -impl Default for Body { - /// Returns `Body::empty()`. - #[inline] - fn default() -> Body { - Body::empty() - } } impl HttpBody for Body { @@ -232,6 +210,7 @@ impl HttpBody for Body { #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, ) -> Poll, Self::Error>> { match self.kind { + Kind::Empty => Poll::Ready(Ok(None)), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { recv: ref mut h2, @@ -253,13 +232,12 @@ impl HttpBody for Body { }, #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_trailers(cx), - _ => Poll::Ready(Ok(None)), } } fn is_end_stream(&self) -> bool { match self.kind { - Kind::Once(ref val) => val.is_none(), + Kind::Empty => true, Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), @@ -282,8 +260,7 @@ impl HttpBody for Body { } match self.kind { - Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), - Kind::Once(None) => SizeHint::with_exact(0), + Kind::Empty => SizeHint::with_exact(0), Kind::Chan { content_length, .. } => opt_len!(content_length), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { content_length, .. } => opt_len!(content_length), @@ -299,13 +276,10 @@ impl fmt::Debug for Body { struct Streaming; #[derive(Debug)] struct Empty; - #[derive(Debug)] - struct Full<'a>(&'a Bytes); let mut builder = f.debug_tuple("Body"); match self.kind { - Kind::Once(None) => builder.field(&Empty), - Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), + Kind::Empty => builder.field(&Empty), _ => builder.field(&Streaming), }; @@ -313,65 +287,6 @@ impl fmt::Debug for Body { } } -impl From for Body { - #[inline] - fn from(chunk: Bytes) -> Body { - if chunk.is_empty() { - Body::empty() - } else { - Body::new(Kind::Once(Some(chunk))) - } - } -} - -impl From> for Body { - #[inline] - fn from(vec: Vec) -> Body { - Body::from(Bytes::from(vec)) - } -} - -impl From<&'static [u8]> for Body { - #[inline] - fn from(slice: &'static [u8]) -> Body { - Body::from(Bytes::from(slice)) - } -} - -impl From> for Body { - #[inline] - fn from(cow: Cow<'static, [u8]>) -> Body { - match cow { - Cow::Borrowed(b) => Body::from(b), - Cow::Owned(o) => Body::from(o), - } - } -} - -impl From for Body { - #[inline] - fn from(s: String) -> Body { - Body::from(Bytes::from(s.into_bytes())) - } -} - -impl From<&'static str> for Body { - #[inline] - fn from(slice: &'static str) -> Body { - Body::from(Bytes::from(slice.as_bytes())) - } -} - -impl From> for Body { - #[inline] - fn from(cow: Cow<'static, str>) -> Body { - match cow { - Cow::Borrowed(b) => Body::from(b), - Cow::Owned(o) => Body::from(o), - } - } -} - impl Sender { /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { @@ -506,8 +421,6 @@ mod tests { assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); } - eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); - eq(Body::empty(), SizeHint::with_exact(0), "empty"); eq(Body::channel().1, SizeHint::new(), "channel"); diff --git a/src/body/mod.rs b/src/body/mod.rs index 5e2181e941..8c6789daa5 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -29,33 +29,6 @@ mod body; mod length; mod to_bytes; -/// An optimization to try to take a full body if immediately available. -/// -/// This is currently limited to *only* `hyper::Body`s. -#[cfg(feature = "http1")] -pub(crate) fn take_full_data(body: &mut T) -> Option { - use std::any::{Any, TypeId}; - - // This static type check can be optimized at compile-time. - if TypeId::of::() == TypeId::of::() { - let mut full = (body as &mut dyn Any) - .downcast_mut::() - .expect("must be Body") - .take_full_data(); - // This second cast is required to make the type system happy. - // Without it, the compiler cannot reason that the type is actually - // `T::Data`. Oh wells. - // - // It's still a measurable win! - (&mut full as &mut dyn Any) - .downcast_mut::>() - .expect("must be T::Data") - .take() - } else { - None - } -} - fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} diff --git a/src/body/to_bytes.rs b/src/body/to_bytes.rs index 3fbb859c9d..b56c4716df 100644 --- a/src/body/to_bytes.rs +++ b/src/body/to_bytes.rs @@ -17,11 +17,10 @@ use super::HttpBody; /// # Example /// /// ``` -/// # async fn doc() -> hyper::Result<()> { /// # use hyper::{Body, Response}; +/// # async fn doc(response: Response) -> hyper::Result<()> { /// # use hyper::body::HttpBody; -/// # -/// let response = Response::new(Body::from("response body")); +/// // let response: Response ... /// /// const MAX_ALLOWED_RESPONSE_SIZE: u64 = 1024; /// diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 7cf246c5b3..a1b0cc9858 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -120,31 +120,6 @@ where /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will /// be serialized as-is. - /// - /// # Example - /// - /// ``` - /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; - /// # use hyper::Body; - /// use hyper::Request; - /// - /// # async fn doc(mut tx: SendRequest) -> hyper::Result<()> { - /// // build a Request - /// let req = Request::builder() - /// .uri("/foo/bar") - /// .header(HOST, "hyper.rs") - /// .body(Body::empty()) - /// .unwrap(); - /// - /// // send it and await a Response - /// let res = tx.send_request(req).await?; - /// // assert the Response - /// assert!(res.status().is_success()); - /// # Ok(()) - /// # } - /// # fn main() {} - /// ``` pub fn send_request(&mut self, req: Request) -> impl Future>> { let sent = self.dispatch.send(req); diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 3be24ed080..98ba5311a0 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -114,31 +114,6 @@ where /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will /// be serialized as-is. - /// - /// # Example - /// - /// ``` - /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; - /// # use hyper::Body; - /// use hyper::Request; - /// - /// # async fn doc(mut tx: SendRequest) -> hyper::Result<()> { - /// // build a Request - /// let req = Request::builder() - /// .uri("/foo/bar") - /// .header(HOST, "hyper.rs") - /// .body(Body::empty()) - /// .unwrap(); - /// - /// // send it and await a Response - /// let res = tx.send_request(req).await?; - /// // assert the Response - /// assert!(res.status().is_success()); - /// # Ok(()) - /// # } - /// # fn main() {} - /// ``` pub fn send_request(&mut self, req: Request) -> impl Future>> { let sent = self.dispatch.send(req); diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 7880c65a95..19ec8f7130 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -13,10 +13,12 @@ //! ```no_run //! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))] //! # mod rt { -//! use tower::ServiceExt; +//! use bytes::Bytes; //! use http::{Request, StatusCode}; +//! use http_body_util::Empty; //! use hyper::{client::conn, Body}; //! use tokio::net::TcpStream; +//! use tower::ServiceExt; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -35,7 +37,7 @@ //! // We need to manually add the host header because SendRequest does not //! .header("Host", "example.com") //! .method("GET") -//! .body(Body::from(""))?; +//! .body(Empty::::new())?; //! let response = request_sender.send_request(request).await?; //! assert!(response.status() == StatusCode::OK); //! @@ -45,7 +47,7 @@ //! let request = Request::builder() //! .header("Host", "example.com") //! .method("GET") -//! .body(Body::from(""))?; +//! .body(Empty::::new())?; //! let response = request_sender.send_request(request).await?; //! assert!(response.status() == StatusCode::OK); //! Ok(()) @@ -123,11 +125,14 @@ pin_project! { /// /// This is a shortcut for `Builder::new().handshake(io)`. /// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake( +pub async fn handshake( io: T, -) -> crate::Result<(SendRequest, Connection)> +) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: HttpBody + 'static, + B::Data: Send, + B::Error: Into>, { Builder::new().handshake(io).await } @@ -245,31 +250,6 @@ where /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will /// be serialized as-is. - /// - /// # Example - /// - /// ``` - /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; - /// # use hyper::Body; - /// use hyper::Request; - /// - /// # async fn doc(mut tx: SendRequest) -> hyper::Result<()> { - /// // build a Request - /// let req = Request::builder() - /// .uri("/foo/bar") - /// .header(HOST, "hyper.rs") - /// .body(Body::empty()) - /// .unwrap(); - /// - /// // send it and await a Response - /// let res = tx.send_request(req).await?; - /// // assert the Response - /// assert!(res.status().is_success()); - /// # Ok(()) - /// # } - /// # fn main() {} - /// ``` pub fn send_request(&mut self, req: Request) -> ResponseFuture { let inner = match self.dispatch.send(req) { Ok(rx) => ResponseFutureState::Waiting(rx), diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 68de43b347..5e896a9ffe 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -387,7 +387,7 @@ mod tests { let (mut tx, mut rx) = channel::, Response>(); b.iter(move || { - let _ = tx.send(Request::default()).unwrap(); + let _ = tx.send(Request::new(Body::empty())).unwrap(); rt.block_on(async { loop { let poll_once = PollOnce(&mut rx); diff --git a/src/ffi/body.rs b/src/ffi/body.rs index 39ba5beffb..a455dae0e3 100644 --- a/src/ffi/body.rs +++ b/src/ffi/body.rs @@ -33,7 +33,7 @@ ffi_fn! { /// /// If not configured, this body acts as an empty payload. fn hyper_body_new() -> *mut hyper_body { - Box::into_raw(Box::new(hyper_body(Body::empty()))) + Box::into_raw(Box::new(hyper_body(Body::ffi()))) } ?= ptr::null_mut() } diff --git a/src/ffi/http_types.rs b/src/ffi/http_types.rs index ea10f139cb..f96a58d165 100644 --- a/src/ffi/http_types.rs +++ b/src/ffi/http_types.rs @@ -335,7 +335,7 @@ ffi_fn! { /// /// It is safe to free the response even after taking ownership of its body. fn hyper_response_body(resp: *mut hyper_response) -> *mut hyper_body { - let body = std::mem::take(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut()); + let body = std::mem::replace(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut(), crate::Body::empty()); Box::into_raw(Box::new(hyper_body(body))) } ?= std::ptr::null_mut() } diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 37ab380f8b..2db8380c4c 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -522,24 +522,6 @@ where } } - pub(crate) fn write_full_msg(&mut self, head: MessageHead, body: B) { - if let Some(encoder) = - self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) - { - let is_last = encoder.is_last(); - // Make sure we don't write a body if we weren't actually allowed - // to do so, like because its a HEAD request. - if !encoder.is_eof() { - encoder.danger_full_buf(body, self.io.write_buf()); - } - self.state.writing = if is_last { - Writing::Closed - } else { - Writing::KeepAlive - } - } - } - fn encode_head( &mut self, mut head: MessageHead, diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 677131bfdd..5a5daf6d43 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -8,9 +8,7 @@ use tracing::{debug, trace}; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, HttpBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; -use crate::proto::{ - BodyLength, Conn, Dispatched, MessageHead, RequestHead, -}; +use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; pub(crate) struct Dispatcher { @@ -295,16 +293,7 @@ where && self.dispatch.should_poll() { if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { - let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; - - // Check if the body knows its full data immediately. - // - // If so, we can skip a bit of bookkeeping that streaming - // bodies need to do. - if let Some(full) = crate::body::take_full_data(&mut body) { - self.conn.write_full_msg(head, full); - return Poll::Ready(Ok(())); - } + let (head, body) = msg.map_err(crate::Error::new_user_service)?; let body_type = if body.is_end_stream() { self.body_rx.set(None); @@ -708,9 +697,15 @@ mod tests { let dispatcher = Dispatcher::new(Client::new(rx), conn); let _dispatcher = tokio::spawn(async move { dispatcher.await }); + let body = { + let (mut tx, body) = crate::Body::new_channel(DecodedLength::new(4), false); + tx.try_send_data("reee".into()).unwrap(); + body + }; + let req = crate::Request::builder() .method("POST") - .body(crate::Body::from("reee")) + .body(body) .unwrap(); let res = tx.try_send(req).unwrap().await.expect("response"); diff --git a/src/proto/h1/encode.rs b/src/proto/h1/encode.rs index f0aa261a4f..cb4a7841fe 100644 --- a/src/proto/h1/encode.rs +++ b/src/proto/h1/encode.rs @@ -180,39 +180,6 @@ impl Encoder { } } } - - /// Encodes the full body, without verifying the remaining length matches. - /// - /// This is used in conjunction with HttpBody::__hyper_full_data(), which - /// means we can trust that the buf has the correct size (the buf itself - /// was checked to make the headers). - pub(super) fn danger_full_buf(self, msg: B, dst: &mut WriteBuf>) - where - B: Buf, - { - debug_assert!(msg.remaining() > 0, "encode() called with empty buf"); - debug_assert!( - match self.kind { - Kind::Length(len) => len == msg.remaining() as u64, - _ => true, - }, - "danger_full_buf length mismatches" - ); - - match self.kind { - Kind::Chunked => { - let len = msg.remaining(); - trace!("encoding chunked {}B", len); - let buf = ChunkSize::new(len) - .chain(msg) - .chain(b"\r\n0\r\n\r\n" as &'static [u8]); - dst.buffer(buf); - } - _ => { - dst.buffer(msg); - } - } - } } impl Buf for EncodedBuf diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index d24e6bac5f..0a539692de 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -503,7 +503,6 @@ where } } - if !body.is_end_stream() { // automatically set Content-Length from body... if let Some(len) = body.size_hint().exact() { diff --git a/src/server/conn.rs b/src/server/conn.rs index 9b6140b141..a447f2bcc0 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -11,7 +11,8 @@ //! # #[cfg(all(feature = "http1", feature = "runtime"))] //! # mod rt { //! use http::{Request, Response, StatusCode}; -//! use hyper::{server::conn::Http, service::service_fn, Body}; +//! use http_body_util::Full; +//! use hyper::{server::conn::Http, service::service_fn, body::Bytes}; //! use std::{net::SocketAddr, convert::Infallible}; //! use tokio::net::TcpListener; //! @@ -34,8 +35,8 @@ //! } //! } //! -//! async fn hello(_req: Request) -> Result, Infallible> { -//! Ok(Response::new(Body::from("Hello World!"))) +//! async fn hello(_req: Request) -> Result>, Infallible> { +//! Ok(Response::new(Full::new(Bytes::from("Hello World!")))) //! } //! # } //! ``` diff --git a/src/service/util.rs b/src/service/util.rs index 7cba1206f1..241d685b11 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -11,12 +11,14 @@ use crate::{Request, Response}; /// # Example /// /// ``` +/// use bytes::Bytes; /// use hyper::{Body, Request, Response, Version}; +/// use http_body_util::Full; /// use hyper::service::service_fn; /// /// let service = service_fn(|req: Request| async move { /// if req.version() == Version::HTTP_11 { -/// Ok(Response::new(Body::from("Hello World"))) +/// Ok(Response::new(Full::::from("Hello World"))) /// } else { /// // Note: it's usually better to return a Response /// // with an appropriate StatusCode instead of an Err. diff --git a/tests/client.rs b/tests/client.rs index 5360b71ad5..6540a04459 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1332,9 +1332,10 @@ mod conn { use std::thread; use std::time::Duration; - use bytes::Buf; + use bytes::{Buf, Bytes}; use futures_channel::oneshot; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; + use http_body_util::Empty; use hyper::upgrade::OnUpgrade; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; @@ -1379,7 +1380,7 @@ mod conn { let req = Request::builder() .uri("/a") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let mut res = client.send_request(req).await.expect("send_request"); assert_eq!(res.status(), hyper::StatusCode::OK); @@ -1423,7 +1424,7 @@ mod conn { let req = Request::builder() .uri("/a") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let mut res = client.send_request(req).await.expect("send_request"); assert_eq!(res.status(), hyper::StatusCode::OK); @@ -1479,7 +1480,7 @@ mod conn { let req = Request::builder() .uri("/") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res = client.send_request(req).and_then(move |mut res| { assert_eq!(res.status(), hyper::StatusCode::OK); @@ -1576,7 +1577,7 @@ mod conn { let req = Request::builder() .uri("http://hyper.local/a") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res = client.send_request(req).and_then(move |res| { @@ -1622,7 +1623,7 @@ mod conn { let req = Request::builder() .uri("/a") .version(hyper::Version::HTTP_2) - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res = client.send_request(req).and_then(move |res| { @@ -1663,7 +1664,7 @@ mod conn { let req = Request::builder() .uri("/a") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); @@ -1673,7 +1674,7 @@ mod conn { // pipelined request will hit NotReady, and thus should return an Error::Cancel let req = Request::builder() .uri("/b") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res2 = client.send_request(req).map(|result| { let err = result.expect_err("res2"); @@ -1734,7 +1735,7 @@ mod conn { let req = Request::builder() .uri("/a") - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); @@ -1821,7 +1822,7 @@ mod conn { let req = Request::builder() .method("CONNECT") .uri(addr.to_string()) - .body(Default::default()) + .body(Empty::::new()) .unwrap(); let res = client .send_request(req) @@ -1886,7 +1887,7 @@ mod conn { res = listener.accept() => { let (stream, _) = res.unwrap(); - let service = service_fn(|_:Request| future::ok::, hyper::Error>(Response::new(Body::empty()))); + let service = service_fn(|_:Request| future::ok::<_, hyper::Error>(Response::new(Empty::::new()))); let mut shdn_rx = shdn_rx.clone(); tokio::task::spawn(async move { @@ -1913,7 +1914,7 @@ mod conn { let io = tcp_connect(&addr).await.expect("tcp connect"); let (mut client, conn) = conn::Builder::new() .http2_only(true) - .handshake::<_, Body>(io) + .handshake(io) .await .expect("http handshake"); @@ -1928,7 +1929,7 @@ mod conn { let req = Request::builder() .uri(format!("http://{}/", addr)) - .body(Body::empty()) + .body(Empty::::new()) .expect("request builder"); client.send_request(req).await.expect("req1 send"); @@ -2046,7 +2047,7 @@ mod conn { .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) - .handshake::<_, Body>(io) + .handshake(io) .await .expect("http handshake"); @@ -2055,7 +2056,7 @@ mod conn { assert!(err.is_timeout()); }); - let req = http::Request::new(hyper::Body::empty()); + let req = http::Request::new(Empty::::new()); let err = client .send_request(req) .await @@ -2098,7 +2099,7 @@ mod conn { .await .expect("server req body aggregate"); }); - Ok::<_, hyper::Error>(http::Response::new(hyper::Body::empty())) + Ok::<_, hyper::Error>(http::Response::new(Empty::::new())) }), ) .await @@ -2153,7 +2154,7 @@ mod conn { let mut body = req.into_body(); - let mut send_stream = respond.send_response(Response::default(), false).unwrap(); + let mut send_stream = respond.send_response(Response::new(()), false).unwrap(); send_stream.send_data("Bread?".into(), true).unwrap(); @@ -2167,7 +2168,7 @@ mod conn { let io = tcp_connect(&addr).await.expect("tcp connect"); let (mut client, conn) = conn::Builder::new() .http2_only(true) - .handshake::<_, Body>(io) + .handshake(io) .await .expect("http handshake"); @@ -2176,7 +2177,7 @@ mod conn { }); let req = Request::connect("localhost") - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(); let res = client.send_request(req).await.expect("send_request"); assert_eq!(res.status(), StatusCode::OK); @@ -2223,7 +2224,7 @@ mod conn { let io = tcp_connect(&addr).await.expect("tcp connect"); let (mut client, conn) = conn::Builder::new() .http2_only(true) - .handshake::<_, Body>(io) + .handshake::<_, Empty>(io) .await .expect("http handshake"); @@ -2231,9 +2232,7 @@ mod conn { conn.await.expect("client conn shouldn't error"); }); - let req = Request::connect("localhost") - .body(hyper::Body::empty()) - .unwrap(); + let req = Request::connect("localhost").body(Empty::new()).unwrap(); let res = client.send_request(req).await.expect("send_request"); assert_eq!(res.status(), StatusCode::BAD_REQUEST); assert!(res.extensions().get::().is_none()); diff --git a/tests/server.rs b/tests/server.rs index 40e01b1d1e..72581d806c 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -20,7 +20,7 @@ use futures_util::future::{self, Either, FutureExt, TryFutureExt}; use h2::client::SendRequest; use h2::{RecvStream, SendStream}; use http::header::{HeaderName, HeaderValue}; -use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; @@ -963,7 +963,7 @@ async fn expect_continue_waits_for_body_poll() { drop(req); Response::builder() .status(StatusCode::BAD_REQUEST) - .body(hyper::Body::empty()) + .body(Empty::::new()) }) }), ) @@ -1256,7 +1256,7 @@ async fn http1_allow_half_close() { socket, service_fn(|_| { tokio::time::sleep(Duration::from_millis(500)) - .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) + .map(|_| Ok::<_, hyper::Error>(Response::new(Empty::::new()))) }), ) .await @@ -1317,7 +1317,7 @@ async fn returning_1xx_response_is_error() { Ok::<_, hyper::Error>( Response::builder() .status(StatusCode::CONTINUE) - .body(Body::empty()) + .body(Empty::::new()) .unwrap(), ) }), @@ -1382,7 +1382,7 @@ async fn header_read_timeout_slow_writes() { service_fn(|_| { let res = Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(); future::ready(Ok::<_, hyper::Error>(res)) }), @@ -1457,7 +1457,7 @@ async fn header_read_timeout_slow_writes_multiple_requests() { service_fn(|_| { let res = Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(); future::ready(Ok::<_, hyper::Error>(res)) }), @@ -1503,7 +1503,7 @@ async fn upgrades() { let res = Response::builder() .status(101) .header("upgrade", "foobar") - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(); future::ready(Ok::<_, hyper::Error>(res)) }), @@ -1557,7 +1557,7 @@ async fn http_connect() { service_fn(|_| { let res = Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(); future::ready(Ok::<_, hyper::Error>(res)) }), @@ -1618,7 +1618,7 @@ async fn upgrades_new() { Response::builder() .status(101) .header("upgrade", "foobar") - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -1655,7 +1655,7 @@ async fn upgrades_ignored() { tokio::spawn(async move { let svc = service_fn(move |req: Request| { assert_eq!(req.headers()["upgrade"], "yolo"); - future::ok::<_, hyper::Error>(Response::new(hyper::Body::empty())) + future::ok::<_, hyper::Error>(Response::new(Empty::::new())) }); loop { @@ -1678,7 +1678,7 @@ async fn upgrades_ignored() { .uri(&*url) .header("upgrade", "yolo") .header("connection", "upgrade") - .body(hyper::Body::empty()) + .body(Empty::::new()) .expect("make_req") }; @@ -1726,7 +1726,7 @@ async fn http_connect_new() { future::ok::<_, hyper::Error>( Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -1808,7 +1808,7 @@ async fn h2_connect() { future::ok::<_, hyper::Error>( Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -1920,7 +1920,7 @@ async fn h2_connect_multiplex() { future::ok::<_, hyper::Error>( Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -1995,7 +1995,7 @@ async fn h2_connect_large_body() { future::ok::<_, hyper::Error>( Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -2067,7 +2067,7 @@ async fn h2_connect_empty_frames() { future::ok::<_, hyper::Error>( Response::builder() .status(200) - .body(hyper::Body::empty()) + .body(Empty::::new()) .unwrap(), ) }); @@ -2518,7 +2518,7 @@ async fn http2_keep_alive_with_responsive_client() { let tcp = connect_async(addr).await; let (mut client, conn) = hyper::client::conn::Builder::new() .http2_only(true) - .handshake::<_, Body>(tcp) + .handshake(tcp) .await .expect("http handshake"); @@ -2528,7 +2528,7 @@ async fn http2_keep_alive_with_responsive_client() { tokio::time::sleep(Duration::from_secs(4)).await; - let req = http::Request::new(hyper::Body::empty()); + let req = http::Request::new(Empty::::new()); client.send_request(req).await.expect("client.send_request"); } @@ -2849,16 +2849,16 @@ const HELLO: &str = "hello"; struct HelloWorld; impl tower_service::Service> for HelloWorld { - type Response = Response; + type Response = Response>; type Error = hyper::Error; - type Future = future::Ready, Self::Error>>; + type Future = future::Ready>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Ok(()).into() } fn call(&mut self, _req: Request) -> Self::Future { - let response = Response::new(HELLO.into()); + let response = Response::new(Full::new(HELLO.into())); future::ok(response) } } @@ -3132,13 +3132,13 @@ impl TestClient { Request::builder() .uri(uri) .method(Method::GET) - .body(Body::empty()) + .body(Empty::::new()) .unwrap(), ) .await } - async fn request(&self, req: Request) -> Result, hyper::Error> { + async fn request(&self, req: Request>) -> Result, hyper::Error> { let host = req.uri().host().expect("uri has no host"); let port = req.uri().port_u16().expect("uri has no port"); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 781e71baee..3028105433 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -6,6 +6,8 @@ use std::sync::{ Arc, Mutex, }; +use bytes::Bytes; +use http_body_util::Full; use hyper::client::conn::Builder; use hyper::server::conn::Http; use tokio::net::{TcpListener, TcpStream}; @@ -371,7 +373,7 @@ async fn async_test(cfg: __TestConfig) { let mut res = Response::builder() .status(sres.status) - .body(Body::from(sres.body)) + .body(Full::new(Bytes::from(sres.body))) .expect("Response::build"); *res.headers_mut() = sres.headers; res @@ -405,7 +407,7 @@ async fn async_test(cfg: __TestConfig) { .method(creq.method) .uri(uri) //.headers(creq.headers) - .body(creq.body.into()) + .body(Full::new(Bytes::from(creq.body))) .expect("Request::build"); *req.headers_mut() = creq.headers; let cstatus = cres.status; @@ -417,7 +419,7 @@ async fn async_test(cfg: __TestConfig) { let (mut sender, conn) = hyper::client::conn::Builder::new() .http2_only(http2_only) - .handshake::(stream) + .handshake(stream) .await .unwrap();