Skip to content

Commit

Permalink
feat(proto,server,service): completely remove server module's relianc…
Browse files Browse the repository at this point in the history
…e on tower::Service
  • Loading branch information
tomkarw committed Aug 18, 2022
1 parent 22bdbb4 commit 1c71ec6
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 147 deletions.
44 changes: 5 additions & 39 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ pub(crate) trait Dispatch {
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
fn should_poll(&self) -> bool;
}

cfg_server! {
use crate::service::TowerHttpService;
use crate::service::HttpService;

pub(crate) struct Server<S: TowerHttpService<B>, B> {
pub(crate) struct Server<S: HttpService<B>, B> {
in_flight: Pin<Box<Option<S::Future>>>,
pub(crate) service: S,
}
Expand Down Expand Up @@ -235,15 +234,6 @@ where
}

fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// can dispatch receive, or does it still care about, an incoming message?
match ready!(self.dispatch.poll_ready(cx)) {
Ok(()) => (),
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
return Poll::Ready(Ok(()));
}
}
// dispatch is ready for a message, try to read one
match ready!(self.conn.poll_read_head(cx)) {
Some(Ok((mut head, body_len, wants))) => {
Expand Down Expand Up @@ -454,7 +444,7 @@ impl<'a, T> Drop for OptGuard<'a, T> {
cfg_server! {
impl<S, B> Server<S, B>
where
S: TowerHttpService<B>,
S: HttpService<B>,
{
pub(crate) fn new(service: S) -> Server<S, B> {
Server {
Expand All @@ -469,11 +459,11 @@ cfg_server! {
}

// Service is never pinned
impl<S: TowerHttpService<B>, B> Unpin for Server<S, B> {}
impl<S: HttpService<B>, B> Unpin for Server<S, B> {}

impl<S, Bs> Dispatch for Server<S, Body>
where
S: TowerHttpService<Body, ResBody = Bs>,
S: HttpService<Body, ResBody = Bs>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bs: HttpBody,
{
Expand Down Expand Up @@ -519,17 +509,6 @@ cfg_server! {
Ok(())
}

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
if self.in_flight.is_some() {
Poll::Pending
} else {
self.service.poll_ready(cx).map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
}
}

fn should_poll(&self) -> bool {
self.in_flight.is_some()
}
Expand Down Expand Up @@ -631,19 +610,6 @@ cfg_client! {
}
}

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
match self.callback {
Some(ref mut cb) => match cb.poll_canceled(cx) {
Poll::Ready(()) => {
trace!("callback receiver has dropped");
Poll::Ready(Err(()))
}
Poll::Pending => Poll::Ready(Ok(())),
},
None => Poll::Ready(Err(())),
}
}

fn should_poll(&self) -> bool {
self.callback.is_none()
}
Expand Down
42 changes: 5 additions & 37 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::headers;
use crate::proto::h2::ping::Recorder;
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
use crate::proto::Dispatched;
use crate::service::TowerHttpService;
use crate::service::HttpService;

use crate::upgrade::{OnUpgrade, Pending, Upgraded};
use crate::{Body, Response};
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Default for Config {
pin_project! {
pub(crate) struct Server<T, S, B, E>
where
S: TowerHttpService<Body>,
S: HttpService<Body>,
B: HttpBody,
{
exec: E,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
impl<T, S, B, E> Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite + Unpin,
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
E: ConnStreamExec<S::Future, B>,
Expand Down Expand Up @@ -181,7 +181,7 @@ where
impl<T, S, B, E> Future for Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite + Unpin,
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
E: ConnStreamExec<S::Future, B>,
Expand Down Expand Up @@ -236,46 +236,14 @@ where
exec: &mut E,
) -> Poll<crate::Result<()>>
where
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<S::Future, B>,
{
if self.closing.is_none() {
loop {
self.poll_ping(cx);

// Check that the service is ready to accept a new request.
//
// - If not, just drive the connection some.
// - If ready, try to accept a new request from the connection.
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => (),
Poll::Pending => {
// use `poll_closed` instead of `poll_accept`,
// in order to avoid accepting a request.
ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
trace!("incoming connection complete");
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(err)) => {
let err = crate::Error::new_user_service(err);
debug!("service closed: {}", err);

let reason = err.h2_reason();
if reason == Reason::NO_ERROR {
// NO_ERROR is only used for graceful shutdowns...
trace!("interpreting NO_ERROR user error as graceful_shutdown");
self.conn.graceful_shutdown();
} else {
trace!("abruptly shutting down with {:?}", reason);
self.conn.abrupt_shutdown(reason);
}
self.closing = Some(err);
break;
}
}

// When the service is ready, accepts an incoming request.
match ready!(self.conn.poll_accept(cx)) {
Some(Ok((req, mut respond))) => {
trace!("incoming request");
Expand Down
22 changes: 11 additions & 11 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ cfg_feature! {
use crate::common::Never;
use crate::common::exec::{ConnStreamExec, Exec};
use crate::proto;
use crate::service::TowerHttpService;
use crate::service::HttpService;

pub(super) use self::upgrades::UpgradeableConnection;
}
Expand Down Expand Up @@ -123,7 +123,7 @@ pin_project! {
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Connection<T, S, E = Exec>
where
S: TowerHttpService<Body>,
S: HttpService<Body>,
{
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
fallback: Fallback<E>,
Expand Down Expand Up @@ -151,7 +151,7 @@ pin_project! {
#[project = ProtoServerProj]
pub(super) enum ProtoServer<T, B, S, E = Exec>
where
S: TowerHttpService<Body>,
S: HttpService<Body>,
B: HttpBody,
{
H1 {
Expand Down Expand Up @@ -598,7 +598,7 @@ impl<E> Http<E> {
/// ```
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
where
S: TowerHttpService<Body, ResBody = Bd>,
S: HttpService<Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: HttpBody + 'static,
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand Down Expand Up @@ -678,7 +678,7 @@ impl<E> Http<E> {
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Connection<I, S, E>
where
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: HttpBody + 'static,
Expand Down Expand Up @@ -848,7 +848,7 @@ where
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Future for Connection<I, S, E>
where
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static,
B: HttpBody + 'static,
Expand Down Expand Up @@ -895,7 +895,7 @@ where
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, S> fmt::Debug for Connection<I, S>
where
S: TowerHttpService<Body>,
S: HttpService<Body>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
Expand Down Expand Up @@ -928,7 +928,7 @@ impl Default for ConnectionMode {
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
where
T: AsyncRead + AsyncWrite + Unpin,
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand Down Expand Up @@ -963,14 +963,14 @@ mod upgrades {
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S, E>
where
S: TowerHttpService<Body>,
S: HttpService<Body>,
{
pub(super) inner: Connection<T, S, E>,
}

impl<I, B, S, E> UpgradeableConnection<I, S, E>
where
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: HttpBody + 'static,
Expand All @@ -988,7 +988,7 @@ mod upgrades {

impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
where
S: TowerHttpService<Body, ResBody = B>,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: HttpBody + 'static,
Expand Down
4 changes: 2 additions & 2 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ mod http;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
mod oneshot;
mod service;
mod tower_http;
mod util;

#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
pub(super) use self::http::HttpService;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
pub(super) use self::oneshot::{oneshot, Oneshot};
pub(super) use self::tower_http::TowerHttpService;

pub use self::util::service_fn;
58 changes: 0 additions & 58 deletions src/service/tower_http.rs

This file was deleted.

0 comments on commit 1c71ec6

Please sign in to comment.