diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 6e4125db86..47a501e5a1 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -794,7 +794,7 @@ impl State { match (&self.reading, &self.writing) { (&Reading::KeepAlive, &Writing::KeepAlive) => { if let KA::Busy = self.keep_alive.status() { - self.idle(); + self.idle::(); } else { trace!("try_keep_alive({}): could keep-alive, but status = {:?}", T::LOG, self.keep_alive); self.close(); @@ -819,12 +819,23 @@ impl State { self.keep_alive.busy(); } - fn idle(&mut self) { + fn idle(&mut self) { + debug_assert!(!self.is_idle(), "State::idle() called while idle"); + self.method = None; self.keep_alive.idle(); if self.is_idle() { self.reading = Reading::Init; self.writing = Writing::Init; + + // !T::should_read_first() means Client. + // + // If Client connection has just gone idle, the Dispatcher + // should try the poll loop one more time, so as to poll the + // pending requests stream. + if !T::should_read_first() { + self.notify_read = true; + } } else { self.close(); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 174a8cb951..622ce96100 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -466,7 +466,7 @@ where // user has dropped sender handle Ok(Async::Ready(None)) }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::NotReady) => Ok(Async::NotReady), Err(never) => match never {}, } } diff --git a/tests/client.rs b/tests/client.rs index 0df729e2d8..180f926987 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1215,6 +1215,83 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 2); } + #[test] + fn client_keep_alive_when_response_before_request_body_ends() { + use futures_timer::Delay; + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut rt = Runtime::new().unwrap(); + + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder() + .build(connector); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); + // after writing the response, THEN stream the body + let _ = tx1.send(()); + + sock.read(&mut buf).expect("read 2"); + let _ = tx2.send(()); + + let n2 = sock.read(&mut buf).expect("read 3"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); + let _ = tx3.send(()); + }); + + + assert_eq!(connects.load(Ordering::Relaxed), 0); + + let delayed_body = rx1 + .map_err(|_| -> hyper::Error { panic!("rx1") }) + .and_then(|_| Delay::new(Duration::from_millis(200)).map_err(|_| panic!("delay"))) + .into_stream() + .map(|_| "hello a"); + + let rx = rx2.expect("thread panicked"); + let req = Request::builder() + .method("POST") + .uri(&*format!("http://{}/a", addr)) + .body(Body::wrap_stream(delayed_body)) + .unwrap(); + let client2 = client.clone(); + + // req 1 + let fut = client.request(req) + .join(rx) + .and_then(|_| Delay::new(Duration::from_millis(200)).expect("delay")) + // req 2 + .and_then(move |()| { + let rx = rx3.expect("thread panicked"); + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(Body::empty()) + .unwrap(); + client2 + .request(req) + .join(rx) + .map(|_| ()) + }); + + rt.block_on(fut).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1); + } + #[test] fn connect_proxy_sends_absolute_uri() { let _ = pretty_env_logger::try_init();