Skip to content

Commit

Permalink
start preparing for http server code utils
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Nov 7, 2023
1 parent 5ebf5af commit 2245492
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 11 deletions.
37 changes: 33 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rama/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ http = "0.2.9"
matchit = "0.7.3"
pin-project-lite = "0.2.13"
rustls = "0.22.0-alpha.3"
serde = "1.0.192"
serde_urlencoded = "0.7.1"
tokio = { version = "1.33.0", features = ["net", "io-util"] }
tokio-graceful = "0.1.5"
tokio-rustls = "0.25.0-alpha.1"
Expand Down
3 changes: 2 additions & 1 deletion rama/examples/tokio_tcp_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use rama::{
server::tcp::TcpListener,
service::{limit::ConcurrentPolicy, Layer, Service},
state::Extendable,
stream::service::{BytesRWTrackerHandle, EchoService},
stream::layer::BytesRWTrackerHandle,
stream::service::EchoService,
};

use tracing::metadata::LevelFilter;
Expand Down
45 changes: 45 additions & 0 deletions rama/src/net/http/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use http::{
header::{AsHeaderName, GetAll},
HeaderValue, Request, Response,
};

pub trait HeaderValueGetter {
fn header_value<K>(&self, key: K) -> Option<&HeaderValue>
where
K: AsHeaderName;
fn header_values<K>(&self, key: K) -> GetAll<'_, HeaderValue>
where
K: AsHeaderName;
}

impl<Body> HeaderValueGetter for Request<Body> {
fn header_value<K>(&self, key: K) -> Option<&HeaderValue>
where
K: AsHeaderName,
{
self.headers().get(key)
}

fn header_values<K>(&self, key: K) -> GetAll<'_, HeaderValue>
where
K: AsHeaderName,
{
self.headers().get_all(key)
}
}

impl<Body> HeaderValueGetter for Response<Body> {
fn header_value<K>(&self, key: K) -> Option<&HeaderValue>
where
K: AsHeaderName,
{
self.headers().get(key)
}

fn header_values<K>(&self, key: K) -> GetAll<'_, HeaderValue>
where
K: AsHeaderName,
{
self.headers().get_all(key)
}
}
2 changes: 2 additions & 0 deletions rama/src/net/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod headers;
pub use headers::HeaderValueGetter;
6 changes: 6 additions & 0 deletions rama/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod tcp;
pub use tcp::TcpStream;

pub mod http;

pub use tokio::net::ToSocketAddrs;
114 changes: 114 additions & 0 deletions rama/src/net/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};

use tokio::net::TcpStream as TokioTcpStream;

use crate::{
state::{Extendable, Extensions},
stream::{AsyncRead, AsyncWrite, ReadBuf},
};

pin_project_lite::pin_project! {
#[derive(Debug)]
pub struct TcpStream<S> {
#[pin]
inner: S,
extensions: Extensions,
}
}

impl<S> TcpStream<S> {
pub fn new(inner: S) -> Self {
Self {
inner,
extensions: Extensions::new(),
}
}

pub fn into_parts(self) -> (S, Extensions) {
(self.inner, self.extensions)
}

pub fn from_parts(inner: S, extensions: Extensions) -> Self {
Self { inner, extensions }
}
}

impl TcpStream<TokioTcpStream> {
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.peer_addr()
}

pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}

pub fn ttl(&self) -> io::Result<u32> {
self.inner.ttl()
}

pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.inner.set_ttl(ttl)
}
}

impl<S> Extendable for TcpStream<S> {
fn extensions(&self) -> &Extensions {
&self.extensions
}

fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.extensions
}
}

impl<S> AsyncRead for TcpStream<S>
where
S: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}

impl<S> AsyncWrite for TcpStream<S>
where
S: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
}
1 change: 1 addition & 0 deletions rama/src/server/http/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions rama/src/server/http/layer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions rama/src/server/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod header;
pub mod layer;
pub mod service;

mod conn;
Expand Down
6 changes: 3 additions & 3 deletions rama/src/server/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ impl<L> TcpListener<L> {
/// This can be used to track the number of bytes read and written,
/// by using the [`BytesRWTrackerHandle`] found in the extensions.
///
/// [`BytesRWTrackerHandle`]: crate::stream::service::BytesRWTrackerHandle
pub fn bytes_tracker(self) -> TcpListener<Stack<crate::stream::service::BytesTrackerLayer, L>> {
self.layer(crate::stream::service::BytesTrackerLayer::new())
/// [`BytesRWTrackerHandle`]: crate::stream::layer::BytesRWTrackerHandle
pub fn bytes_tracker(self) -> TcpListener<Stack<crate::stream::layer::BytesTrackerLayer, L>> {
self.layer(crate::stream::layer::BytesTrackerLayer::new())
}

/// Fail requests that take longer than `timeout`.
Expand Down
2 changes: 2 additions & 0 deletions rama/src/stream/layer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod tracker;
pub use tracker::{BytesRWTrackerHandle, BytesTrackerLayer, BytesTrackerService};
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions rama/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

pub mod layer;
pub mod service;

pub trait Stream: AsyncRead + AsyncWrite {}
Expand Down
3 changes: 0 additions & 3 deletions rama/src/stream/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,3 @@ pub use echo::EchoService;

mod forward;
pub use forward::ForwardService;

mod tracker;
pub use tracker::{BytesRWTrackerHandle, BytesTrackerLayer, BytesTrackerService};

0 comments on commit 2245492

Please sign in to comment.