From d2aa5d862c95168f4e71cc65155c2dc41f306f36 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Mon, 27 Nov 2017 21:41:10 -0800 Subject: [PATCH] fix(client): don't leak connections with no keep-alive Closes #1383 --- src/client/pool.rs | 5 ++++- tests/client.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/client/pool.rs b/src/client/pool.rs index 5a1afee541..f582ccb5b6 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -200,10 +200,13 @@ impl KeepAlive for Pooled { }; if pool.is_enabled() { pool.put(self.key.clone(), self.entry.clone()); + } else { + trace!("keepalive disabled, dropping pooled ({:?})", self.key); + self.disable(); } } else { trace!("pool dropped, dropping pooled ({:?})", self.key); - self.entry.status.set(TimedKA::Disabled); + self.disable(); } } diff --git a/tests/client.rs b/tests/client.rs index c3a7af50a1..eb72a2177e 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -654,6 +654,46 @@ mod dispatch_impl { } + #[test] + fn no_keep_alive_closes_connection() { + // /~https://github.com/hyperium/hyper/issues/1383 + let _ = pretty_env_logger::init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let closes = Arc::new(AtomicUsize::new(0)); + + let (tx1, rx1) = oneshot::channel(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); + let _ = tx1.send(()); + }); + + let uri = format!("http://{}/a", addr).parse().unwrap(); + + let client = Client::configure() + .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .no_proto() + .keep_alive(false) + .build(&handle); + let res = client.get(uri).and_then(move |res| { + assert_eq!(res.status(), hyper::StatusCode::Ok); + res.body().concat2() + }); + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(closes.load(Ordering::Relaxed), 1); + } + struct DebugConnector(HttpConnector, Arc);