Skip to content

Commit

Permalink
WebSocket over h2 (RFC 8441)
Browse files Browse the repository at this point in the history
- Auto probe and fallback
- Pending hyperium/hyper#3242 to work
  - hyper 人合了麻烦踢一下谢谢茄子
  • Loading branch information
bdbai committed Jun 9, 2023
1 parent faac352 commit c241257
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 210 deletions.
10 changes: 5 additions & 5 deletions ytflow/src/config/plugin/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl<'de> Factory for WsClientFactory<'de> {
}
};

ws::WebSocketStreamOutboundFactory {
path: self.path.to_string(),
host: self.host.map(|s| s.to_owned()),
headers: std::mem::take(&mut self.headers),
ws::WebSocketStreamOutboundFactory::new(
self.host.map(|s| s.to_owned()),
self.path.to_string(),
std::mem::take(&mut self.headers),
next,
}
)
});
set.fully_constructed
.stream_outbounds
Expand Down
2 changes: 2 additions & 0 deletions ytflow/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ pub mod tls;
pub mod trojan;
pub mod vmess;
pub mod ws;

pub(self) mod h2;
149 changes: 149 additions & 0 deletions ytflow/src/plugin/h2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::error::Error as StdError;
use std::future::Future;
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Weak;
use std::task::{Context, Poll};

use http::uri::{Scheme, Uri};
use hyper::client::connect::{Connected, Connection};
use hyper::rt::Executor;
use hyper::service::Service as TowerService;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::flow::*;

#[derive(Clone)]
pub struct FlowAdapterConnector {
pub next: Weak<dyn StreamOutboundFactory>,
}

pub struct CompatStreamAdapter {
stream: CompatStream,
use_h2: bool,
}

pub struct TokioHyperExecutor(tokio::runtime::Handle);

impl TokioHyperExecutor {
pub fn new_current() -> Self {
Self(tokio::runtime::Handle::current())
}
}

impl Executor<Pin<Box<dyn Future<Output = ()> + Send>>> for TokioHyperExecutor {
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) {
self.0.spawn(fut);
}
}

impl TowerService<Uri> for FlowAdapterConnector {
type Response = CompatStreamAdapter;

type Error = Box<dyn StdError + Send + Sync>;

type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, dst: Uri) -> Self::Future {
let host = dst
.authority()
.expect("h2 url must have authority")
.host()
.trim_start_matches('[')
.trim_end_matches(']');
let next_host = Ipv4Addr::from_str(host)
.ok()
.map(|i| HostName::Ip(i.into()))
.or_else(|| {
Ipv6Addr::from_str(host)
.map(|i| HostName::Ip(i.into()))
.ok()
})
.or_else(|| {
let mut host = host.to_string();
if !host.ends_with('.') {
host.push('.');
}
HostName::from_domain_name(host.to_string()).ok()
})
.unwrap();
let remote_peer = DestinationAddr {
host: next_host,
port: dst
.port_u16()
.unwrap_or(if dst.scheme() == Some(&Scheme::HTTPS) {
443
} else {
80
}),
};
let next = self.next.clone();
Box::pin(async move {
let next = next.upgrade().ok_or("next is gone")?;
let mut ctx = FlowContext::new(
SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
remote_peer,
);
ctx.application_layer_protocol = ["h2", "http/1.1"].into_iter().collect();
let (stream, inital_data) = next
.create_outbound(&mut ctx, &[])
.await
.map_err(|e| e.to_string())?;
let use_h2 = std::mem::take(&mut ctx.application_layer_protocol) == ["h2"].into();
Ok(CompatStreamAdapter {
stream: CompatStream {
inner: stream,
reader: StreamReader::new(4096, inital_data),
},
use_h2,
})
})
}
}

impl AsyncRead for CompatStreamAdapter {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}

impl AsyncWrite for CompatStreamAdapter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}

impl Connection for CompatStreamAdapter {
fn connected(&self) -> Connected {
if self.use_h2 {
Connected::new().negotiated_h2()
} else {
Connected::new()
}
}
}
147 changes: 5 additions & 142 deletions ytflow/src/plugin/host_resolver/doh_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,26 @@
use std::error::Error as StdError;
use std::future::Future;
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Weak;
use std::task::{ready, Context, Poll};

use async_trait::async_trait;
use futures::{FutureExt, SinkExt};
use http::header::{ACCEPT, CONTENT_TYPE};
use http::uri::{Scheme, Uri};
use http::uri::Uri;
use http::{Method, Request};
use hyper::body::{Bytes, HttpBody};
use hyper::client::connect::{Connected, Connection};
use hyper::client::ResponseFuture;
use hyper::rt::Executor;
use hyper::{service::Service as TowerService, Body, Client as HyperClient};
use tokio::io::{AsyncRead, AsyncWrite};
use hyper::{Body, Client as HyperClient};
use tokio::sync::mpsc;
use tokio_util::sync::PollSender;

use crate::flow::*;
use crate::plugin::h2::{FlowAdapterConnector, TokioHyperExecutor};

pub struct DohDatagramAdapterFactory {
client: HyperClient<FlowAdapterConnector, Body>,
url: Uri,
}

#[derive(Clone)]
struct FlowAdapterConnector {
next: Weak<dyn StreamOutboundFactory>,
remote_peer: DestinationAddr,
}

#[derive(Default)]
enum DohDatagramAdapterTxState {
#[default]
Expand All @@ -49,139 +36,15 @@ struct DohDatagramAdapter {
rx_chan: (Option<PollSender<Buffer>>, mpsc::Receiver<Buffer>),
}

struct CompatStreamAdapter {
stream: CompatStream,
use_h2: bool,
}

struct TokioHyperExecutor(tokio::runtime::Handle);

impl DohDatagramAdapterFactory {
pub fn new(url: Uri, next: Weak<dyn StreamOutboundFactory>) -> Self {
let host = url
.authority()
.expect("doh url must have authority")
.host()
.trim_start_matches('[')
.trim_end_matches(']');
let next_host = Ipv4Addr::from_str(host)
.ok()
.map(|i| HostName::Ip(i.into()))
.or_else(|| {
Ipv6Addr::from_str(host)
.map(|i| HostName::Ip(i.into()))
.ok()
})
.or_else(|| {
let mut host = host.to_string();
if !host.ends_with('.') {
host.push('.');
}
HostName::from_domain_name(host.to_string()).ok()
})
.unwrap();
let remote_peer = DestinationAddr {
host: next_host,
port: url
.port_u16()
.unwrap_or(if url.scheme() == Some(&Scheme::HTTPS) {
443
} else {
80
}),
};
let client = hyper::Client::builder()
.executor(TokioHyperExecutor(tokio::runtime::Handle::current()))
.build(FlowAdapterConnector { next, remote_peer });
.executor(TokioHyperExecutor::new_current())
.build(FlowAdapterConnector { next });
Self { client, url }
}
}

impl Executor<Pin<Box<dyn Future<Output = ()> + Send>>> for TokioHyperExecutor {
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) {
self.0.spawn(fut);
}
}

impl TowerService<Uri> for FlowAdapterConnector {
type Response = CompatStreamAdapter;

type Error = Box<dyn StdError + Send + Sync>;

type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _dst: Uri) -> Self::Future {
// 让下一层 redirect,这里直接摆烂
let next = self.next.clone();
let remote_peer = self.remote_peer.clone();
Box::pin(async move {
let next = next.upgrade().ok_or("next is gone")?;
let mut ctx = FlowContext::new(
SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
remote_peer,
);
ctx.application_layer_protocol = ["h2", "http/1.1"].into_iter().collect();
let (stream, inital_data) = next
.create_outbound(&mut ctx, &[])
.await
.map_err(|e| e.to_string())?;
let use_h2 = std::mem::take(&mut ctx.application_layer_protocol) == ["h2"].into();
Ok(CompatStreamAdapter {
stream: CompatStream {
inner: stream,
reader: StreamReader::new(4096, inital_data),
},
use_h2,
})
})
}
}

impl AsyncRead for CompatStreamAdapter {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}

impl AsyncWrite for CompatStreamAdapter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}

impl Connection for CompatStreamAdapter {
fn connected(&self) -> Connected {
if self.use_h2 {
Connected::new().negotiated_h2()
} else {
Connected::new()
}
}
}

#[async_trait]
impl DatagramSessionFactory for DohDatagramAdapterFactory {
async fn bind(&self, _context: Box<FlowContext>) -> FlowResult<Box<dyn DatagramSession>> {
Expand Down
Loading

0 comments on commit c241257

Please sign in to comment.