diff --git a/examples/hello.rs b/examples/hello.rs index b9ba116a14..8ed34e2bdd 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -13,6 +13,7 @@ fn hello(_: Request, res: Response) { } fn main() { - hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap(); + let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000) + .listen(hello).unwrap(); println!("Listening on http://127.0.0.1:3000"); } diff --git a/examples/server.rs b/examples/server.rs index ca270dfc76..17c580bc84 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -51,7 +51,6 @@ fn echo(mut req: Request, mut res: Response) { fn main() { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); - let mut listening = server.listen(echo).unwrap(); + let _guard = server.listen(echo).unwrap(); println!("Listening on http://127.0.0.1:1337"); - listening.await(); } diff --git a/src/lib.rs b/src/lib.rs index 592c8d6ef0..4af33e7d07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![feature(core, collections, hash, io, os, path, std_misc, - slicing_syntax, box_syntax)] + slicing_syntax, box_syntax, unsafe_destructor)] #![deny(missing_docs)] #![cfg_attr(test, deny(warnings))] #![cfg_attr(test, feature(alloc, test))] @@ -130,12 +130,16 @@ extern crate "rustc-serialize" as serialize; extern crate time; extern crate url; extern crate openssl; -#[macro_use] extern crate log; -#[cfg(test)] extern crate test; extern crate "unsafe-any" as uany; extern crate cookie; extern crate unicase; +#[macro_use] +extern crate log; + +#[cfg(test)] +extern crate test; + pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port}; pub use mimewrapper::mime; pub use url::Url; diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs new file mode 100644 index 0000000000..3c0e0b5e15 --- /dev/null +++ b/src/server/acceptor.rs @@ -0,0 +1,95 @@ +use std::thread::{Thread, JoinGuard}; +use std::sync::Arc; +use std::sync::mpsc; +use net::NetworkAcceptor; + +pub struct AcceptorPool { + acceptor: A +} + +impl AcceptorPool { + /// Create a thread pool to manage the acceptor. + pub fn new(acceptor: A) -> AcceptorPool { + AcceptorPool { acceptor: acceptor } + } + + /// Runs the acceptor pool. Blocks until the acceptors are closed. + /// + /// ## Panics + /// + /// Panics if threads == 0. + pub fn accept(self, + work: F, + threads: usize) -> JoinGuard<'static, ()> { + assert!(threads != 0, "Can't accept on 0 threads."); + + // Replace with &F when Send changes land. + let work = Arc::new(work); + + let (super_tx, supervisor_rx) = mpsc::channel(); + + let spawn = + move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); + + // Go + for _ in 0..threads { spawn() } + + // Spawn the supervisor + Thread::scoped(move || for () in supervisor_rx.iter() { spawn() }) + } +} + +fn spawn_with(supervisor: mpsc::Sender<()>, work: Arc, mut acceptor: A) +where A: NetworkAcceptor, + F: Fn(::Stream) + Send + Sync { + use std::old_io::EndOfFile; + + Thread::spawn(move || { + let sentinel = Sentinel::new(supervisor, ()); + + loop { + match acceptor.accept() { + Ok(stream) => work(stream), + Err(ref e) if e.kind == EndOfFile => { + debug!("Server closed."); + sentinel.cancel(); + return; + }, + + Err(e) => { + error!("Connection failed: {}", e); + } + } + } + }); +} + +struct Sentinel { + value: Option, + supervisor: mpsc::Sender, + active: bool +} + +impl Sentinel { + fn new(channel: mpsc::Sender, data: T) -> Sentinel { + Sentinel { + value: Some(data), + supervisor: channel, + active: true + } + } + + fn cancel(mut self) { self.active = false; } +} + +#[unsafe_destructor] +impl Drop for Sentinel { + fn drop(&mut self) { + // If we were cancelled, get out of here. + if !self.active { return; } + + // Respawn ourselves + let _ = self.supervisor.send(self.value.take().unwrap()); + } +} + diff --git a/src/server/mod.rs b/src/server/mod.rs index 43061e7676..01fa9184bd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,10 +1,8 @@ //! HTTP Server -use std::old_io::{Listener, EndOfFile, BufferedReader, BufferedWriter}; +use std::old_io::{Listener, BufferedReader, BufferedWriter}; use std::old_io::net::ip::{IpAddr, Port, SocketAddr}; use std::os; -use std::sync::{Arc, TaskPool}; -use std::thread::{Builder, JoinGuard}; - +use std::thread::JoinGuard; pub use self::request::Request; pub use self::response::Response; @@ -19,9 +17,13 @@ use net::{NetworkListener, NetworkStream, NetworkAcceptor, HttpAcceptor, HttpListener}; use version::HttpVersion::{Http10, Http11}; +use self::acceptor::AcceptorPool; + pub mod request; pub mod response; +mod acceptor; + /// A server can listen on a TCP socket. /// /// Once listening, it will create a `Request`/`Response` pair for each @@ -71,71 +73,14 @@ S: NetworkStream + Clone + Send> Server { let acceptor = try!(self.listener.listen((self.ip, self.port))); let socket = try!(acceptor.socket_name()); - let mut captured = acceptor.clone(); - let guard = Builder::new().name("hyper acceptor".to_string()).scoped(move || { - let handler = Arc::new(handler); - debug!("threads = {:?}", threads); - let pool = TaskPool::new(threads); - for conn in captured.incoming() { - match conn { - Ok(mut stream) => { - debug!("Incoming stream"); - let handler = handler.clone(); - pool.execute(move || { - let addr = match stream.peer_name() { - Ok(addr) => addr, - Err(e) => { - error!("Peer Name error: {:?}", e); - return; - } - }; - let mut rdr = BufferedReader::new(stream.clone()); - let mut wrt = BufferedWriter::new(stream); - - let mut keep_alive = true; - while keep_alive { - let mut res = Response::new(&mut wrt); - let req = match Request::new(&mut rdr, addr) { - Ok(req) => req, - Err(e@HttpIoError(_)) => { - debug!("ioerror in keepalive loop = {:?}", e); - return; - } - Err(e) => { - //TODO: send a 400 response - error!("request error = {:?}", e); - return; - } - }; - - keep_alive = match (req.version, req.headers.get::()) { - (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, - (Http11, Some(conn)) if conn.contains(&Close) => false, - _ => true - }; - res.version = req.version; - handler.handle(req, res); - debug!("keep_alive = {:?}", keep_alive); - } - - }); - }, - Err(ref e) if e.kind == EndOfFile => { - debug!("server closed"); - break; - }, - Err(e) => { - error!("Connection failed: {}", e); - continue; - } - } - } - }); + debug!("threads = {:?}", threads); + let pool = AcceptorPool::new(acceptor.clone()); + let work = move |stream| handle_connection(stream, &handler); Ok(Listening { - acceptor: acceptor, - guard: Some(guard), + _guard: pool.accept(work, threads), socket: socket, + acceptor: acceptor }) } @@ -146,22 +91,56 @@ S: NetworkStream + Clone + Send> Server { } +fn handle_connection(mut stream: S, handler: &H) +where S: NetworkStream + Clone, H: Handler { + debug!("Incoming stream"); + let addr = match stream.peer_name() { + Ok(addr) => addr, + Err(e) => { + error!("Peer Name error: {:?}", e); + return; + } + }; + + let mut rdr = BufferedReader::new(stream.clone()); + let mut wrt = BufferedWriter::new(stream); + + let mut keep_alive = true; + while keep_alive { + let mut res = Response::new(&mut wrt); + let req = match Request::new(&mut rdr, addr) { + Ok(req) => req, + Err(e@HttpIoError(_)) => { + debug!("ioerror in keepalive loop = {:?}", e); + return; + } + Err(e) => { + //TODO: send a 400 response + error!("request error = {:?}", e); + return; + } + }; + + keep_alive = match (req.version, req.headers.get::()) { + (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, + (Http11, Some(conn)) if conn.contains(&Close) => false, + _ => true + }; + res.version = req.version; + handler.handle(req, res); + debug!("keep_alive = {:?}", keep_alive); + } +} + /// A listening server, which can later be closed. pub struct Listening { acceptor: A, - guard: Option>, + _guard: JoinGuard<'static, ()>, /// The socket addresses that the server is bound to. pub socket: SocketAddr, } impl Listening { - /// Causes the current thread to wait for this listening to complete. - pub fn await(&mut self) { - if let Some(guard) = self.guard.take() { - let _ = guard.join(); - } - } - /// Stop the server from listening to its socket address. pub fn close(&mut self) -> HttpResult<()> { debug!("closing server");