diff --git a/src/client/mod.rs b/src/client/mod.rs index dbf7550651..cc4137d292 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -91,6 +91,7 @@ use http::uri::Scheme; use body::{Body, Payload}; use common::Exec; +use common::lazy as hyper_lazy; use self::connect::{Connect, Destination}; use self::pool::{Pool, Poolable, Reservation}; @@ -274,7 +275,7 @@ where C: Connect + Sync + 'static, let dst = Destination { uri: url, }; - future::lazy(move || { + hyper_lazy(move || { if let Some(connecting) = pool.connecting(&pool_key) { Either::A(connector.connect(dst) .map_err(::Error::new_connect) @@ -318,9 +319,43 @@ where C: Connect + Sync + 'static, }) }; - let race = checkout.select(connect) - .map(|(pooled, _work)| pooled) - .or_else(|(e, other)| { + let executor = self.executor.clone(); + // The order of the `select` is depended on below... + let race = checkout.select2(connect) + .map(move |either| match either { + // Checkout won, connect future may have been started or not. + // + // If it has, let it finish and insert back into the pool, + // so as to not waste the socket... + Either::A((checked_out, connecting)) => { + // This depends on the `select` above having the correct + // order, such that if the checkout future were ready + // immediately, the connect future will never have been + // started. + // + // If it *wasn't* ready yet, then the connect future will + // have been started... + if connecting.started() { + let bg = connecting + .map(|_pooled| { + // dropping here should just place it in + // the Pool for us... + }) + .map_err(|err| { + trace!("background connect error: {}", err); + }); + // An execute error here isn't important, we're just trying + // to prevent a waste of a socket... + let _ = executor.execute(bg); + } + checked_out + }, + // Connect won, checkout can just be dropped. + Either::B((connected, _checkout)) => { + connected + }, + }) + .or_else(|either| match either { // Either checkout or connect could get canceled: // // 1. Connect is canceled if this is HTTP/2 and there is @@ -329,10 +364,19 @@ where C: Connect + Sync + 'static, // idle connection reliably. // // In both cases, we should just wait for the other future. - if e.is_canceled() { - Either::A(other.map_err(ClientError::Normal)) - } else { - Either::B(future::err(ClientError::Normal(e))) + Either::A((err, connecting)) => { + if err.is_canceled() { + Either::A(Either::A(connecting.map_err(ClientError::Normal))) + } else { + Either::B(future::err(ClientError::Normal(err))) + } + }, + Either::B((err, checkout)) => { + if err.is_canceled() { + Either::A(Either::B(checkout.map_err(ClientError::Normal))) + } else { + Either::B(future::err(ClientError::Normal(err))) + } } }); diff --git a/src/client/pool.rs b/src/client/pool.rs index 11277fd56c..d4c4e23d70 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -151,6 +151,26 @@ impl Pool { } } + #[cfg(feature = "runtime")] + #[cfg(test)] + pub(super) fn h1_key(&self, s: &str) -> Key { + (Arc::new(s.to_string()), Ver::Http1) + } + + #[cfg(feature = "runtime")] + #[cfg(test)] + pub(super) fn idle_count(&self, key: &Key) -> usize { + self + .inner + .connections + .lock() + .unwrap() + .idle + .get(key) + .map(|list| list.len()) + .unwrap_or(0) + } + fn take(&self, key: &Key) -> Option> { let entry = { let mut inner = self.inner.connections.lock().unwrap(); diff --git a/src/client/tests.rs b/src/client/tests.rs index 7c59e3cfdf..d496209bbf 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,8 +1,9 @@ #![cfg(feature = "runtime")] extern crate pretty_env_logger; -use futures::Async; +use futures::{Async, Future, Stream}; use futures::future::poll_fn; +use futures::sync::oneshot; use tokio::runtime::current_thread::Runtime; use mock::MockConnector; @@ -73,8 +74,6 @@ fn conn_reset_after_write() { { let req = Request::builder() .uri("http://mock.local/a") - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Default::default()) .unwrap(); let res1 = client.request(req); @@ -110,3 +109,95 @@ fn conn_reset_after_write() { } } +#[test] +fn checkout_win_allows_connect_future_to_be_pooled() { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + + let (tx, rx) = oneshot::channel::<()>(); + let sock1 = connector.mock("http://mock.local"); + let sock2 = connector.mock_fut("http://mock.local", rx); + + let client = Client::builder() + .build::<_, ::Body>(connector); + + client.pool.no_timer(); + + let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse"); + + // First request just sets us up to have a connection able to be put + // back in the pool. *However*, it doesn't insert immediately. The + // body has 1 pending byte, and we will only drain in request 2, once + // the connect future has been started. + let mut body = { + let res1 = client.get(uri.clone()) + .map(|res| res.into_body().concat2()); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx")); + Ok(Async::Ready(())) + }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); + + rt.block_on(res1.join(srv1)).expect("res1").0 + }; + + + // The second request triggers the only mocked connect future, but then + // the drained body allows the first socket to go back to the pool, + // "winning" the checkout race. + { + let res2 = client.get(uri.clone()); + let drain = poll_fn(move || { + body.poll() + }); + let srv2 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx")); + Ok(Async::Ready(())) + }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); + + rt.block_on(res2.join(drain).join(srv2)).expect("res2"); + } + + // "Release" the mocked connect future, and let the runtime spin once so + // it's all setup... + { + let mut tx = Some(tx); + let client = &client; + let key = client.pool.h1_key("http://mock.local"); + let mut tick_cnt = 0; + let fut = poll_fn(move || { + tx.take(); + + if client.pool.idle_count(&key) == 0 { + tick_cnt += 1; + assert!(tick_cnt < 10, "ticked too many times waiting for idle"); + trace!("no idle yet; tick count: {}", tick_cnt); + ::futures::task::current().notify(); + Ok(Async::NotReady) + } else { + Ok::<_, ()>(Async::Ready(())) + } + }); + rt.block_on(fut).unwrap(); + } + + // Third request just tests out that the "loser" connection was pooled. If + // it isn't, this will panic since the MockConnector doesn't have any more + // mocks to give out. + { + let res3 = client.get(uri); + let srv3 = poll_fn(|| { + try_ready!(sock2.read(&mut [0u8; 512])); + try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: ::std::io::Error| panic!("srv3 poll_fn error: {}", e)); + + rt.block_on(res3.join(srv3)).expect("res3"); + } +} + + diff --git a/src/common/lazy.rs b/src/common/lazy.rs new file mode 100644 index 0000000000..10dfe17992 --- /dev/null +++ b/src/common/lazy.rs @@ -0,0 +1,63 @@ +use std::mem; + +use futures::{Future, IntoFuture, Poll}; + +pub(crate) fn lazy(func: F) -> Lazy +where + F: FnOnce() -> R, + R: IntoFuture, +{ + Lazy { + inner: Inner::Init(func), + } +} + +pub struct Lazy { + inner: Inner +} + +enum Inner { + Init(F), + Fut(R), + Empty, +} + +impl Lazy +where + F: FnOnce() -> R, + R: IntoFuture, +{ + pub fn started(&self) -> bool { + match self.inner { + Inner::Init(_) => false, + Inner::Fut(_) | + Inner::Empty => true, + } + } +} + +impl Future for Lazy +where + F: FnOnce() -> R, + R: IntoFuture, +{ + type Item = R::Item; + type Error = R::Error; + + fn poll(&mut self) -> Poll { + match self.inner { + Inner::Fut(ref mut f) => return f.poll(), + _ => (), + } + + match mem::replace(&mut self.inner, Inner::Empty) { + Inner::Init(func) => { + let mut fut = func().into_future(); + let ret = fut.poll(); + self.inner = Inner::Fut(fut); + ret + }, + _ => unreachable!("lazy state wrong"), + } + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 1bfa980fc9..3cf1e04c1d 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,8 +1,10 @@ mod buf; mod exec; pub(crate) mod io; +mod lazy; mod never; pub(crate) use self::buf::StaticBuf; pub(crate) use self::exec::Exec; +pub(crate) use self::lazy::lazy; pub use self::never::Never; diff --git a/src/mock.rs b/src/mock.rs index b3c239c52d..010b6cf9e3 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex}; use bytes::Buf; use futures::{Async, Poll}; +#[cfg(feature = "runtime")] +use futures::Future; use futures::task::{self, Task}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -50,6 +52,7 @@ impl> PartialEq for MockCursor { impl Write for MockCursor { fn write(&mut self, data: &[u8]) -> io::Result { + trace!("MockCursor::write; len={}", data.len()); self.vec.extend(data); Ok(data.len()) } @@ -62,7 +65,13 @@ impl Write for MockCursor { impl Read for MockCursor { fn read(&mut self, buf: &mut [u8]) -> io::Result { (&self.vec[self.pos..]).read(buf).map(|n| { + trace!("MockCursor::read; len={}", n); self.pos += n; + if self.pos == self.vec.len() { + trace!("MockCursor::read to end, clearing"); + self.pos = 0; + self.vec.clear(); + } n }) } @@ -165,7 +174,11 @@ impl AsyncIo { #[cfg(feature = "runtime")] fn close(&mut self) { self.block_in(1); - assert_eq!(self.inner.vec.len(), self.inner.pos); + assert_eq!( + self.inner.vec.len(), + self.inner.pos, + "AsyncIo::close(), but cursor not consumed", + ); self.inner.vec.truncate(0); self.inner.pos = 0; } @@ -309,6 +322,31 @@ struct DuplexInner { write: AsyncIo, } +#[cfg(feature = "runtime")] +impl Duplex { + pub(crate) fn channel() -> (Duplex, DuplexHandle) { + let mut inner = DuplexInner { + handle_read_task: None, + read: AsyncIo::new_buf(Vec::new(), 0), + write: AsyncIo::new_buf(Vec::new(), ::std::usize::MAX), + }; + + inner.read.park_tasks(true); + inner.write.park_tasks(true); + + let inner = Arc::new(Mutex::new(inner)); + + let duplex = Duplex { + inner: inner.clone(), + }; + let handle = DuplexHandle { + inner: inner, + }; + + (duplex, handle) + } +} + #[cfg(feature = "runtime")] impl Read for Duplex { fn read(&mut self, buf: &mut [u8]) -> io::Result { @@ -372,8 +410,8 @@ impl DuplexHandle { pub fn write(&self, bytes: &[u8]) -> Poll { let mut inner = self.inner.lock().unwrap(); - assert!(inner.read.inner.vec.is_empty()); assert_eq!(inner.read.inner.pos, 0); + assert_eq!(inner.read.inner.vec.len(), 0, "write but read isn't empty"); inner .read .inner @@ -388,15 +426,20 @@ impl DuplexHandle { impl Drop for DuplexHandle { fn drop(&mut self) { trace!("mock duplex handle drop"); - let mut inner = self.inner.lock().unwrap(); - inner.read.close(); - inner.write.close(); + if !::std::thread::panicking() { + let mut inner = self.inner.lock().unwrap(); + inner.read.close(); + inner.write.close(); + } } } +#[cfg(feature = "runtime")] +type BoxedConnectFut = Box + Send>; + #[cfg(feature = "runtime")] pub struct MockConnector { - mocks: Mutex>>, + mocks: Mutex>>, } #[cfg(feature = "runtime")] @@ -408,28 +451,25 @@ impl MockConnector { } pub fn mock(&mut self, key: &str) -> DuplexHandle { - let key = key.to_owned(); - let mut inner = DuplexInner { - handle_read_task: None, - read: AsyncIo::new_buf(Vec::new(), 0), - write: AsyncIo::new_buf(Vec::new(), ::std::usize::MAX), - }; - - inner.read.park_tasks(true); - inner.write.park_tasks(true); + use futures::future; + self.mock_fut(key, future::ok::<_, ()>(())) + } - let inner = Arc::new(Mutex::new(inner)); + pub fn mock_fut(&mut self, key: &str, fut: F) -> DuplexHandle + where + F: Future + Send + 'static, + { + let key = key.to_owned(); - let duplex = Duplex { - inner: inner.clone(), - }; - let handle = DuplexHandle { - inner: inner, - }; + let (duplex, handle) = Duplex::channel(); + let fut = Box::new(fut.then(move |_| { + trace!("MockConnector mocked fut ready"); + Ok((duplex, Connected::new())) + })); self.mocks.lock().unwrap().entry(key) .or_insert(Vec::new()) - .push(duplex); + .push(fut); handle } @@ -439,10 +479,9 @@ impl MockConnector { impl Connect for MockConnector { type Transport = Duplex; type Error = io::Error; - type Future = ::futures::future::FutureResult<(Self::Transport, Connected), Self::Error>; + type Future = BoxedConnectFut; fn connect(&self, dst: Destination) -> Self::Future { - use futures::future; trace!("mock connect: {:?}", dst); let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() { format!(":{}", port) @@ -453,6 +492,24 @@ impl Connect for MockConnector { let mocks = mocks.get_mut(&key) .expect(&format!("unknown mocks uri: {}", key)); assert!(!mocks.is_empty(), "no additional mocks for {}", key); - future::ok((mocks.remove(0), Connected::new())) + mocks.remove(0) + } +} + + +#[cfg(feature = "runtime")] +impl Drop for MockConnector { + fn drop(&mut self) { + if !::std::thread::panicking() { + let mocks = self.mocks.lock().unwrap(); + for (key, mocks) in mocks.iter() { + assert_eq!( + mocks.len(), + 0, + "not all mocked connects for {:?} were used", + key, + ); + } + } } }