Skip to content

Commit

Permalink
feat(body): update Body trait to use Frames
Browse files Browse the repository at this point in the history
The `Body` trait was adjusted to be forwards compatible with adding new
frame types. That resulted in changing from `poll_data` and `poll_trailers`
to a single `poll_frame` function. More can be learned from the proposal
in #2840.

BREAKING CHANGE: The polling functions of the `Body` trait have been
  redesigned.

  The free functions `hyper::body::to_bytes` and `aggregate` have been
  removed. Similar functionality is on
  `http_body_util::BodyExt::collect`.
  • Loading branch information
seanmonstar committed Oct 24, 2022
1 parent 0766d3f commit 13414ab
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 297 deletions.
12 changes: 7 additions & 5 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::env;

use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::Body as _, Request};
use http_body_util::{BodyExt, Empty};
use hyper::Request;
use tokio::io::{self, AsyncWriteExt as _};
use tokio::net::TcpStream;

Expand Down Expand Up @@ -62,9 +62,11 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {

// Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end).
while let Some(next) = res.data().await {
let chunk = next?;
io::stdout().write_all(&chunk).await?;
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
io::stdout().write_all(&chunk).await?;
}
}

println!("\n\nDone!");
Expand Down
4 changes: 2 additions & 2 deletions examples/client_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![warn(rust_2018_idioms)]

use bytes::Bytes;
use http_body_util::Empty;
use http_body_util::{BodyExt, Empty};
use hyper::{body::Buf, Request};
use serde::Deserialize;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -48,7 +48,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let res = sender.send_request(req).await?;

// asynchronously aggregate the chunks of the body
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();

// try to parse as json with serde_json
let users = serde_json::from_reader(body.reader())?;
Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn echo(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error
return Ok(resp);
}

let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let whole_body = req.collect().await?.to_bytes();

let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(full(reversed_body)))
Expand Down
2 changes: 1 addition & 1 deletion examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn param_example(
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
(&Method::POST, "/post") => {
// Concatenate the body...
let b = hyper::body::to_bytes(req).await?;
let b = req.collect().await?.to_bytes();
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down
16 changes: 4 additions & 12 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::net::SocketAddr;
use std::rc::Rc;
use tokio::net::TcpListener;

use hyper::body::{Body as HttpBody, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::body::{Body as HttpBody, Bytes, Frame};
use hyper::service::service_fn;
use hyper::{Error, Response};
use std::marker::PhantomData;
Expand All @@ -33,18 +32,11 @@ impl HttpBody for Body {
type Data = Bytes;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(self.get_mut().data.take().map(Ok))
}

fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d))))
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {

async fn api_post_response(req: Request<Recv>) -> Result<Response<BoxBody>> {
// Aggregate the body...
let whole_body = hyper::body::aggregate(req).await?;
let whole_body = req.collect().await?.aggregate();
// Decode as JSON...
let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?;
// Change the JSON...
Expand Down
31 changes: 0 additions & 31 deletions src/body/aggregate.rs

This file was deleted.

129 changes: 62 additions & 67 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::fmt;
use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
use futures_core::{Stream, FusedStream}; // for mpsc::Receiver
use http::HeaderMap;
use http_body::{Body, SizeHint};
use http_body::{Body, Frame, SizeHint};

use super::DecodedLength;
use crate::common::Future;
Expand Down Expand Up @@ -39,8 +39,9 @@ enum Kind {
},
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
H2 {
ping: ping::Recorder,
content_length: DecodedLength,
data_done: bool,
ping: ping::Recorder,
recv: h2::RecvStream,
},
#[cfg(feature = "ffi")]
Expand Down Expand Up @@ -131,6 +132,7 @@ impl Recv {
content_length = DecodedLength::ZERO;
}
let body = Recv::new(Kind::H2 {
data_done: false,
ping,
content_length,
recv,
Expand All @@ -153,86 +155,78 @@ impl Recv {
_ => unreachable!(),
}
}
}

fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
impl Body for Recv {
type Data = Bytes;
type Error = crate::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.kind {
Kind::Empty => Poll::Ready(None),
Kind::Chan {
content_length: ref mut len,
ref mut data_rx,
ref mut want_tx,
..
ref mut trailers_rx,
} => {
want_tx.send(WANT_READY);

match ready!(Pin::new(data_rx).poll_next(cx)?) {
Some(chunk) => {
len.sub_if(chunk.len() as u64);
Poll::Ready(Some(Ok(chunk)))
if !data_rx.is_terminated() {
match ready!(Pin::new(data_rx).poll_next(cx)?) {
Some(chunk) => {
len.sub_if(chunk.len() as u64);
return Poll::Ready(Some(Ok(Frame::data(chunk))));
}
// fall through to trailers
None => (),
}
None => Poll::Ready(None),
}
}

// check trailers after data is terminated
match ready!(Pin::new(trailers_rx).poll(cx)) {
Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
Err(_) => Poll::Ready(None),
}
},
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 {
ref mut data_done,
ref ping,
recv: ref mut h2,
content_length: ref mut len,
} => match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
ping.record_data(bytes.len());
Poll::Ready(Some(Ok(bytes)))
} => {
if !*data_done {
match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
ping.record_data(bytes.len());
return Poll::Ready(Some(Ok(Frame::data(bytes))))
}
Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))),
None => {
*data_done = true;
// fall through to trailers
},
}
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
None => Poll::Ready(None),
},

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),
}
}
}

impl Body for Recv {
type Data = Bytes;
type Error = crate::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.poll_inner(cx)
}

fn poll_trailers(
#[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
#[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
Kind::Empty => Poll::Ready(Ok(None)),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 {
recv: ref mut h2,
ref ping,
..
} => match ready!(h2.poll_trailers(cx)) {
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t))
// after data, check trailers
match ready!(h2.poll_trailers(cx)) {
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
},
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
},
Kind::Chan {
ref mut trailers_rx,
..
} => match ready!(Pin::new(trailers_rx).poll(cx)) {
Ok(t) => Poll::Ready(Ok(Some(t))),
Err(_) => Poll::Ready(Ok(None)),
},

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),
Kind::Ffi(ref mut body) => body.poll_data(cx),
}
}

Expand Down Expand Up @@ -386,6 +380,7 @@ mod tests {
use std::mem;
use std::task::Poll;

use http_body_util::BodyExt;
use super::{Body, DecodedLength, Recv, Sender, SizeHint};

#[test]
Expand All @@ -394,7 +389,7 @@ mod tests {
// the size by too much.

let body_size = mem::size_of::<Recv>();
let body_expected_size = mem::size_of::<u64>() * 6;
let body_expected_size = mem::size_of::<u64>() * 5;
assert!(
body_size <= body_expected_size,
"Body size = {} <= {}",
Expand Down Expand Up @@ -443,7 +438,7 @@ mod tests {

tx.abort();

let err = rx.data().await.unwrap().unwrap_err();
let err = rx.frame().await.unwrap().unwrap_err();
assert!(err.is_body_write_aborted(), "{:?}", err);
}

Expand All @@ -456,10 +451,10 @@ mod tests {
// buffer is full, but can still send abort
tx.abort();

let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
let chunk1 = rx.frame().await.expect("item 1").expect("chunk 1").into_data().unwrap();
assert_eq!(chunk1, "chunk 1");

let err = rx.data().await.unwrap().unwrap_err();
let err = rx.frame().await.unwrap().unwrap_err();
assert!(err.is_body_write_aborted(), "{:?}", err);
}

Expand All @@ -479,7 +474,7 @@ mod tests {
async fn channel_empty() {
let (_, mut rx) = Recv::channel();

assert!(rx.data().await.is_none());
assert!(rx.frame().await.is_none());
}

#[test]
Expand All @@ -496,7 +491,7 @@ mod tests {
let (mut tx, mut rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

let mut tx_ready = tokio_test::task::spawn(tx.ready());
let mut rx_data = tokio_test::task::spawn(rx.data());
let mut rx_data = tokio_test::task::spawn(rx.frame());

assert!(
tx_ready.poll().is_pending(),
Expand Down
5 changes: 1 addition & 4 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
pub use bytes::{Buf, Bytes};
pub use http_body::Body;
pub use http_body::SizeHint;
pub use http_body::Frame;

pub use self::aggregate::aggregate;
pub use self::body::Recv;
pub(crate) use self::body::Sender;
pub(crate) use self::length::DecodedLength;
pub use self::to_bytes::to_bytes;

mod aggregate;
mod body;
mod length;
mod to_bytes;

fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
Expand Down
Loading

0 comments on commit 13414ab

Please sign in to comment.