diff --git a/src/common/mod.rs b/src/common/mod.rs index 400836ad3f..7cbca68594 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -4,8 +4,10 @@ pub(crate) mod exec; pub(crate) mod io; mod lazy; mod never; +pub(crate) mod task; pub(crate) use self::buf::StaticBuf; pub(crate) use self::exec::Exec; pub(crate) use self::lazy::{lazy, Started as Lazy}; pub use self::never::Never; +pub(crate) use self::task::YieldNow; diff --git a/src/common/task.rs b/src/common/task.rs new file mode 100644 index 0000000000..d47d23214d --- /dev/null +++ b/src/common/task.rs @@ -0,0 +1,40 @@ +use futures::{Async, Poll, task::Task}; + +use super::Never; + +/// A type to help "yield" a future, such that it is re-scheduled immediately. +/// +/// Useful for spin counts, so a future doesn't hog too much time. +#[derive(Debug)] +pub(crate) struct YieldNow { + cached_task: Option, +} + +impl YieldNow { + pub(crate) fn new() -> YieldNow { + YieldNow { + cached_task: None, + } + } + + /// Returns `Ok(Async::NotReady)` always, while also notifying the + /// current task so that it is rescheduled immediately. + /// + /// Since it never returns `Async::Ready` or `Err`, those types are + /// set to `Never`. + pub(crate) fn poll_yield(&mut self) -> Poll { + // Check for a cached `Task` first... + if let Some(ref t) = self.cached_task { + if t.will_notify_current() { + t.notify(); + return Ok(Async::NotReady); + } + } + + // No cached task, or not current, so get a new one... + let t = ::futures::task::current(); + t.notify(); + self.cached_task = Some(t); + Ok(Async::NotReady) + } +} diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index da2d600c15..69a06de104 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -7,7 +7,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; use body::internal::FullDataArg; -use common::Never; +use common::{Never, YieldNow}; use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; use super::Http1Transaction; use service::Service; @@ -18,6 +18,10 @@ pub(crate) struct Dispatcher { body_tx: Option<::body::Sender>, body_rx: Option, is_closing: bool, + /// If the poll loop reaches its max spin count, it will yield by notifying + /// the task immediately. This will cache that `Task`, since it usually is + /// the same one. + yield_now: YieldNow, } pub(crate) trait Dispatch { @@ -58,6 +62,7 @@ where body_tx: None, body_rx: None, is_closing: false, + yield_now: YieldNow::new(), } } @@ -98,7 +103,29 @@ where fn poll_inner(&mut self, should_shutdown: bool) -> Poll { T::update_date(); - loop { + try_ready!(self.poll_loop()); + + if self.is_done() { + if let Some(pending) = self.conn.pending_upgrade() { + self.conn.take_error()?; + return Ok(Async::Ready(Dispatched::Upgrade(pending))); + } else if should_shutdown { + try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown)); + } + self.conn.take_error()?; + Ok(Async::Ready(Dispatched::Shutdown)) + } else { + Ok(Async::NotReady) + } + } + + fn poll_loop(&mut self) -> Poll<(), ::Error> { + // Limit the looping on this connection, in case it is ready far too + // often, so that other futures don't starve. + // + // 16 was chosen arbitrarily, as that is number of pipelined requests + // benchmarks often use. Perhaps it should be a config option instead. + for _ in 0..16 { self.poll_read()?; self.poll_write()?; self.poll_flush()?; @@ -112,21 +139,19 @@ where // Using this instead of task::current() and notify() inside // the Conn is noticeably faster in pipelined benchmarks. if !self.conn.wants_read_again() { - break; + //break; + return Ok(Async::Ready(())); } } - if self.is_done() { - if let Some(pending) = self.conn.pending_upgrade() { - self.conn.take_error()?; - return Ok(Async::Ready(Dispatched::Upgrade(pending))); - } else if should_shutdown { - try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown)); - } - self.conn.take_error()?; - Ok(Async::Ready(Dispatched::Shutdown)) - } else { - Ok(Async::NotReady) + trace!("poll_loop yielding (self = {:p})", self); + + match self.yield_now.poll_yield() { + Ok(Async::NotReady) => Ok(Async::NotReady), + // maybe with `!` this can be cleaner... + // but for now, just doing this to eliminate branches + Ok(Async::Ready(never)) | + Err(never) => match never {} } }