diff --git a/examples/multi_server.rs b/examples/multi_server.rs new file mode 100644 index 0000000000..a997076476 --- /dev/null +++ b/examples/multi_server.rs @@ -0,0 +1,82 @@ +#![deny(warnings)] +extern crate hyper; +extern crate futures; +extern crate tokio_core; +extern crate pretty_env_logger; + +use futures::future::FutureResult; + +use hyper::{Get, StatusCode}; +use tokio_core::reactor::Core; +use hyper::header::ContentLength; +use hyper::server::{Http, Service, Request, Response}; + +static INDEX1: &'static [u8] = b"The 1st service!"; +static INDEX2: &'static [u8] = b"The 2nd service!"; + +struct Service1; +struct Service2; + +impl Service for Service1 { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = FutureResult; + + fn call(&self, req: Request) -> Self::Future { + futures::future::ok(match (req.method(), req.path()) { + (&Get, "/") => { + Response::new() + .with_header(ContentLength(INDEX1.len() as u64)) + .with_body(INDEX1) + }, + _ => { + Response::new() + .with_status(StatusCode::NotFound) + } + }) + } + +} + +impl Service for Service2 { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = FutureResult; + + fn call(&self, req: Request) -> Self::Future { + futures::future::ok(match (req.method(), req.path()) { + (&Get, "/") => { + Response::new() + .with_header(ContentLength(INDEX2.len() as u64)) + .with_body(INDEX2) + }, + _ => { + Response::new() + .with_status(StatusCode::NotFound) + } + }) + } + +} + + +fn main() { + pretty_env_logger::init().unwrap(); + let addr1 = "127.0.0.1:1337".parse().unwrap(); + let addr2 = "127.0.0.1:1338".parse().unwrap(); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap(); + let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap(); + + println!("Listening on http://{}", srv1.local_addr().unwrap()); + println!("Listening on http://{}", srv2.local_addr().unwrap()); + + handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>())); + handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>())); + core.run(futures::future::empty::<(), ()>()).unwrap(); +} diff --git a/src/server/mod.rs b/src/server/mod.rs index b4f06f655a..a5d5eacc88 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -16,10 +16,9 @@ use std::net::SocketAddr; use std::rc::{Rc, Weak}; use std::time::Duration; -use futures::future; use futures::task::{self, Task}; +use futures::future::{self, Select, Map}; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; -use futures::future::Map; #[cfg(feature = "compat")] use http; @@ -41,6 +40,26 @@ use proto::Body; pub use proto::response::Response; pub use proto::request::Request; +// The `Server` can be created use its own `Core`, or an shared `Handle`. +enum Reactor { + // Own its `Core` + Core(Core), + // Share `Handle` with others + Handle(Handle), +} + +impl Reactor { + /// Returns a handle to the underlying event loop that this server will be + /// running on. + #[inline] + pub fn handle(&self) -> Handle { + match *self { + Reactor::Core(ref core) => core.handle(), + Reactor::Handle(ref handle) => handle.clone(), + } + } +} + /// An instance of the HTTP protocol, and implementation of tokio-proto's /// `ServerProto` trait. /// @@ -63,12 +82,23 @@ where B: Stream, { protocol: Http, new_service: S, - core: Core, + reactor: Reactor, listener: TcpListener, shutdown_timeout: Duration, no_proto: bool, } +/// The Future of an Server. +pub struct ServerFuture +where B: Stream, + B::Item: AsRef<[u8]>, +{ + server: Server, + info: Rc>, + shutdown_signal: F, + shutdown: Option>, +} + impl + 'static> Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. @@ -118,7 +148,30 @@ impl + 'static> Http { Ok(Server { new_service: new_service, - core: core, + reactor: Reactor::Core(core), + listener: listener, + protocol: self.clone(), + shutdown_timeout: Duration::new(1, 0), + }) + } + + /// This method allows the ability to share a `Core` with multiple servers. + /// + /// Bind the provided `addr` and return a server with a shared `Core`. + /// + /// This is method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided as well, creating a new service per + /// connection. + pub fn bind_handle(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result> + where S: NewService, Error = ::Error> + 'static, + Bd: Stream, + { + let listener = TcpListener::bind(addr, &handle)?; + + Ok(Server { + new_service: new_service, + reactor: Reactor::Handle(handle.clone()), listener: listener, protocol: self.clone(), shutdown_timeout: Duration::new(1, 0), @@ -544,7 +597,7 @@ impl Server /// Returns a handle to the underlying event loop that this server will be /// running on. pub fn handle(&self) -> Handle { - self.core.handle() + self.reactor.handle() } /// Configure the amount of time this server will wait for a "graceful @@ -566,6 +619,21 @@ impl Server self } + /// Configure the `shutdown_signal`. + pub fn shutdown_signal(self, signal: F) -> ServerFuture + where F: Future + { + ServerFuture { + server: self, + info: Rc::new(RefCell::new(Info { + active: 0, + blocker: None, + })), + shutdown_signal: signal, + shutdown: None, + } + } + /// Execute this server infinitely. /// /// This method does not currently return, but it will return an error if @@ -590,7 +658,13 @@ impl Server pub fn run_until(self, shutdown_signal: F) -> ::Result<()> where F: Future, { - let Server { protocol, new_service, mut core, listener, shutdown_timeout, no_proto } = self; + let Server { protocol, new_service, reactor, listener, shutdown_timeout, no_proto } = self; + + let mut core = match reactor { + Reactor::Core(core) => core, + _ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"), + }; + let handle = core.handle(); // Mini future to track the number of active services @@ -649,12 +723,96 @@ impl Server } } +impl Future for Server + where S: NewService, Error = ::Error> + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + if let Reactor::Core(_) = self.reactor { + panic!("Server owns its core, use `Server::run()` to run the service!") + } + + loop { + match self.listener.accept() { + Ok((socket, addr)) => { + // TODO: use the NotifyService + match self.new_service.new_service() { + Ok(srv) => self.protocol.bind_connection(&self.handle(), + socket, + addr, + srv), + Err(e) => debug!("internal error: {:?}", e), + } + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => debug!("internal error: {:?}", e), + } + } + } +} + +impl Future for ServerFuture + where F: Future, + S: NewService, Error = ::Error> + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + if let Some(ref mut shutdown) = self.shutdown { + match shutdown.poll() { + Ok(Async::Ready(_)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err((e, _)) => debug!("internal error: {:?}", e), + } + } else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() { + match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) { + Ok(timeout) => { + let wait = WaitUntilZero { info: self.info.clone() }; + self.shutdown = Some(wait.select(timeout)) + }, + Err(e) => debug!("internal error: {:?}", e), + } + } else { + match self.server.listener.accept() { + Ok((socket, addr)) => { + match self.server.new_service.new_service() { + Ok(inner_srv) => { + let srv = NotifyService { + inner: inner_srv, + info: Rc::downgrade(&self.info), + }; + self.info.borrow_mut().active += 1; + self.server.protocol.bind_connection(&self.server.handle(), + socket, + addr, + srv) + }, + Err(e) => debug!("internal error: {:?}", e), + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => debug!("internal error: {:?}", e), + } + } + } + } +} + + impl> fmt::Debug for Server where B::Item: AsRef<[u8]> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") - .field("core", &"...") + .field("reactor", &"...") .field("listener", &self.listener) .field("new_service", &self.new_service) .field("protocol", &self.protocol) @@ -662,6 +820,20 @@ where B::Item: AsRef<[u8]> } } +impl > fmt::Debug for ServerFuture +where B::Item: AsRef<[u8]>, +F: Future +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ServerFuture") + .field("server", &self.server) + .field("info", &"...") + .field("shutdown_signal", &"...") + .field("shutdown", &"...") + .finish() + } +} + struct NotifyService { inner: S, info: Weak>,