From cccc16c83fd2a25a53bf21afba6d38fd2683d02b Mon Sep 17 00:00:00 2001 From: "Patrick J.P. Culp" Date: Thu, 14 Sep 2023 18:07:56 +0000 Subject: [PATCH] pluto: add hyper-proxy as a module The 'hyper_proxy' module is clone of tafia/hyper-proxy, but modified to take advantage of newer Rust dependencies. Since Bottlerocket only uses 'rustls', other features and unused lines of code have been removed. --- COPYRIGHT | 4 + sources/Cargo.lock | 151 ++-------- sources/api/pluto/Cargo.toml | 8 +- .../api/pluto/src/hyper_proxy/LICENSE-MIT.md | 23 ++ sources/api/pluto/src/hyper_proxy/mod.rs | 284 ++++++++++++++++++ sources/api/pluto/src/hyper_proxy/stream.rs | 94 ++++++ sources/api/pluto/src/hyper_proxy/tunnel.rs | 220 ++++++++++++++ sources/api/pluto/src/main.rs | 1 + sources/api/pluto/src/proxy.rs | 2 +- sources/deny.toml | 4 +- 10 files changed, 655 insertions(+), 136 deletions(-) create mode 100644 sources/api/pluto/src/hyper_proxy/LICENSE-MIT.md create mode 100644 sources/api/pluto/src/hyper_proxy/mod.rs create mode 100644 sources/api/pluto/src/hyper_proxy/stream.rs create mode 100644 sources/api/pluto/src/hyper_proxy/tunnel.rs diff --git a/COPYRIGHT b/COPYRIGHT index 235f44a22cc..8dae61976f5 100644 --- a/COPYRIGHT +++ b/COPYRIGHT @@ -17,3 +17,7 @@ operating system images. macros/rust and macros/cargo (used during build) are derived from the Fedora Rust SIG's rust2rpm. https://pagure.io/fedora-rust/rust2rpm Copyright (c) 2017 Igor Gnatenko + +Contains modified hyper-proxy files [mod.rs, stream.rs, tunnel.rs] from +/~https://github.com/tafia/hyper-proxy 2021-09-20. +Copyright (c) 2017 Johann Tuffe. Licensed under the MIT License. diff --git a/sources/Cargo.lock b/sources/Cargo.lock index c3d1b60ebfb..7fdbb99c6cf 100644 --- a/sources/Cargo.lock +++ b/sources/Cargo.lock @@ -859,10 +859,10 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls 0.23.2", + "hyper-rustls", "lazy_static", "pin-project-lite", - "rustls 0.20.8", + "rustls", "tokio", "tower", "tracing", @@ -1392,15 +1392,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ct-logs" -version = "0.8.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" -dependencies = [ - "sct 0.6.1", -] - [[package]] name = "darling" version = "0.14.4" @@ -1733,7 +1724,6 @@ checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -1756,17 +1746,6 @@ version = "0.3.28" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" -[[package]] -name = "futures-executor" -version = "0.3.28" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.28" @@ -2109,42 +2088,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-proxy" -version = "0.9.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" -dependencies = [ - "bytes", - "futures", - "headers", - "http", - "hyper", - "hyper-rustls 0.22.1", - "rustls-native-certs 0.5.0", - "tokio", - "tokio-rustls 0.22.0", - "tower-service", - "webpki 0.21.4", -] - -[[package]] -name = "hyper-rustls" -version = "0.22.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" -dependencies = [ - "ct-logs", - "futures-util", - "hyper", - "log", - "rustls 0.19.1", - "rustls-native-certs 0.5.0", - "tokio", - "tokio-rustls 0.22.0", - "webpki 0.21.4", -] - [[package]] name = "hyper-rustls" version = "0.23.2" @@ -2154,10 +2097,10 @@ dependencies = [ "http", "hyper", "log", - "rustls 0.20.8", - "rustls-native-certs 0.6.3", + "rustls", + "rustls-native-certs", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", ] [[package]] @@ -3038,17 +2981,21 @@ dependencies = [ "aws-smithy-types", "aws-types", "bottlerocket-variant", + "bytes", "constants", + "futures-util", "generate-readme", + "headers", + "http", "hyper", - "hyper-proxy", - "hyper-rustls 0.23.2", + "hyper-rustls", "imdsclient", "models", "serde_json", "snafu", "tokio", "tokio-retry", + "tokio-rustls", ] [[package]] @@ -3252,7 +3199,7 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls 0.23.2", + "hyper-rustls", "ipnet", "js-sys", "log", @@ -3260,14 +3207,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.20.8", - "rustls-native-certs 0.6.3", + "rustls", + "rustls-native-certs", "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", @@ -3336,19 +3283,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64 0.13.1", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.8" @@ -3357,20 +3291,8 @@ checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", -] - -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls 0.19.1", - "schannel", - "security-framework", + "sct", + "webpki", ] [[package]] @@ -3479,16 +3401,6 @@ version = "1.2.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -4119,26 +4031,15 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls 0.19.1", - "tokio", - "webpki 0.21.4", -] - [[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.8", + "rustls", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -4575,19 +4476,9 @@ dependencies = [ [[package]] name = "webpki" -version = "0.21.4" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "webpki" -version = "0.22.0" +version = "0.22.1" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +checksum = "f0e74f82d49d545ad128049b7e88f6576df2da6b02e9ce565c6f533be576957e" dependencies = [ "ring", "untrusted", diff --git a/sources/api/pluto/Cargo.toml b/sources/api/pluto/Cargo.toml index a49b859d6bd..16850438409 100644 --- a/sources/api/pluto/Cargo.toml +++ b/sources/api/pluto/Cargo.toml @@ -2,7 +2,7 @@ name = "pluto" version = "0.1.0" authors = ["Michael Patraw "] -license = "Apache-2.0 OR MIT" +license = "(Apache-2.0 OR MIT) AND MIT" edition = "2021" publish = false build = "build.rs" @@ -11,9 +11,12 @@ exclude = ["README.md"] [dependencies] apiclient = { path = "../apiclient", version = "0.1" } +bytes = "1" constants = { path = "../../constants", version = "0.1" } +futures-util = { version = "0.3", default-features = false } +headers = "0.3" +http = "0.2" hyper = "0.14" -hyper-proxy = { version = "0.9", default-features = false, features = ["rustls"] } hyper-rustls = { version = "0.23", default-features = false, features = ["http2", "native-tokio", "tls12", "logging"] } imdsclient = { path = "../../imdsclient", version = "0.1" } models = { path = "../../models", version = "0.1" } @@ -27,6 +30,7 @@ serde_json = "1" snafu = "0.7" tokio = { version = "~1.25", default-features = false, features = ["macros", "rt-multi-thread"] } # LTS tokio-retry = "0.3" +tokio-rustls = "0.23" [build-dependencies] bottlerocket-variant = { version = "0.1", path = "../../bottlerocket-variant" } diff --git a/sources/api/pluto/src/hyper_proxy/LICENSE-MIT.md b/sources/api/pluto/src/hyper_proxy/LICENSE-MIT.md new file mode 100644 index 00000000000..47d7815df21 --- /dev/null +++ b/sources/api/pluto/src/hyper_proxy/LICENSE-MIT.md @@ -0,0 +1,23 @@ +The MIT License (MIT) + +Copyright (c) 2017 Johann Tuffe + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/sources/api/pluto/src/hyper_proxy/mod.rs b/sources/api/pluto/src/hyper_proxy/mod.rs new file mode 100644 index 00000000000..a7fbe900a71 --- /dev/null +++ b/sources/api/pluto/src/hyper_proxy/mod.rs @@ -0,0 +1,284 @@ +//! A Proxy Connector crate for Hyper based applications + +// Original Copyright 2017 Johann Tuffe. Licensed under the MIT License. +// Modifications Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +mod stream; +mod tunnel; +use http::header::HeaderMap; +use hyper::{service::Service, Uri}; + +use futures_util::future::TryFutureExt; +use std::{fmt, io, sync::Arc}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pub use stream::ProxyStream; +use tokio::io::{AsyncRead, AsyncWrite}; + +use hyper_rustls::ConfigBuilderExt; +use tokio_rustls::rustls::{ClientConfig, ServerName}; +use tokio_rustls::TlsConnector; + +type BoxError = Box; + +/// The Intercept enum to filter connections +#[derive(Debug, Clone)] +pub enum Intercept { + /// A custom intercept + Custom(Custom), +} + +/// A trait for matching between Destination and Uri +pub trait Dst { + /// Returns the connection scheme, e.g. "http" or "https" + fn scheme(&self) -> Option<&str>; + /// Returns the host of the connection + fn host(&self) -> Option<&str>; + /// Returns the port for the connection + fn port(&self) -> Option; +} + +impl Dst for Uri { + fn scheme(&self) -> Option<&str> { + self.scheme_str() + } + + fn host(&self) -> Option<&str> { + self.host() + } + + fn port(&self) -> Option { + self.port_u16() + } +} + +#[inline] +pub(crate) fn io_err>>(e: E) -> io::Error { + io::Error::new(io::ErrorKind::Other, e) +} + +/// A Custom struct to proxy custom uris +#[derive(Clone)] +#[allow(clippy::type_complexity)] +pub struct Custom(Arc, Option<&str>, Option) -> bool + Send + Sync>); + +impl fmt::Debug for Custom { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "_") + } +} + +impl, Option<&str>, Option) -> bool + Send + Sync + 'static> From + for Custom +{ + fn from(f: F) -> Custom { + Custom(Arc::new(f)) + } +} + +impl Intercept { + /// A function to check if given `Uri` is proxied + pub fn matches(&self, uri: &D) -> bool { + match (self, uri.scheme()) { + (&Intercept::Custom(Custom(ref f)), _) => f(uri.scheme(), uri.host(), uri.port()), + } + } +} + +impl, Option<&str>, Option) -> bool + Send + Sync + 'static> From + for Intercept +{ + fn from(f: F) -> Intercept { + Intercept::Custom(f.into()) + } +} + +/// A Proxy struct +#[derive(Clone, Debug)] +pub struct Proxy { + intercept: Intercept, + force_connect: bool, + headers: HeaderMap, + uri: Uri, +} + +impl Proxy { + /// Create a new `Proxy` + pub fn new>(intercept: I, uri: Uri) -> Proxy { + Proxy { + intercept: intercept.into(), + uri, + headers: HeaderMap::new(), + force_connect: false, + } + } +} + +/// A wrapper around `Proxy`s with a connector. +#[derive(Clone)] +pub struct ProxyConnector { + proxies: Vec, + connector: C, + tls: Option, +} + +impl fmt::Debug for ProxyConnector { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!( + f, + "ProxyConnector {}{{ proxies: {:?}, connector: {:?} }}", + if self.tls.is_some() { + "" + } else { + "(unsecured)" + }, + self.proxies, + self.connector + ) + } +} + +impl ProxyConnector { + /// Create a new secured Proxies + pub fn new(connector: C) -> Result { + let config = ClientConfig::builder() + .with_safe_defaults() + .with_native_roots() + .with_no_client_auth(); + + let cfg = Arc::new(config); + let tls = TlsConnector::from(cfg); + + Ok(ProxyConnector { + proxies: Vec::new(), + connector, + tls: Some(tls), + }) + } + + /// Create a proxy connector and attach a particular proxy + pub fn from_proxy(connector: C, proxy: Proxy) -> Result { + let mut c = ProxyConnector::new(connector)?; + c.proxies.push(proxy); + Ok(c) + } + + fn match_proxy(&self, uri: &D) -> Option<&Proxy> { + self.proxies.iter().find(|p| p.intercept.matches(uri)) + } +} + +macro_rules! mtry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => break Err(e.into()), + } + }; +} + +impl Service for ProxyConnector +where + C: Service, + C::Response: AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: Into, +{ + type Response = ProxyStream; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.connector.poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(io_err(e.into()))), + Poll::Pending => Poll::Pending, + } + } + + fn call(&mut self, uri: Uri) -> Self::Future { + if let (Some(p), Some(host)) = (self.match_proxy(&uri), uri.host()) { + if uri.scheme() == Some(&http::uri::Scheme::HTTPS) || p.force_connect { + let host = host.to_owned(); + let port = + uri.port_u16() + .unwrap_or(if uri.scheme() == Some(&http::uri::Scheme::HTTP) { + 80 + } else { + 443 + }); + let tunnel = tunnel::new(&host, port, &p.headers); + let connection = + proxy_dst(&uri, &p.uri).map(|proxy_url| self.connector.call(proxy_url)); + let tls = if uri.scheme() == Some(&http::uri::Scheme::HTTPS) { + self.tls.clone() + } else { + None + }; + + Box::pin(async move { + #[allow(clippy::never_loop)] + loop { + // this hack will gone once `try_blocks` will eventually stabilized + let proxy_stream = mtry!(mtry!(connection).await.map_err(io_err)); + let tunnel_stream = mtry!(tunnel.with_stream(proxy_stream).await); + + break match tls { + Some(tls) => { + let server_name: ServerName = + mtry!(host.as_str().try_into().map_err(io_err)); + let secure_stream = mtry!(tls + .connect(server_name, tunnel_stream) + .await + .map_err(io_err)); + + Ok(ProxyStream::Secured(Box::new(secure_stream))) + } + + None => Ok(ProxyStream::Regular(tunnel_stream)), + }; + } + }) + } else { + match proxy_dst(&uri, &p.uri) { + Ok(proxy_uri) => Box::pin( + self.connector + .call(proxy_uri) + .map_ok(ProxyStream::Regular) + .map_err(|err| io_err(err.into())), + ), + Err(err) => Box::pin(futures_util::future::err(io_err(err))), + } + } + } else { + Box::pin( + self.connector + .call(uri) + .map_ok(ProxyStream::NoProxy) + .map_err(|err| io_err(err.into())), + ) + } + } +} + +fn proxy_dst(dst: &Uri, proxy: &Uri) -> io::Result { + Uri::builder() + .scheme( + proxy + .scheme_str() + .ok_or_else(|| io_err(format!("proxy uri missing scheme: {}", proxy)))?, + ) + .authority( + proxy + .authority() + .ok_or_else(|| io_err(format!("proxy uri missing host: {}", proxy)))? + .clone(), + ) + .path_and_query(dst.path_and_query().unwrap().clone()) + .build() + .map_err(|err| io_err(format!("other error: {}", err))) +} diff --git a/sources/api/pluto/src/hyper_proxy/stream.rs b/sources/api/pluto/src/hyper_proxy/stream.rs new file mode 100644 index 00000000000..b762eaeddd9 --- /dev/null +++ b/sources/api/pluto/src/hyper_proxy/stream.rs @@ -0,0 +1,94 @@ +// Original Copyright 2017 Johann Tuffe. Licensed under the MIT License. +// Modifications Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use tokio_rustls::client::TlsStream as RustlsStream; + +use hyper::client::connect::{Connected, Connection}; + +pub type TlsStream = RustlsStream; + +/// A Proxy Stream wrapper +pub enum ProxyStream { + NoProxy(R), + Regular(R), + Secured(Box>), +} + +macro_rules! match_fn_pinned { + ($self:expr, $fn:ident, $ctx:expr, $buf:expr) => { + match $self.get_mut() { + ProxyStream::NoProxy(s) => Pin::new(s).$fn($ctx, $buf), + ProxyStream::Regular(s) => Pin::new(s).$fn($ctx, $buf), + ProxyStream::Secured(s) => Pin::new(s).$fn($ctx, $buf), + } + }; + + ($self:expr, $fn:ident, $ctx:expr) => { + match $self.get_mut() { + ProxyStream::NoProxy(s) => Pin::new(s).$fn($ctx), + ProxyStream::Regular(s) => Pin::new(s).$fn($ctx), + ProxyStream::Secured(s) => Pin::new(s).$fn($ctx), + } + }; +} + +impl AsyncRead for ProxyStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match_fn_pinned!(self, poll_read, cx, buf) + } +} + +impl AsyncWrite for ProxyStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match_fn_pinned!(self, poll_write, cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + match_fn_pinned!(self, poll_write_vectored, cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + match self { + ProxyStream::NoProxy(s) => s.is_write_vectored(), + ProxyStream::Regular(s) => s.is_write_vectored(), + ProxyStream::Secured(s) => s.is_write_vectored(), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match_fn_pinned!(self, poll_flush, cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match_fn_pinned!(self, poll_shutdown, cx) + } +} + +impl Connection for ProxyStream { + fn connected(&self) -> Connected { + match self { + ProxyStream::NoProxy(s) => s.connected(), + + ProxyStream::Regular(s) => s.connected().proxy(true), + + ProxyStream::Secured(s) => s.get_ref().0.connected().proxy(true), + } + } +} diff --git a/sources/api/pluto/src/hyper_proxy/tunnel.rs b/sources/api/pluto/src/hyper_proxy/tunnel.rs new file mode 100644 index 00000000000..c2e53452ae6 --- /dev/null +++ b/sources/api/pluto/src/hyper_proxy/tunnel.rs @@ -0,0 +1,220 @@ +// Original Copyright 2017 Johann Tuffe. Licensed under the MIT License. +// Modifications Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +use crate::hyper_proxy::io_err; +use bytes::{buf::Buf, BytesMut}; +use http::HeaderMap; +use std::fmt::{self, Display, Formatter}; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +macro_rules! try_ready { + ($x:expr) => { + match $x { + core::task::Poll::Ready(Ok(x)) => x, + core::task::Poll::Ready(Err(e)) => return core::task::Poll::Ready(Err(e.into())), + core::task::Poll::Pending => return core::task::Poll::Pending, + } + }; +} + +pub(crate) struct TunnelConnect { + buf: BytesMut, +} + +impl TunnelConnect { + /// Change stream + pub fn with_stream(self, stream: S) -> Tunnel { + Tunnel { + buf: self.buf, + stream: Some(stream), + state: TunnelState::Writing, + } + } +} + +pub(crate) struct Tunnel { + buf: BytesMut, + stream: Option, + state: TunnelState, +} + +#[derive(Debug)] +enum TunnelState { + Writing, + Reading, +} + +struct HeadersDisplay<'a>(&'a HeaderMap); + +impl<'a> Display for HeadersDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + for (key, value) in self.0 { + let value_str = value.to_str().map_err(|_| fmt::Error)?; + write!(f, "{}: {}\r\n", key.as_str(), value_str)?; + } + + Ok(()) + } +} + +/// Creates a new tunnel through proxy +pub(crate) fn new(host: &str, port: u16, headers: &HeaderMap) -> TunnelConnect { + let buf = format!( + "CONNECT {0}:{1} HTTP/1.1\r\n\ + Host: {0}:{1}\r\n\ + {2}\ + \r\n", + host, + port, + HeadersDisplay(headers) + ) + .into_bytes(); + + TunnelConnect { + buf: buf.as_slice().into(), + } +} + +impl Future for Tunnel { + type Output = Result; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + if self.stream.is_none() { + panic!("must not poll after future is complete") + } + + let this = self.get_mut(); + + loop { + if let TunnelState::Writing = &this.state { + let fut = this.stream.as_mut().unwrap().write_buf(&mut this.buf); + futures_util::pin_mut!(fut); + let n = try_ready!(fut.poll(ctx)); + + if !this.buf.has_remaining() { + this.state = TunnelState::Reading; + this.buf.truncate(0); + } else if n == 0 { + return Poll::Ready(Err(io_err("unexpected EOF while tunnel writing"))); + } + } else { + let fut = this.stream.as_mut().unwrap().read_buf(&mut this.buf); + futures_util::pin_mut!(fut); + let n = try_ready!(fut.poll(ctx)); + + if n == 0 { + return Poll::Ready(Err(io_err("unexpected EOF while tunnel reading"))); + } else { + let read = &this.buf[..]; + if read.len() > 12 { + if read.starts_with(b"HTTP/1.1 200") || read.starts_with(b"HTTP/1.0 200") { + if read.ends_with(b"\r\n\r\n") { + return Poll::Ready(Ok(this.stream.take().unwrap())); + } + // else read more + } else { + let len = read.len().min(16); + return Poll::Ready(Err(io_err(format!( + "unsuccessful tunnel ({})", + String::from_utf8_lossy(&read[0..len]) + )))); + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::{HeaderMap, Tunnel}; + use futures_util::future::TryFutureExt; + use std::io::{Read, Write}; + use std::net::TcpListener; + use std::thread; + use tokio::net::TcpStream; + use tokio::runtime::Runtime; + + fn tunnel(conn: S, host: String, port: u16) -> Tunnel { + super::new(&host, port, &HeaderMap::new()).with_stream(conn) + } + + #[rustfmt::skip] + macro_rules! mock_tunnel { + () => {{ + mock_tunnel!( + b"\ + HTTP/1.1 200 OK\r\n\ + \r\n\ + " + ) + }}; + ($write:expr) => {{ + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let connect_expected = format!( + "\ + CONNECT {0}:{1} HTTP/1.1\r\n\ + Host: {0}:{1}\r\n\ + \r\n\ + ", + addr.ip(), + addr.port() + ).into_bytes(); + + thread::spawn(move || { + let (mut sock, _) = listener.accept().unwrap(); + let mut buf = [0u8; 4096]; + let n = sock.read(&mut buf).unwrap(); + assert_eq!(&buf[..n], &connect_expected[..]); + + sock.write_all($write).unwrap(); + }); + addr + }}; + } + + #[test] + fn test_tunnel() { + let addr = mock_tunnel!(); + + let core = Runtime::new().unwrap(); + let work = TcpStream::connect(&addr); + let host = addr.ip().to_string(); + let port = addr.port(); + let work = work.and_then(|tcp| tunnel(tcp, host, port)); + + core.block_on(work).unwrap(); + } + + #[test] + fn test_tunnel_eof() { + let addr = mock_tunnel!(b"HTTP/1.1 200 OK"); + + let core = Runtime::new().unwrap(); + let work = TcpStream::connect(&addr); + let host = addr.ip().to_string(); + let port = addr.port(); + let work = work.and_then(|tcp| tunnel(tcp, host, port)); + + core.block_on(work).unwrap_err(); + } + + #[test] + fn test_tunnel_bad_response() { + let addr = mock_tunnel!(b"foo bar baz hallo"); + + let core = Runtime::new().unwrap(); + let work = TcpStream::connect(&addr); + let host = addr.ip().to_string(); + let port = addr.port(); + let work = work.and_then(|tcp| tunnel(tcp, host, port)); + + core.block_on(work).unwrap_err(); + } +} diff --git a/sources/api/pluto/src/main.rs b/sources/api/pluto/src/main.rs index f1a0d42f43f..749f3bc0ce3 100644 --- a/sources/api/pluto/src/main.rs +++ b/sources/api/pluto/src/main.rs @@ -35,6 +35,7 @@ mod api; mod aws; mod ec2; mod eks; +mod hyper_proxy; mod proxy; use imdsclient::ImdsClient; diff --git a/sources/api/pluto/src/proxy.rs b/sources/api/pluto/src/proxy.rs index 1e355c242a6..24ba7818b8a 100644 --- a/sources/api/pluto/src/proxy.rs +++ b/sources/api/pluto/src/proxy.rs @@ -1,5 +1,5 @@ +use crate::hyper_proxy::{Proxy, ProxyConnector}; use hyper::Uri; -use hyper_proxy::{Proxy, ProxyConnector}; use hyper_rustls::HttpsConnectorBuilder; use snafu::{ResultExt, Snafu}; use std::env; diff --git a/sources/deny.toml b/sources/deny.toml index e0ed120cc66..1f1c1e3fc23 100644 --- a/sources/deny.toml +++ b/sources/deny.toml @@ -65,15 +65,13 @@ skip = [ ] skip-tree = [ - # hyper-proxy is using an older hyper-rustls - { name = "hyper-proxy", version = "=0.9.1" }, # tungstenite is using an older sha-1 { name = "tungstenite", version = "=0.16" }, # windows-sys is not a direct dependency. mio and schannel # are using different versions of windows-sys. we skip the # dependency tree because windows-sys has many sub-crates # that differ in major version. - { name = "windows-sys", version = "=0.42.0" }, + { name = "windows-sys" }, # generate-readme pulls in an older clap that causes some duplicate # dependencies { name = "generate-readme", version = "=0.1.0" },