From 6530a00a8e3449a8fd7e4ed6ad1231b6b1579c38 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sat, 21 Jul 2018 16:17:08 -0700 Subject: [PATCH] fix(http1): reduce closed connections when body is dropped If a user makes use of `Body::is_end_stream` to optimize so as to not need to do make a final poll just to receive `None`, previously the connection would not have progressed its reading state to a finished body, and so the connection would be closed. Now, upon reading any chunk, the connection state will check if it can know that the body would be finished, and progresses to a body finished state sooner. The integration tests were amplified by adding a naive hyper proxy as a secondary test, which happens to make use of that optimization, and thus caught the issue. --- src/client/tests.rs | 9 +++- src/proto/h1/conn.rs | 26 +++++----- tests/integration.rs | 44 ++++++++++++++++ tests/support/mod.rs | 121 ++++++++++++++++++++++++++++++++++--------- 4 files changed, 163 insertions(+), 37 deletions(-) diff --git a/src/client/tests.rs b/src/client/tests.rs index d496209bbf..7df6a7efe5 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -137,7 +137,14 @@ fn checkout_win_allows_connect_future_to_be_pooled() { .map(|res| res.into_body().concat2()); let srv1 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); - try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx")); + // Chunked is used so as to force 2 body reads. + try_ready!(sock1.write(b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 1\r\nx\r\n\ + 0\r\n\r\n\ + ")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 2b1603e26e..358cdfb660 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -180,29 +180,31 @@ where I: AsyncRead + AsyncWrite, pub fn read_body(&mut self) -> Poll, io::Error> { debug_assert!(self.can_read_body()); - trace!("Conn::read_body"); - let (reading, ret) = match self.state.reading { Reading::Body(ref mut decoder) => { match decoder.decode(&mut self.io) { Ok(Async::Ready(slice)) => { - let (reading, chunk) = if !slice.is_empty() { - return Ok(Async::Ready(Some(Chunk::from(slice)))); - } else if decoder.is_eof() { + let (reading, chunk) = if decoder.is_eof() { debug!("incoming body completed"); - (Reading::KeepAlive, None) - } else { - trace!("decode stream unexpectedly ended"); - // this should actually be unreachable: - // the decoder will return an UnexpectedEof if there were - // no bytes to read and it isn't eof yet... + (Reading::KeepAlive, if !slice.is_empty() { + Some(Chunk::from(slice)) + } else { + None + }) + } else if slice.is_empty() { + error!("decode stream unexpectedly ended"); + // This should be unreachable, since all 3 decoders + // either set eof=true or return an Err when reading + // an empty slice... (Reading::Closed, None) + } else { + return Ok(Async::Ready(Some(Chunk::from(slice)))); }; (reading, Ok(Async::Ready(chunk))) }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { - trace!("decode stream error: {}", e); + debug!("decode stream error: {}", e); (Reading::Closed, Err(e)) }, } diff --git a/tests/integration.rs b/tests/integration.rs index 7a9a11277c..e593284bba 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -62,6 +62,50 @@ t! { ; } +t! { + get_body_2_keeps_alive, + client: + request: + uri: "/", + ; + response: + status: 200, + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + request: + uri: "/", + ; + response: + status: 200, + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + server: + request: + uri: "/", + ; + response: + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + request: + uri: "/", + ; + response: + headers: { + "content-length" => 11, + }, + body: "hello world", + ; +} + t! { get_strip_connection_header, client: diff --git a/tests/support/mod.rs b/tests/support/mod.rs index be60a959b8..4052465564 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -2,6 +2,13 @@ pub extern crate futures; pub extern crate hyper; pub extern crate tokio; +use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; +use std::time::Duration; + +use hyper::{Body, Client, Request, Response, Server, Version}; +use hyper::client::HttpConnector; +use hyper::service::service_fn; + pub use std::net::SocketAddr; pub use self::futures::{future, Future, Stream}; pub use self::futures::sync::oneshot; @@ -44,6 +51,16 @@ macro_rules! t { )); } + __run_test(__TestConfig { + client_version: 2, + client_msgs: c.clone(), + server_version: 2, + server_msgs: s.clone(), + parallel: true, + connections: 1, + proxy: false, + }); + __run_test(__TestConfig { client_version: 2, client_msgs: c, @@ -51,8 +68,8 @@ macro_rules! t { server_msgs: s, parallel: true, connections: 1, + proxy: true, }); - } ); ( @@ -104,6 +121,27 @@ macro_rules! t { server_msgs: s.clone(), parallel: false, connections: 1, + proxy: false, + }); + + __run_test(__TestConfig { + client_version: 2, + client_msgs: c.clone(), + server_version: 2, + server_msgs: s.clone(), + parallel: false, + connections: 1, + proxy: false, + }); + + __run_test(__TestConfig { + client_version: 1, + client_msgs: c.clone(), + server_version: 1, + server_msgs: s.clone(), + parallel: false, + connections: 1, + proxy: true, }); __run_test(__TestConfig { @@ -113,6 +151,7 @@ macro_rules! t { server_msgs: s, parallel: false, connections: 1, + proxy: true, }); } ); @@ -185,14 +224,11 @@ pub struct __TestConfig { pub parallel: bool, pub connections: usize, + pub proxy: bool, } pub fn __run_test(cfg: __TestConfig) { extern crate pretty_env_logger; - use hyper::{Body, Client, Request, Response, Version}; - use hyper::client::HttpConnector; - use std::sync::{Arc, Mutex}; - use std::time::Duration; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().expect("new rt"); @@ -254,31 +290,39 @@ pub fn __run_test(cfg: __TestConfig) { ) .expect("serve_addr"); - let addr = serve.incoming_ref().local_addr(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (success_tx, success_rx) = oneshot::channel(); + let mut addr = serve.incoming_ref().local_addr(); let expected_connections = cfg.connections; let server = serve .fold(0, move |cnt, connecting| { + let cnt = cnt + 1; + assert!( + cnt <= expected_connections, + "server expected {} connections, received {}", + expected_connections, + cnt + ); let fut = connecting .map_err(|never| -> hyper::Error { match never {} }) .flatten() .map_err(|e| panic!("server connection error: {}", e)); ::tokio::spawn(fut); - Ok::<_, hyper::Error>(cnt + 1) - }) - .map(move |cnt| { - assert_eq!(cnt, expected_connections); - }) - .map_err(|e| panic!("serve error: {}", e)) - .select2(shutdown_rx) - .map(move |_| { - let _ = success_tx.send(()); + Ok::<_, hyper::Error>(cnt) }) - .map_err(|_| panic!("shutdown not ok")); + .map(|_| ()) + .map_err(|e| panic!("serve error: {}", e)); rt.spawn(server); + if cfg.proxy { + let (proxy_addr, proxy) = naive_proxy(ProxyConfig { + connections: cfg.connections, + dst: addr, + version: cfg.server_version, + }); + rt.spawn(proxy); + addr = proxy_addr; + } + let make_request = Arc::new(move |client: &Client, creq: __CReq, cres: __CRes| { let uri = format!("http://{}{}", addr, creq.uri); @@ -335,12 +379,41 @@ pub fn __run_test(cfg: __TestConfig) { Box::new(client_futures.map(|_| ())) }; - let client_futures = client_futures.map(move |_| { - let _ = shutdown_tx.send(()); - }); - rt.spawn(client_futures); - rt.block_on(success_rx - .map_err(|_| "something panicked")) + let client_futures = client_futures.map(|_| ()); + rt.block_on(client_futures) .expect("shutdown succeeded"); } +struct ProxyConfig { + connections: usize, + dst: SocketAddr, + version: usize, +} + +fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { + let client = Client::builder() + .keep_alive_timeout(Duration::from_secs(10)) + .http2_only(cfg.version == 2) + .build_http::(); + + let dst_addr = cfg.dst; + let max_connections = cfg.connections; + let counter = AtomicUsize::new(0); + + let srv = Server::bind(&([127, 0, 0, 1], 0).into()) + .serve(move || { + let prev = counter.fetch_add(1, Ordering::Relaxed); + assert!(max_connections >= prev + 1, "proxy max connections"); + let client = client.clone(); + service_fn(move |mut req| { + let uri = format!("http://{}{}", dst_addr, req.uri().path()) + .parse() + .expect("proxy new uri parse"); + *req.uri_mut() = uri; + client.request(req) + }) + + }); + let proxy_addr = srv.local_addr(); + (proxy_addr, srv.map_err(|err| panic!("proxy error: {}", err))) +}