Skip to content

Commit

Permalink
showcase how transport extensions are available in http layer
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Dec 9, 2023
1 parent a56818d commit e0b68df
Showing 1 changed file with 35 additions and 27 deletions.
62 changes: 35 additions & 27 deletions examples/tokio_tcp_http_hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use rama::{
http::StatusCode,
rt::graceful::Shutdown,
service::{limit::ConcurrentPolicy, Layer, Service},
state::Extendable,
stream::layer::BytesRWTrackerHandle,
tcp::server::TcpListener,
};
Expand Down Expand Up @@ -37,10 +36,6 @@ async fn main() {

tcp_listener.set_ttl(30).expect("set TTL");

// TODO:
// - support state passing from tcp listener to stream
// - find good way to pass state from stream to http

let mut http_server = http::HttpServer::auto();

http_server.http1_mut().preserve_header_case(true);
Expand All @@ -50,14 +45,14 @@ async fn main() {
.compression()
.trace()
.timeout(Duration::from_secs(10))
.layer(HttpLogLayer)
.service::<WebServer, _, _, _>(WebServer::new());

tcp_listener
.spawn()
.limit(ConcurrentPolicy::new(2))
.timeout(Duration::from_secs(30))
.bytes_tracker()
.layer(TcpLogLayer)
.serve_graceful(guard, web_server)
.await
.expect("serve incoming TCP connections");
Expand All @@ -69,51 +64,64 @@ async fn main() {
.expect("graceful shutdown");
}

type Request = http::Request;
type Response = http::Response<String>;

#[derive(Debug, Clone)]
pub struct TcpLogService<S> {
pub struct HttpLogService<S> {
service: S,
}

impl<S, Stream> Service<Stream> for TcpLogService<S>
impl<S, Body> Service<Request> for HttpLogService<S>
where
S: Service<Stream>,
Stream: Extendable,
S: Service<Request, Response = http::Response<Body>>,
S::Error: std::fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;

async fn call(&self, stream: Stream) -> Result<Self::Response, Self::Error> {
let handle = stream
async fn call(&self, request: Request) -> Result<Self::Response, Self::Error> {
let uri = request.uri().clone();

let handle = request
.extensions()
.get::<BytesRWTrackerHandle>()
.expect("bytes tracker is enabled")
.clone();

let result = self.service.call(stream).await;

tracing::info!(
"bytes read: {}, bytes written: {}",
handle.read(),
handle.written(),
);

let result = self.service.call(request).await;
match &result {
Ok(response) => {
tracing::info!(
"{} > status: {} [ bytes read: {} ]",
uri,
response.status(),
handle.read(),
);
}
Err(err) => {
tracing::error!(
"{} > error: {:?} [ bytes read: {} ]",
uri,
err,
handle.read(),
);
}
}
result
}
}

pub struct TcpLogLayer;
pub struct HttpLogLayer;

impl<S> Layer<S> for TcpLogLayer {
type Service = TcpLogService<S>;
impl<S> Layer<S> for HttpLogLayer {
type Service = HttpLogService<S>;

fn layer(&self, service: S) -> Self::Service {
TcpLogService { service }
HttpLogService { service }
}
}

type Request = http::Request;
type Response = http::Response<String>;

#[derive(Debug, Clone)]
struct WebServer {
start_time: std::time::Instant,
Expand Down

0 comments on commit e0b68df

Please sign in to comment.