diff --git a/examples/client.rs b/examples/client.rs index 9c411254b6..ffcc026719 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -3,8 +3,8 @@ use std::env; use bytes::Bytes; -use http_body_util::Empty; -use hyper::{body::Body as _, Request}; +use http_body_util::{BodyExt, Empty}; +use hyper::Request; use tokio::io::{self, AsyncWriteExt as _}; use tokio::net::TcpStream; @@ -62,9 +62,11 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> { // Stream the body, writing each chunk to stdout as we get it // (instead of buffering and printing at the end). - while let Some(next) = res.data().await { - let chunk = next?; - io::stdout().write_all(&chunk).await?; + while let Some(next) = res.frame().await { + let frame = next?; + if let Some(chunk) = frame.data_ref() { + io::stdout().write_all(&chunk).await?; + } } println!("\n\nDone!"); diff --git a/examples/client_json.rs b/examples/client_json.rs index 2084e071fe..4ba6787a6e 100644 --- a/examples/client_json.rs +++ b/examples/client_json.rs @@ -2,7 +2,7 @@ #![warn(rust_2018_idioms)] use bytes::Bytes; -use http_body_util::Empty; +use http_body_util::{BodyExt, Empty}; use hyper::{body::Buf, Request}; use serde::Deserialize; use tokio::net::TcpStream; @@ -48,7 +48,7 @@ async fn fetch_json(url: hyper::Uri) -> Result> { let res = sender.send_request(req).await?; // asynchronously aggregate the chunks of the body - let body = hyper::body::aggregate(res).await?; + let body = res.collect().await?.aggregate(); // try to parse as json with serde_json let users = serde_json::from_reader(body.reader())?; diff --git a/examples/echo.rs b/examples/echo.rs index ba5096e7a6..c7d4fa98f9 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -50,7 +50,7 @@ async fn echo(req: Request) -> Result>(); Ok(Response::new(full(reversed_body))) diff --git a/examples/params.rs b/examples/params.rs index 44bf877bdf..cce182583e 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -25,7 +25,7 @@ async fn param_example( (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))), (&Method::POST, "/post") => { // Concatenate the body... - let b = hyper::body::to_bytes(req).await?; + let b = req.collect().await?.to_bytes(); // Parse the request body. form_urlencoded::parse // always succeeds, but in general parsing may // fail (for example, an invalid post of json), so diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 2d991d0148..ee109d54fa 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -6,8 +6,7 @@ use std::net::SocketAddr; use std::rc::Rc; use tokio::net::TcpListener; -use hyper::body::{Body as HttpBody, Bytes}; -use hyper::header::{HeaderMap, HeaderValue}; +use hyper::body::{Body as HttpBody, Bytes, Frame}; use hyper::service::service_fn; use hyper::{Error, Response}; use std::marker::PhantomData; @@ -33,18 +32,11 @@ impl HttpBody for Body { type Data = Bytes; type Error = Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { - Poll::Ready(self.get_mut().data.take().map(Ok)) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll>, Self::Error>> { - Poll::Ready(Ok(None)) + ) -> Poll, Self::Error>>> { + Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) } } diff --git a/examples/web_api.rs b/examples/web_api.rs index b01e355665..47c30cd852 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -48,7 +48,7 @@ async fn client_request_response() -> Result> { async fn api_post_response(req: Request) -> Result> { // Aggregate the body... - let whole_body = hyper::body::aggregate(req).await?; + let whole_body = req.collect().await?.aggregate(); // Decode as JSON... let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?; // Change the JSON... diff --git a/src/body/aggregate.rs b/src/body/aggregate.rs deleted file mode 100644 index 8a27b36051..0000000000 --- a/src/body/aggregate.rs +++ /dev/null @@ -1,31 +0,0 @@ -use bytes::Buf; - -use super::Body; -use crate::common::buf::BufList; - -/// Aggregate the data buffers from a body asynchronously. -/// -/// The returned `impl Buf` groups the `Buf`s from the `Body` without -/// copying them. This is ideal if you don't require a contiguous buffer. -/// -/// # Note -/// -/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length -/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the -/// `Content-Length` is a possibility, but it is not strictly mandated to be present. -pub async fn aggregate(body: T) -> Result -where - T: Body, -{ - let mut bufs = BufList::new(); - - futures_util::pin_mut!(body); - while let Some(buf) = body.data().await { - let buf = buf?; - if buf.has_remaining() { - bufs.push(buf); - } - } - - Ok(bufs) -} diff --git a/src/body/body.rs b/src/body/body.rs index 616e617492..cb059ecc5d 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -3,9 +3,9 @@ use std::fmt; use bytes::Bytes; use futures_channel::mpsc; use futures_channel::oneshot; -use futures_core::Stream; // for mpsc::Receiver +use futures_core::{Stream, FusedStream}; // for mpsc::Receiver use http::HeaderMap; -use http_body::{Body, SizeHint}; +use http_body::{Body, Frame, SizeHint}; use super::DecodedLength; use crate::common::Future; @@ -39,8 +39,9 @@ enum Kind { }, #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] H2 { - ping: ping::Recorder, content_length: DecodedLength, + data_done: bool, + ping: ping::Recorder, recv: h2::RecvStream, }, #[cfg(feature = "ffi")] @@ -131,6 +132,7 @@ impl Recv { content_length = DecodedLength::ZERO; } let body = Recv::new(Kind::H2 { + data_done: false, ping, content_length, recv, @@ -153,86 +155,78 @@ impl Recv { _ => unreachable!(), } } +} - fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { +impl Body for Recv { + type Data = Bytes; + type Error = crate::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>> { match self.kind { Kind::Empty => Poll::Ready(None), Kind::Chan { content_length: ref mut len, ref mut data_rx, ref mut want_tx, - .. + ref mut trailers_rx, } => { want_tx.send(WANT_READY); - match ready!(Pin::new(data_rx).poll_next(cx)?) { - Some(chunk) => { - len.sub_if(chunk.len() as u64); - Poll::Ready(Some(Ok(chunk))) + if !data_rx.is_terminated() { + match ready!(Pin::new(data_rx).poll_next(cx)?) { + Some(chunk) => { + len.sub_if(chunk.len() as u64); + return Poll::Ready(Some(Ok(Frame::data(chunk)))); + } + // fall through to trailers + None => (), } - None => Poll::Ready(None), } - } + + // check trailers after data is terminated + match ready!(Pin::new(trailers_rx).poll(cx)) { + Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))), + Err(_) => Poll::Ready(None), + } + }, #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { + ref mut data_done, ref ping, recv: ref mut h2, content_length: ref mut len, - } => match ready!(h2.poll_data(cx)) { - Some(Ok(bytes)) => { - let _ = h2.flow_control().release_capacity(bytes.len()); - len.sub_if(bytes.len() as u64); - ping.record_data(bytes.len()); - Poll::Ready(Some(Ok(bytes))) + } => { + if !*data_done { + match ready!(h2.poll_data(cx)) { + Some(Ok(bytes)) => { + let _ = h2.flow_control().release_capacity(bytes.len()); + len.sub_if(bytes.len() as u64); + ping.record_data(bytes.len()); + return Poll::Ready(Some(Ok(Frame::data(bytes)))) + } + Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))), + None => { + *data_done = true; + // fall through to trailers + }, + } } - Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), - None => Poll::Ready(None), - }, - #[cfg(feature = "ffi")] - Kind::Ffi(ref mut body) => body.poll_data(cx), - } - } -} - -impl Body for Recv { - type Data = Bytes; - type Error = crate::Error; - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - self.poll_inner(cx) - } - - fn poll_trailers( - #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, - #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>> { - match self.kind { - Kind::Empty => Poll::Ready(Ok(None)), - #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { - recv: ref mut h2, - ref ping, - .. - } => match ready!(h2.poll_trailers(cx)) { - Ok(t) => { - ping.record_non_data(); - Poll::Ready(Ok(t)) + // after data, check trailers + match ready!(h2.poll_trailers(cx)) { + Ok(t) => { + ping.record_non_data(); + Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) + }, + Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), } - Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), - }, - Kind::Chan { - ref mut trailers_rx, - .. - } => match ready!(Pin::new(trailers_rx).poll(cx)) { - Ok(t) => Poll::Ready(Ok(Some(t))), - Err(_) => Poll::Ready(Ok(None)), }, + #[cfg(feature = "ffi")] - Kind::Ffi(ref mut body) => body.poll_trailers(cx), + Kind::Ffi(ref mut body) => body.poll_data(cx), } } @@ -386,6 +380,7 @@ mod tests { use std::mem; use std::task::Poll; + use http_body_util::BodyExt; use super::{Body, DecodedLength, Recv, Sender, SizeHint}; #[test] @@ -394,7 +389,7 @@ mod tests { // the size by too much. let body_size = mem::size_of::(); - let body_expected_size = mem::size_of::() * 6; + let body_expected_size = mem::size_of::() * 5; assert!( body_size <= body_expected_size, "Body size = {} <= {}", @@ -443,7 +438,7 @@ mod tests { tx.abort(); - let err = rx.data().await.unwrap().unwrap_err(); + let err = rx.frame().await.unwrap().unwrap_err(); assert!(err.is_body_write_aborted(), "{:?}", err); } @@ -456,10 +451,10 @@ mod tests { // buffer is full, but can still send abort tx.abort(); - let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); + let chunk1 = rx.frame().await.expect("item 1").expect("chunk 1").into_data().unwrap(); assert_eq!(chunk1, "chunk 1"); - let err = rx.data().await.unwrap().unwrap_err(); + let err = rx.frame().await.unwrap().unwrap_err(); assert!(err.is_body_write_aborted(), "{:?}", err); } @@ -479,7 +474,7 @@ mod tests { async fn channel_empty() { let (_, mut rx) = Recv::channel(); - assert!(rx.data().await.is_none()); + assert!(rx.frame().await.is_none()); } #[test] @@ -496,7 +491,7 @@ mod tests { let (mut tx, mut rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); let mut tx_ready = tokio_test::task::spawn(tx.ready()); - let mut rx_data = tokio_test::task::spawn(rx.data()); + let mut rx_data = tokio_test::task::spawn(rx.frame()); assert!( tx_ready.poll().is_pending(), diff --git a/src/body/mod.rs b/src/body/mod.rs index 30ee91ad7e..36a3138282 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -18,17 +18,14 @@ pub use bytes::{Buf, Bytes}; pub use http_body::Body; pub use http_body::SizeHint; +pub use http_body::Frame; -pub use self::aggregate::aggregate; pub use self::body::Recv; pub(crate) use self::body::Sender; pub(crate) use self::length::DecodedLength; -pub use self::to_bytes::to_bytes; -mod aggregate; mod body; mod length; -mod to_bytes; fn _assert_send_sync() { fn _assert_send() {} diff --git a/src/body/to_bytes.rs b/src/body/to_bytes.rs deleted file mode 100644 index 19d23412ce..0000000000 --- a/src/body/to_bytes.rs +++ /dev/null @@ -1,70 +0,0 @@ -use bytes::{Buf, BufMut, Bytes}; - -use super::Body; - -/// Concatenate the buffers from a body into a single `Bytes` asynchronously. -/// -/// This may require copying the data into a single buffer. If you don't need -/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate()) -/// function. -/// -/// # Note -/// -/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length -/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the -/// `Content-Length` is a possibility, but it is not strictly mandated to be present. -/// -/// # Example -/// -/// ``` -/// # use hyper::{Recv, Response}; -/// # async fn doc(response: Response) -> hyper::Result<()> { -/// # use hyper::body::Body; -/// // let response: Response ... -/// -/// const MAX_ALLOWED_RESPONSE_SIZE: u64 = 1024; -/// -/// let response_content_length = match response.body().size_hint().upper() { -/// Some(v) => v, -/// None => MAX_ALLOWED_RESPONSE_SIZE + 1 // Just to protect ourselves from a malicious response -/// }; -/// -/// if response_content_length < MAX_ALLOWED_RESPONSE_SIZE { -/// let body_bytes = hyper::body::to_bytes(response.into_body()).await?; -/// println!("body: {:?}", body_bytes); -/// } -/// -/// # Ok(()) -/// # } -/// ``` -pub async fn to_bytes(body: T) -> Result -where - T: Body, -{ - futures_util::pin_mut!(body); - - // If there's only 1 chunk, we can just return Buf::to_bytes() - let mut first = if let Some(buf) = body.data().await { - buf? - } else { - return Ok(Bytes::new()); - }; - - let second = if let Some(buf) = body.data().await { - buf? - } else { - return Ok(first.copy_to_bytes(first.remaining())); - }; - - // With more than 1 buf, we gotta flatten into a Vec first. - let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize; - let mut vec = Vec::with_capacity(cap); - vec.put(first); - vec.put(second); - - while let Some(buf) = body.data().await { - vec.put(buf?); - } - - Ok(vec.into()) -} diff --git a/src/ffi/body.rs b/src/ffi/body.rs index b6f90879f3..b492a05cb6 100644 --- a/src/ffi/body.rs +++ b/src/ffi/body.rs @@ -8,7 +8,7 @@ use libc::{c_int, size_t}; use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType}; use super::{UserDataPointer, HYPER_ITER_CONTINUE}; -use crate::body::{Body as _, Bytes, Recv}; +use crate::body::{Body as _, Bytes, Frame, Recv}; /// A streaming HTTP body. pub struct hyper_body(pub(super) Recv); @@ -136,7 +136,7 @@ impl UserBody { } } - pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll>> { + pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll>>> { let mut out = std::ptr::null_mut(); match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) { super::task::HYPER_POLL_READY => { @@ -144,7 +144,7 @@ impl UserBody { Poll::Ready(None) } else { let buf = unsafe { Box::from_raw(out) }; - Poll::Ready(Some(Ok(buf.0))) + Poll::Ready(Some(Ok(Frame::data(buf.0)))) } } super::task::HYPER_POLL_PENDING => Poll::Pending, @@ -157,13 +157,6 @@ impl UserBody { ))))), } } - - pub(crate) fn poll_trailers( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll>> { - Poll::Ready(Ok(None)) - } } /// cbindgen:ignore diff --git a/src/lib.rs b/src/lib.rs index 2bd4d759d3..9b2a075ef3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ #![deny(missing_debug_implementations)] #![cfg_attr(test, deny(rust_2018_idioms))] #![cfg_attr(all(test, feature = "full"), deny(unreachable_pub))] -#![cfg_attr(all(test, feature = "full"), deny(warnings))] +//#![cfg_attr(all(test, feature = "full"), deny(warnings))] #![cfg_attr(all(test, feature = "nightly"), feature(test))] #![cfg_attr(docsrs, feature(doc_cfg))] diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index e08cd562d5..a1c9341953 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -333,12 +333,18 @@ where continue; } - let item = ready!(body.as_mut().poll_data(cx)); + let item = ready!(body.as_mut().poll_frame(cx)); if let Some(item) = item { - let chunk = item.map_err(|e| { + let frame = item.map_err(|e| { *clear_body = true; crate::Error::new_user_body(e) })?; + let chunk = if frame.is_data() { + frame.into_data().unwrap() + } else { + trace!("discarding non-data frame"); + continue; + }; let eos = body.is_end_stream(); if eos { *clear_body = true; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 288a444bb2..620ef33401 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -119,43 +119,44 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut me = self.project(); loop { - if !*me.data_done { - // we don't have the next chunk of data yet, so just reserve 1 byte to make - // sure there's some capacity available. h2 will handle the capacity management - // for the actual body chunk. - me.body_tx.reserve_capacity(1); - - if me.body_tx.capacity() == 0 { - loop { - match ready!(me.body_tx.poll_capacity(cx)) { - Some(Ok(0)) => {} - Some(Ok(_)) => break, - Some(Err(e)) => { - return Poll::Ready(Err(crate::Error::new_body_write(e))) - } - None => { - // None means the stream is no longer in a - // streaming state, we either finished it - // somehow, or the remote reset us. - return Poll::Ready(Err(crate::Error::new_body_write( - "send stream capacity unexpectedly closed", - ))); - } + // we don't have the next chunk of data yet, so just reserve 1 byte to make + // sure there's some capacity available. h2 will handle the capacity management + // for the actual body chunk. + me.body_tx.reserve_capacity(1); + + if me.body_tx.capacity() == 0 { + loop { + match ready!(me.body_tx.poll_capacity(cx)) { + Some(Ok(0)) => {} + Some(Ok(_)) => break, + Some(Err(e)) => { + return Poll::Ready(Err(crate::Error::new_body_write(e))) + } + None => { + // None means the stream is no longer in a + // streaming state, we either finished it + // somehow, or the remote reset us. + return Poll::Ready(Err(crate::Error::new_body_write( + "send stream capacity unexpectedly closed", + ))); } } - } else if let Poll::Ready(reason) = me - .body_tx - .poll_reset(cx) - .map_err(crate::Error::new_body_write)? - { - debug!("stream received RST_STREAM: {:?}", reason); - return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( - reason, - )))); } + } else if let Poll::Ready(reason) = me + .body_tx + .poll_reset(cx) + .map_err(crate::Error::new_body_write)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( + reason, + )))); + } - match ready!(me.stream.as_mut().poll_data(cx)) { - Some(Ok(chunk)) => { + match ready!(me.stream.as_mut().poll_frame(cx)) { + Some(Ok(frame)) => { + if frame.is_data() { + let chunk = frame.into_data().unwrap(); let is_eos = me.stream.is_end_stream(); trace!( "send body chunk: {} bytes, eos={}", @@ -171,43 +172,24 @@ where if is_eos { return Poll::Ready(Ok(())); } - } - Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), - None => { + } else if frame.is_trailers() { + // no more DATA, so give any capacity back me.body_tx.reserve_capacity(0); - let is_eos = me.stream.is_end_stream(); - if is_eos { - return Poll::Ready(me.body_tx.send_eos_frame()); - } else { - *me.data_done = true; - // loop again to poll_trailers - } - } - } - } else { - if let Poll::Ready(reason) = me - .body_tx - .poll_reset(cx) - .map_err(crate::Error::new_body_write)? - { - debug!("stream received RST_STREAM: {:?}", reason); - return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( - reason, - )))); - } - - match ready!(me.stream.poll_trailers(cx)) { - Ok(Some(trailers)) => { me.body_tx - .send_trailers(trailers) + .send_trailers(frame.into_trailers().unwrap()) .map_err(crate::Error::new_body_write)?; return Poll::Ready(Ok(())); + } else { + trace!("discarding unknown frame"); + // loop again } - Ok(None) => { - // There were no trailers, so send an empty DATA frame... - return Poll::Ready(me.body_tx.send_eos_frame()); - } - Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), + } + Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), + None => { + // no more frames means we're done here + // but at this point, we haven't sent an EOS DATA, or + // any trailers, so send an empty EOS DATA. + return Poll::Ready(me.body_tx.send_eos_frame()); } } } diff --git a/tests/client.rs b/tests/client.rs index 63cbe8d270..dff09aec12 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -14,7 +14,7 @@ use std::time::Duration; use http::uri::PathAndQuery; use http_body_util::{BodyExt, StreamBody}; -use hyper::body::to_bytes as concat; +use hyper::body::Frame; use hyper::header::HeaderValue; use hyper::{Method, Request, StatusCode, Uri, Version}; @@ -29,6 +29,13 @@ fn s(buf: &[u8]) -> &str { std::str::from_utf8(buf).expect("from_utf8") } +async fn concat(b: B) -> Result +where + B: hyper::body::Body, +{ + b.collect().await.map(|c| c.to_bytes()) +} + fn tcp_connect(addr: &SocketAddr) -> impl Future> { TcpStream::connect(*addr) } @@ -398,7 +405,7 @@ macro_rules! __client_req_prop { }}; ($req_builder:ident, $body:ident, $addr:ident, body_stream: $body_e:expr) => {{ - $body = BodyExt::boxed(StreamBody::new($body_e)); + $body = BodyExt::boxed(StreamBody::new(futures_util::TryStreamExt::map_ok($body_e, Frame::data))); }}; } @@ -1327,12 +1334,12 @@ mod conn { use bytes::{Buf, Bytes}; use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; - use http_body_util::{Empty, StreamBody}; + use http_body_util::{BodyExt, Empty, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; - use hyper::body::Body; + use hyper::body::{Body, Frame}; use hyper::client::conn; use hyper::upgrade::OnUpgrade; use hyper::{self, Method, Recv, Request, Response, StatusCode}; @@ -1379,7 +1386,7 @@ mod conn { .unwrap(); let mut res = client.send_request(req).await.expect("send_request"); assert_eq!(res.status(), hyper::StatusCode::OK); - assert!(res.body_mut().data().await.is_none()); + assert!(res.body_mut().frame().await.is_none()); }; future::join(server, client).await; @@ -1435,7 +1442,7 @@ mod conn { res.headers().get(http::header::CONTENT_LENGTH).unwrap(), "0" ); - assert!(res.body_mut().data().await.is_none()); + assert!(res.body_mut().frame().await.is_none()); }; future::join(server, client).await; @@ -1443,8 +1450,6 @@ mod conn { #[test] fn incoming_content_length() { - use hyper::body::Body; - let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); let rt = support::runtime(); @@ -1481,13 +1486,13 @@ mod conn { assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.body().size_hint().exact(), Some(5)); assert!(!res.body().is_end_stream()); - poll_fn(move |ctx| Pin::new(res.body_mut()).poll_data(ctx)).map(Option::unwrap) + poll_fn(move |ctx| Pin::new(res.body_mut()).poll_frame(ctx)).map(Option::unwrap) }); let rx = rx1.expect("thread panicked"); let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - assert_eq!(chunk.len(), 5); + assert_eq!(chunk.data_ref().unwrap().len(), 5); } #[test] @@ -1519,10 +1524,10 @@ mod conn { rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); - let (mut sender, recv) = mpsc::channel::>>(0); + let (mut sender, recv) = mpsc::channel::, Box>>(0); let sender = thread::spawn(move || { - sender.try_send(Ok("hello".into())).expect("try_send_data"); + sender.try_send(Ok(Frame::data("hello".into()))).expect("try_send_data"); support::runtime().block_on(rx).unwrap(); // Aborts the body in an abnormal fashion. @@ -2100,7 +2105,7 @@ mod conn { sock, service_fn(|req| async move { tokio::spawn(async move { - let _ = hyper::body::aggregate(req.into_body()) + let _ = concat(req.into_body()) .await .expect("server req body aggregate"); }); @@ -2126,7 +2131,7 @@ mod conn { }); // Use a channel to keep request stream open - let (_tx, recv) = mpsc::channel::>>(0); + let (_tx, recv) = mpsc::channel::, Box>>(0); let req = http::Request::new(StreamBody::new(recv)); let _resp = client.send_request(req).await.expect("send_request"); @@ -2245,7 +2250,7 @@ mod conn { assert!(res.extensions().get::().is_none()); let mut body = String::new(); - hyper::body::aggregate(res.into_body()) + concat(res.into_body()) .await .unwrap() .reader() diff --git a/tests/server.rs b/tests/server.rs index f2b65042aa..43c5d1b678 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -2276,8 +2276,8 @@ fn http2_body_user_error_sends_reset_reason() { let mut res = client.get(uri).await?; - while let Some(chunk) = res.body_mut().data().await { - chunk?; + while let Some(item) = res.body_mut().frame().await { + item?; } Ok(()) }) @@ -2631,7 +2631,9 @@ impl<'a> ReplyBuilder<'a> { where S: futures_util::Stream> + Send + Sync + 'static, { - let body = BodyExt::boxed(StreamBody::new(stream)); + use hyper::body::Frame; + use futures_util::TryStreamExt; + let body = BodyExt::boxed(StreamBody::new(stream.map_ok(Frame::data))); self.tx.lock().unwrap().send(Reply::Body(body)).unwrap(); } @@ -2703,10 +2705,12 @@ impl Service> for TestService { let replies = self.reply.clone(); Box::pin(async move { - while let Some(chunk) = req.data().await { - match chunk { - Ok(chunk) => { - tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); + while let Some(item) = req.frame().await { + match item { + Ok(frame) => { + if frame.is_data() { + tx.send(Msg::Chunk(frame.into_data().unwrap().to_vec())).unwrap(); + } } Err(err) => { tx.send(Msg::Error(err)).unwrap(); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index f19275febb..dced9a2e3b 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -7,7 +7,7 @@ use std::sync::{ }; use bytes::Bytes; -use http_body_util::Full; +use http_body_util::{BodyExt, Full}; use hyper::server; use tokio::net::{TcpListener, TcpStream}; @@ -370,7 +370,8 @@ async fn async_test(cfg: __TestConfig) { func(&req.headers()); } let sbody = sreq.body; - hyper::body::to_bytes(req).map_ok(move |body| { + req.collect().map_ok(move |collected| { + let body = collected.to_bytes(); assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); let mut res = Response::builder() @@ -458,7 +459,7 @@ async fn async_test(cfg: __TestConfig) { func(&res.headers()); } - let body = hyper::body::to_bytes(res).await.unwrap(); + let body = res.collect().await.unwrap().to_bytes(); assert_eq!(body.as_ref(), cbody.as_slice(), "server body"); }