From 6e530e68906eee870a15778c8c280350a65c0193 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 18 Jan 2017 12:34:32 -0800 Subject: [PATCH] Generalize TcpServer::serve's service a bit The services accepted by `TcpServer::serve` wire up the service's request/response types the underlying request/response types of the `BindServer` implementation. These types tend to be what we want for the simple case (non streaming), but for the streaming case they have the `tokio_proto::streaming::Message` type wired in. The `Message` type typically isn't what libraries want as the request/response types for their services, so this PR generalizes to use `From` and `Into` to ensure that custom types can still be used so long as they can be converted. I think this sould be backwards compatible as well (not causing inference regressions), hopefully at least. --- src/tcp_server.rs | 74 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/src/tcp_server.rs b/src/tcp_server.rs index 513f7e52..0c41f67d 100644 --- a/src/tcp_server.rs +++ b/src/tcp_server.rs @@ -6,10 +6,11 @@ use std::thread; use BindServer; use futures::stream::Stream; +use futures::future::{Then, Future}; use net2; use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::reactor::{Core, Handle}; -use tokio_service::NewService; +use tokio_service::{NewService, Service}; // TODO: Add more options, e.g.: // - max concurrent requests @@ -80,9 +81,14 @@ impl TcpServer where /// /// This method will block the current thread until the server is shut down. pub fn serve(&self, new_service: S) where - S: NewService + Send + Sync + 'static, + S: NewService + Send + Sync + 'static, + S::Instance: 'static, + P::ServiceError: 'static, + P::ServiceResponse: 'static, + P::ServiceRequest: 'static, + S::Request: From, + S::Response: Into, + S::Error: Into, { let new_service = Arc::new(new_service); self.with_handle(move |_| new_service.clone()) @@ -98,9 +104,14 @@ impl TcpServer where /// This method will block the current thread until the server is shut down. pub fn with_handle(&self, new_service: F) where F: Fn(&Handle) -> S + Send + Sync + 'static, - S: NewService + Send + Sync + 'static, + S: NewService + Send + Sync + 'static, + S::Instance: 'static, + P::ServiceError: 'static, + P::ServiceResponse: 'static, + P::ServiceRequest: 'static, + S::Request: From, + S::Response: Into, + S::Error: Into, { let proto = self.proto.clone(); let new_service = Arc::new(new_service); @@ -127,10 +138,48 @@ impl TcpServer where fn serve(binder: Arc

, addr: SocketAddr, workers: usize, new_service: &F) where P: BindServer, F: Fn(&Handle) -> S, - S: NewService + 'static, + S: NewService + Send + Sync, + S::Instance: 'static, + P::ServiceError: 'static, + P::ServiceResponse: 'static, + P::ServiceRequest: 'static, + S::Request: From, + S::Response: Into, + S::Error: Into, { + struct WrapService { + inner: S, + _marker: PhantomData (Request, Response, Error)>, + } + + impl Service for WrapService + where S: Service, + S::Request: From, + S::Response: Into, + S::Error: Into, + { + type Request = Request; + type Response = Response; + type Error = Error; + type Future = Then, + fn(Result) -> Result>; + + fn call(&self, req: Request) -> Self::Future { + fn change_types(r: Result) -> Result + where A: Into, + B: Into, + { + match r { + Ok(e) => Ok(e.into()), + Err(e) => Err(e.into()), + } + } + + self.inner.call(S::Request::from(req)).then(change_types) + } + } + let mut core = Core::new().unwrap(); let handle = core.handle(); let new_service = new_service(&handle); @@ -141,7 +190,10 @@ fn serve(binder: Arc

, addr: SocketAddr, workers: usize, new_se let service = try!(new_service.new_service()); // Bind it! - binder.bind_server(&handle, socket, service); + binder.bind_server(&handle, socket, WrapService { + inner: service, + _marker: PhantomData, + }); Ok(()) });