Skip to content

Commit

Permalink
improve ergonomics of tcp echo example
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Nov 4, 2023
1 parent 867bee2 commit d532583
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 39 deletions.
6 changes: 2 additions & 4 deletions rama/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ matchit = "0.7.3"
pin-project-lite = "0.2.13"
tokio = { version = "1.33.0", features = ["net", "io-util"] }
tokio-graceful = "0.1.5"
tower-async-layer = "0.1.1"
tower-async-service = "0.1.1"
tower-async = { version = "0.1.1", features = ["util"] }
tracing = "0.1.40"

[dev-dependencies]
tokio = { version = "1.33.0", features = ["full"] }
tokio-test = "0.4.3"
tower-async = { version = "0.1.1", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
19 changes: 4 additions & 15 deletions rama/examples/tokio_tcp_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::time::Duration;

use rama::graceful::{Shutdown, ShutdownGuardAdderLayer};
use rama::server::tcp::TcpListener;
use rama::service::spawn::SpawnLayer;
use rama::stream::service::EchoService;
use rama::Service;

use tower_async::{service_fn, ServiceBuilder};
use tracing::metadata::LevelFilter;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

Expand All @@ -23,22 +22,12 @@ async fn main() {
let shutdown = Shutdown::default();

shutdown.spawn_task_fn(|guard| async {
let guard = guard.downgrade();
TcpListener::bind("127.0.0.1:8080")
.await
.expect("bind TCP Listener")
.serve(service_fn(|stream| async {
let guard = guard.clone();
tokio::spawn(async move {
ServiceBuilder::new()
.layer(ShutdownGuardAdderLayer::new(guard))
.service(EchoService::new())
.call(stream)
.await
.expect("call EchoService");
});
Ok::<(), std::convert::Infallible>(())
}))
.layer(ShutdownGuardAdderLayer::new(guard.downgrade()))
.layer(SpawnLayer::new())
.serve::<_, EchoService, _>(EchoService::new())
.await
.expect("serve incoming TCP connections");
});
Expand Down
15 changes: 9 additions & 6 deletions rama/src/graceful.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
pub use tokio_graceful::*;
pub use tokio_graceful::{Shutdown, ShutdownGuard, WeakShutdownGuard};

use crate::{state::Extendable, Layer, Service};
use crate::{
service::{Layer, Service},
state::Extendable,
};

pub struct ShutdownGuardAdder<S> {
inner: S,
guard: ShutdownGuard,
guard: WeakShutdownGuard,
}

impl<S> ShutdownGuardAdder<S> {
fn new(inner: S, guard: ShutdownGuard) -> Self {
fn new(inner: S, guard: WeakShutdownGuard) -> Self {
Self { inner, guard }
}
}
Expand All @@ -22,7 +25,7 @@ where
type Error = S::Error;

async fn call(&mut self, mut request: Request) -> Result<Self::Response, Self::Error> {
let guard = self.guard.clone();
let guard = self.guard.clone().upgrade();
request.extensions_mut().insert(guard);
self.inner.call(request).await
}
Expand All @@ -42,6 +45,6 @@ impl<S> Layer<S> for ShutdownGuardAdderLayer {
type Service = ShutdownGuardAdder<S>;

fn layer(&self, inner: S) -> Self::Service {
ShutdownGuardAdder::new(inner, self.guard.clone().upgrade())
ShutdownGuardAdder::new(inner, self.guard.clone())
}
}
6 changes: 3 additions & 3 deletions rama/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#![feature(async_fn_in_trait)]
#![feature(return_type_notation)]
#![allow(incomplete_features)]

pub mod graceful;
pub mod server;
pub mod service;
pub mod state;
pub mod stream;

pub use tower_async_layer::Layer;
pub use tower_async_service::Service;
39 changes: 32 additions & 7 deletions rama/src/server/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,34 @@ use std::{io, net::SocketAddr};

use tokio::net::{TcpListener as TokioTcpListener, ToSocketAddrs};

use crate::Service;
use crate::service::{
util::{Identity, Stack},
Layer, Service, ServiceBuilder,
};

use super::TcpStream;

pub struct TcpListener {
pub struct TcpListener<L> {
inner: TokioTcpListener,
builder: ServiceBuilder<L>,
}

impl TcpListener {
impl TcpListener<Identity> {
/// Creates a new TcpListener, which will be bound to the specified address.
///
/// The returned listener is ready for accepting connections.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the `local_addr`
/// method.
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let inner = TokioTcpListener::bind(addr).await?;
Ok(TcpListener { inner })
let builder = ServiceBuilder::new();
Ok(TcpListener { inner, builder })
}
}

impl<L> TcpListener<L> {
/// Returns the local address that this listener is bound to.
///
/// This can be useful, for example, when binding to port 0 to figure out
Expand All @@ -48,14 +55,32 @@ impl TcpListener {
self.inner.set_ttl(ttl)
}

/// Adds a layer to the service.
///
/// This method can be used to add a middleware to the service.
pub fn layer<M>(self, layer: M) -> TcpListener<Stack<M, L>>
where
M: tower_async::layer::Layer<L>,
{
TcpListener {
inner: self.inner,
builder: self.builder.layer(layer),
}
}
}

impl<L> TcpListener<L> {
/// Serve connections from this listener with the given service.
///
/// This method will block the current listener for each incoming connection,
/// the underlying service can choose to spawn a task to handle the accepted stream.
pub async fn serve<T, S, E>(self, mut service: S) -> TcpServeResult<T, E>
pub async fn serve<T, S, E>(self, service: S) -> TcpServeResult<T, E>
where
S: Service<TcpStream, Response = T, Error = E>,
L: Layer<S>,
L::Service: Service<TcpStream, Response = T, Error = E>,
{
let mut service = self.builder.service(service);

loop {
let (stream, _) = self.inner.accept().await?;
let stream = TcpStream::new(stream);
Expand Down
7 changes: 7 additions & 0 deletions rama/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub use tower_async::{service_fn, Layer, Service, ServiceBuilder};

pub mod util {
pub use tower_async::layer::util::{Identity, Stack};
}

pub mod spawn;
62 changes: 62 additions & 0 deletions rama/src/service/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::{
graceful::ShutdownGuard,
service::{Layer, Service},
state::Extendable,
};

pub struct SpawnService<S> {
inner: S,
}

impl<S> SpawnService<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

impl<S, Request> Service<Request> for SpawnService<S>
where
S: Service<Request, call(): Send> + Clone + Send + 'static,
S::Error: std::error::Error,
Request: Extendable + Send + 'static,
{
type Response = ();
type Error = std::convert::Infallible;

async fn call(&mut self, request: Request) -> Result<Self::Response, Self::Error> {
let mut service = self.inner.clone();
if let Some(guard) = request.extensions().get::<ShutdownGuard>() {
guard.clone().spawn_task(async move {
if let Err(err) = service.call(request).await {
tracing::error!(
error = &err as &dyn std::error::Error,
"graceful service error"
);
}
});
} else {
tokio::spawn(async move {
if let Err(err) = service.call(request).await {
tracing::error!(error = &err as &dyn std::error::Error, "service error");
}
});
}
Ok(())
}
}

pub struct SpawnLayer(());

impl SpawnLayer {
pub fn new() -> Self {
Self(())
}
}

impl<S> Layer<S> for SpawnLayer {
type Service = SpawnService<S>;

fn layer(&self, inner: S) -> Self::Service {
SpawnService::new(inner)
}
}
4 changes: 2 additions & 2 deletions rama/src/stream/service/echo.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::io::Error;

use crate::{stream::Stream, Service};
use crate::{service::Service, stream::Stream};

/// An async service which echoes the incoming bytes back on the same stream.
///
/// # Example
///
/// ```rust
/// use rama::{stream::service::EchoService, Service};
/// use rama::{service::Service, stream::service::EchoService};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
4 changes: 2 additions & 2 deletions rama/src/stream/service/forward.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{io::Error, pin::Pin};

use crate::{stream::Stream, Service};
use crate::{service::Service, stream::Stream};

/// Async service which forwards the incoming connection bytes to the given destination,
/// and forwards the response back from the destination to the incoming connection.
///
/// # Example
///
/// ```rust
/// use rama::{stream::service::ForwardService, Service};
/// use rama::{service::Service, stream::service::ForwardService};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down

0 comments on commit d532583

Please sign in to comment.