-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): implement Clone for GracefulShutdown #136
base: master
Are you sure you want to change the base?
feat(server): implement Clone for GracefulShutdown #136
Conversation
Not that I'm against adding this, but knowing the use cases would help. There's a part of me that worries if having multiple clones could be confusing when |
hyperium/tonic#1788 is a use case of this change. |
32d1c48
to
eb6167a
Compare
3e306ec
to
d3b844d
Compare
d3b844d
to
670aec7
Compare
@seanmonstar, when you need to Personally the amount of code it takes to do your own graceful shutdown is very little, so I will likely keep using async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: &hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
// This can take too long; thus should be done inside `tokio::spawn`;
// however we can't since that `move`s `sender`.
if let Ok(io) = acceptor.accept(val.0).await {
let con = sender.watch(server.serve_connection(hyper_util::rt::TokioIo::new(io), app.clone()));
tokio::spawn(async move {
let _r: Result<(), hyper::Error> = con.await;
});
}
}
}
} If async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
let tls = acceptor.clone();
let http = app.clone();
let builder = server.clone();
let shutdown = sender.clone();
tokio::spawn(async move {
if let Ok(io) = tls.accept(val.0).await {
let _r: Result<(), hyper::Error> = shutdown.watch(builder.serve_connection(hyper_util::rt::TokioIo::new(io), http)).await;
}
});
}
}
} "Hand-rolled" graceful shutdown: struct ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
{
con: hyper::server::conn::http2::Connection<
hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
S,
hyper_util::rt::TokioExecutor,
>,
signal: F,
shutting_down: bool,
}
impl<S, F> ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
{
fn new(
con: hyper::server::conn::http2::Connection<
hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
S,
hyper_util::rt::TokioExecutor,
>,
signal: F,
) -> Self {
Self {
con,
signal,
shutting_down: false,
}
}
}
impl<S, F> core::future::Future for ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
F: core::future::Future<Output = Result<(), tokio::sync::watch::error::RecvError>>
+ core::marker::Unpin,
{
type Output = Result<(), hyper::Error>;
fn poll(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if !self.shutting_down && core::pin::Pin::new(&mut self.signal).poll(cx).is_ready() {
self.shutting_down = true;
core::pin::Pin::new(&mut self.con).graceful_shutdown();
}
core::pin::Pin::new(&mut self.con).poll(cx)
}
}
async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: &tokio::sync::watch::Sender<()>,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
let tls = acceptor.clone();
let http = app.clone();
let builder = server.clone();
let mut rx = sender.subscribe();
tokio::spawn(async move {
if let Ok(io) = tls.accept(val.0).await {
let _r: Result<(), hyper::Error> = ShutdownConnection::new(
builder.serve_connection(hyper_util::rt::TokioIo::new(io), http),
core::pin::pin!(rx.changed()),
)
.await;
}
});
}
}
} |
Implements Clone for GracefulShutdown. This allows users to use GracefulShutdown in other functions.