From 73345be65f895660492e28e718786b66034a4d03 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 26 Nov 2018 16:19:28 -0800 Subject: [PATCH] feat(server): add `http1_half_close(bool)` option This option determines whether a read EOF should close the connection automatically. The behavior was to always allow read EOF while waiting to respond, so this option has a default of `true`. Setting this option to `false` will allow Service futures to be canceled as soon as disconnect is noticed. Closes #1716 --- src/proto/h1/conn.rs | 27 ++++++++++++++---- src/server/conn.rs | 20 ++++++++++++++ src/server/mod.rs | 14 ++++++++++ tests/server.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 5 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 47a501e5a1..0b7e31ec3a 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -38,6 +38,7 @@ where I: AsyncRead + AsyncWrite, Conn { io: Buffered::new(io), state: State { + allow_half_close: true, cached_headers: None, error: None, keep_alive: KA::Busy, @@ -75,6 +76,10 @@ where I: AsyncRead + AsyncWrite, self.state.title_case_headers = true; } + pub(crate) fn set_disable_half_close(&mut self) { + self.state.allow_half_close = false; + } + pub fn into_inner(self) -> (I, Bytes) { self.io.into_inner() } @@ -228,10 +233,11 @@ where I: AsyncRead + AsyncWrite, trace!("read_keep_alive; is_mid_message={}", self.is_mid_message()); - if !self.is_mid_message() { - self.require_empty_read().map_err(::Error::new_io)?; + if self.is_mid_message() { + self.mid_message_detect_eof().map_err(::Error::new_io) + } else { + self.require_empty_read().map_err(::Error::new_io) } - Ok(()) } fn is_mid_message(&self) -> bool { @@ -252,7 +258,7 @@ where I: AsyncRead + AsyncWrite, // This should only be called for Clients wanting to enter the idle // state. fn require_empty_read(&mut self) -> io::Result<()> { - assert!(!self.can_read_head() && !self.can_read_body()); + debug_assert!(!self.can_read_head() && !self.can_read_body()); if !self.io.read_buf().is_empty() { debug!("received an unexpected {} bytes", self.io.read_buf().len()); @@ -279,11 +285,21 @@ where I: AsyncRead + AsyncWrite, } } + fn mid_message_detect_eof(&mut self) -> io::Result<()> { + debug_assert!(!self.can_read_head() && !self.can_read_body()); + + if self.state.allow_half_close || !self.io.read_buf().is_empty() { + Ok(()) + } else { + self.try_io_read().map(|_| ()) + } + } + fn try_io_read(&mut self) -> Poll { match self.io.read_from_io() { Ok(Async::Ready(0)) => { trace!("try_io_read; found EOF on connection: {:?}", self.state); - let must_error = self.should_error_on_eof(); + let must_error = !self.state.is_idle(); let ret = if must_error { let desc = if self.is_mid_message() { "unexpected EOF waiting for response" @@ -655,6 +671,7 @@ impl fmt::Debug for Conn { } struct State { + allow_half_close: bool, /// Re-usable HeaderMap to reduce allocating new ones. cached_headers: Option, /// If an error occurs when there wasn't a direct way to return it diff --git a/src/server/conn.rs b/src/server/conn.rs index 2902e7cd02..c729183fe1 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -45,6 +45,7 @@ pub(super) use self::upgrades::UpgradeableConnection; #[derive(Clone, Debug)] pub struct Http { exec: E, + h1_half_close: bool, h1_writev: bool, mode: ConnectionMode, keep_alive: bool, @@ -163,6 +164,7 @@ impl Http { pub fn new() -> Http { Http { exec: Exec::Default, + h1_half_close: true, h1_writev: true, mode: ConnectionMode::Fallback, keep_alive: true, @@ -195,6 +197,20 @@ impl Http { self } + /// Set whether HTTP/1 connections should support half-closures. + /// + /// Clients can chose to shutdown their write-side while waiting + /// for the server to respond. Setting this to `false` will + /// automatically close any connection immediately if `read` + /// detects an EOF. + /// + /// Default is `true`. + #[inline] + pub fn http1_half_close(&mut self, val: bool) -> &mut Self { + self.h1_half_close = val; + self + } + /// Set whether HTTP/1 connections should try to use vectored writes, /// or always flatten into a single buffer. /// @@ -261,6 +277,7 @@ impl Http { pub fn with_executor(self, exec: E2) -> Http { Http { exec, + h1_half_close: self.h1_half_close, h1_writev: self.h1_writev, mode: self.mode, keep_alive: self.keep_alive, @@ -319,6 +336,9 @@ impl Http { if !self.keep_alive { conn.disable_keep_alive(); } + if !self.h1_half_close { + conn.set_disable_half_close(); + } if !self.h1_writev { conn.set_write_strategy_flatten(); } diff --git a/src/server/mod.rs b/src/server/mod.rs index a0e0b0184e..5356f0118c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -247,6 +247,20 @@ impl Builder { self } + + /// Set whether HTTP/1 connections should support half-closures. + /// + /// Clients can chose to shutdown their write-side while waiting + /// for the server to respond. Setting this to `false` will + /// automatically close any connection immediately if `read` + /// detects an EOF. + /// + /// Default is `true`. + pub fn http1_half_close(mut self, val: bool) -> Self { + self.protocol.http1_half_close(val); + self + } + /// Sets whether HTTP/1 is required. /// /// Default is `false`. diff --git a/tests/server.rs b/tests/server.rs index 883bc9a005..f4ce1168eb 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -991,6 +991,71 @@ fn nonempty_parse_eof_returns_error() { rt.block_on(fut).expect_err("partial parse eof is error"); } +#[test] +fn socket_half_closed() { + let _ = pretty_env_logger::try_init(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); + tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR"); + + let mut buf = [0; 256]; + tcp.read(&mut buf).unwrap(); + let expected = "HTTP/1.1 200 OK\r\n"; + assert_eq!(s(&buf[..expected.len()]), expected); + }); + + let fut = listener.incoming() + .into_future() + .map_err(|_| unreachable!()) + .and_then(|(item, _incoming)| { + let socket = item.unwrap(); + Http::new() + .serve_connection(socket, service_fn(|_| { + Delay::new(Duration::from_millis(500)) + .map(|_| { + Response::new(Body::empty()) + }) + })) + }); + + rt.block_on(fut).unwrap(); +} + +#[test] +fn disconnect_after_reading_request_before_responding() { + let _ = pretty_env_logger::try_init(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); + }); + + let fut = listener.incoming() + .into_future() + .map_err(|_| unreachable!()) + .and_then(|(item, _incoming)| { + let socket = item.unwrap(); + Http::new() + .http1_half_close(false) + .serve_connection(socket, service_fn(|_| { + Delay::new(Duration::from_secs(2)) + .map(|_| -> Response { + panic!("response future should have been dropped"); + }) + })) + }); + + rt.block_on(fut).expect_err("socket disconnected"); +} + #[test] fn returning_1xx_response_is_error() { let mut rt = Runtime::new().unwrap();