Skip to content

Commit

Permalink
Add connection setup helpers to the quinn transport. (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn authored Dec 12, 2024
2 parents ef57dc1 + c5b1e14 commit a0ee996
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 187 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ rust-version = "1.76"

[dependencies]
bytes = { version = "1", optional = true }
derive_more = { version = "1.0.0-beta.6", features = ["from", "try_into", "display"] }
flume = { version = "0.11", optional = true }
futures-lite = "2.3.0"
futures-sink = "0.3.30"
Expand All @@ -23,23 +22,29 @@ hyper = { version = "0.14.16", features = ["full"], optional = true }
iroh = { version = "0.29", optional = true }
pin-project = "1"
quinn = { package = "iroh-quinn", version = "0.12", optional = true }
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
tokio-serde = { version = "0.9", features = ["bincode"], optional = true }
tokio-util = { version = "0.7", features = ["rt"] }
postcard = { version = "1", features = ["use-std"], optional = true }
tracing = "0.1"
futures = { version = "0.3.30", optional = true }
anyhow = "1.0.73"
anyhow = "1"
document-features = "0.2"
# for test-utils
rcgen = { version = "0.13", optional = true }
# for test-utils
rustls = { version = "0.23", default-features = false, features = ["ring"], optional = true }

# Indirect dependencies, is needed to make the minimal crates versions work
slab = "0.4.9" # iroh-quinn
smallvec = "1.13.2"
time = "0.3.36" # serde

[dev-dependencies]
anyhow = "1.0.73"
anyhow = "1"
async-stream = "0.3.3"
derive_more = { version = "1", features = ["from", "try_into", "display"] }

serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
Expand All @@ -55,15 +60,27 @@ nested_enum_utils = "0.1.0"
tokio-util = { version = "0.7", features = ["rt"] }

[features]
## HTTP transport using the `hyper` crate
hyper-transport = ["dep:flume", "dep:hyper", "dep:postcard", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
## QUIC transport using the `iroh-quinn` crate
quinn-transport = ["dep:flume", "dep:quinn", "dep:postcard", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
## In memory transport using the `flume` crate
flume-transport = ["dep:flume"]
## p2p QUIC transport using the `iroh` crate
iroh-transport = ["dep:iroh", "dep:flume", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"]
## Macros for creating request handlers
macros = []
## Utilities for testing
test-utils = ["dep:rcgen", "dep:rustls"]
## Default, includes the memory transport
default = ["flume-transport"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "quicrpc_docsrs"]

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(quicrpc_docsrs)"] }

[[example]]
name = "errors"
Expand Down
32 changes: 32 additions & 0 deletions DOCS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Building docs for this crate is a bit complex. There are lots of feature flags,
so we want feature flag markers in the docs, especially for the transports.

There is an experimental cargo doc feature that adds feature flag markers. To
get those, run docs with this command line:

```rust
RUSTDOCFLAGS="--cfg quicrpc_docsrs" cargo +nightly doc --all-features --no-deps --open
```

This sets the flag `quicrpc_docsrs` when creating docs, which triggers statements
like below that add feature flag markers. Note that you *need* nightly for this feature
as of now.

```
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "flume-transport")))]
```

The feature is *enabled* using this statement in lib.rs:

```
#![cfg_attr(quicrpc_docsrs, feature(doc_cfg))]
```

We tell [docs.rs] to use the `quicrpc_docsrs` config using these statements
in Cargo.toml:

```
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "quicrpc_docsrs"]
```
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@ This may change in the future as quic implementations get more optimized.
[quinn]: https://docs.rs/quinn/
[flume]: https://docs.rs/flume/
[grpc]: https://grpc.io/

# Docs

Properly building docs for this crate is quite complex. For all the gory details,
see [DOCS.md].
2 changes: 1 addition & 1 deletion examples/split/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.14"
futures = "0.3.26"
quic-rpc = { path = "../../..", features = ["quinn-transport", "macros"] }
quic-rpc = { path = "../../..", features = ["quinn-transport", "macros", "test-utils"] }
quinn = { package = "iroh-quinn", version = "0.12" }
rustls = { version = "0.23", default-features = false, features = ["ring"] }
tracing-subscriber = "0.3.16"
Expand Down
60 changes: 5 additions & 55 deletions examples/split/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![allow(unknown_lints, non_local_definitions)]

use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;

use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use quic_rpc::{transport::quinn::QuinnConnector, RpcClient};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use quic_rpc::{
transport::quinn::{make_insecure_client_endpoint, QuinnConnector},
RpcClient,
};
use types::compute::*;

// types::create_compute_client!(ComputeClient);
Expand Down Expand Up @@ -63,54 +64,3 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

pub fn make_insecure_client_endpoint(bind_addr: SocketAddr) -> Result<Endpoint> {
let crypto = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification))
.with_no_client_auth();

let client_cfg = QuicClientConfig::try_from(crypto)?;
let client_cfg = ClientConfig::new(Arc::new(client_cfg));
let mut endpoint = Endpoint::client(bind_addr)?;
endpoint.set_default_client_config(client_cfg);
Ok(endpoint)
}

#[derive(Debug)]
struct SkipServerVerification;

impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}

fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![]
}
}
4 changes: 1 addition & 3 deletions examples/split/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ anyhow = "1.0.14"
async-stream = "0.3.3"
futures = "0.3.26"
tracing-subscriber = "0.3.16"
quic-rpc = { path = "../../..", features = ["quinn-transport", "macros"] }
quic-rpc = { path = "../../..", features = ["quinn-transport", "macros", "test-utils"] }
quinn = { package = "iroh-quinn", version = "0.12" }
rcgen = "0.13"
rustls = { version = "0.23", default-features = false, features = ["ring"] }
tokio = { version = "1", features = ["full"] }
types = { path = "../types" }
28 changes: 5 additions & 23 deletions examples/split/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;

use async_stream::stream;
use futures::stream::{Stream, StreamExt};
use quic_rpc::{server::run_server_loop, transport::quinn::QuinnListener};
use quinn::{Endpoint, ServerConfig};
use quic_rpc::{
server::run_server_loop,
transport::quinn::{make_server_endpoint, QuinnListener},
};
use types::compute::*;

#[derive(Clone)]
Expand Down Expand Up @@ -71,23 +73,3 @@ async fn main() -> anyhow::Result<()> {
.await?;
Ok(())
}

fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec<u8>)> {
let (server_config, server_cert) = configure_server()?;
let endpoint = Endpoint::server(server_config, bind_addr)?;
Ok((endpoint, server_cert))
}

fn configure_server() -> anyhow::Result<(ServerConfig, Vec<u8>)> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
let cert_der = cert.cert.der();
let priv_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());
let cert_chain = vec![cert_der.clone()];

let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key.into())?;
Arc::get_mut(&mut server_config.transport)
.unwrap()
.max_concurrent_uni_streams(0_u8.into());

Ok((server_config, cert_der.to_vec()))
}
2 changes: 1 addition & 1 deletion examples/split/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ edition = "2021"
[dependencies]
quic-rpc = { path = "../../..", features = ["macros"] }
serde = { version = "1", features = ["derive"] }
derive_more = { version = "1.0.0-beta.6", features = ["from", "try_into"] }
derive_more = { version = "1", features = ["from", "try_into"] }
12 changes: 6 additions & 6 deletions quic-rpc-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ description = "Macros for quic-rpc"
proc-macro = true

[dependencies]
syn = { version = "1.0", features = ["full"] }
quote = "1.0"
proc-macro2 = "1.0"
syn = { version = "1", features = ["full"] }
quote = "1"
proc-macro2 = "1"
quic-rpc = { version = "0.17", path = ".." }

[dev-dependencies]
derive_more = "1.0.0-beta.6"
serde = { version = "1.0.203", features = ["serde_derive"] }
trybuild = "1.0.96"
derive_more = { version = "1", features = ["from", "try_into", "display"] }
serde = { version = "1", features = ["serde_derive"] }
trybuild = "1.0"
28 changes: 25 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,34 @@ use crate::{
Connector, Service,
};

/// Type alias for a boxed connection to a specific service
///
/// This is a convenience type alias for a boxed connection to a specific service.
/// A boxed connector for the given [`Service`]
pub type BoxedConnector<S> =
crate::transport::boxed::BoxedConnector<<S as crate::Service>::Res, <S as crate::Service>::Req>;

#[cfg(feature = "flume-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "flume-transport")))]
/// A flume connector for the given [`Service`]
pub type FlumeConnector<S> =
crate::transport::flume::FlumeConnector<<S as Service>::Res, <S as Service>::Req>;

#[cfg(feature = "quinn-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))]
/// A quinn connector for the given [`Service`]
pub type QuinnConnector<S> =
crate::transport::quinn::QuinnConnector<<S as Service>::Res, <S as Service>::Req>;

#[cfg(feature = "hyper-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "hyper-transport")))]
/// A hyper connector for the given [`Service`]
pub type HyperConnector<S> =
crate::transport::hyper::HyperConnector<<S as Service>::Res, <S as Service>::Req>;

#[cfg(feature = "iroh-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))]
/// An iroh connector for the given [`Service`]
pub type IrohConnector<S> =
crate::transport::iroh::IrohConnector<<S as Service>::Res, <S as Service>::Req>;

/// Sync version of `future::stream::BoxStream`.
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

Expand Down
40 changes: 40 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@
//! # Ok(())
//! # }
//! ```
//!
//! # Features
#![doc = document_features::document_features!()]
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![cfg_attr(quicrpc_docsrs, feature(doc_cfg))]
use std::fmt::{Debug, Display};

use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -101,6 +105,7 @@ pub mod transport;
pub use client::RpcClient;
pub use server::RpcServer;
#[cfg(feature = "macros")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "macros")))]
mod macros;

pub mod pattern;
Expand Down Expand Up @@ -177,3 +182,38 @@ impl<T: transport::Connector<In = S::Res, Out = S::Req>, S: Service> Connector<S
pub trait Listener<S: Service>: transport::Listener<In = S::Req, Out = S::Res> {}

impl<T: transport::Listener<In = S::Req, Out = S::Res>, S: Service> Listener<S> for T {}

#[cfg(feature = "flume-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "flume-transport")))]
/// Create a pair of [`RpcServer`] and [`RpcClient`] for the given [`Service`] type using a flume channel
pub fn flume_channel<S: Service>(
size: usize,
) -> (
RpcServer<S, server::FlumeListener<S>>,
RpcClient<S, client::FlumeConnector<S>>,
) {
let (listener, connector) = transport::flume::channel(size);
(RpcServer::new(listener), RpcClient::new(connector))
}

#[cfg(feature = "test-utils")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "test-utils")))]
/// Create a pair of [`RpcServer`] and [`RpcClient`] for the given [`Service`] type using a quinn channel
///
/// This is using a network connection using the local network. It is useful for testing remote services
/// in a more realistic way than the memory transport.
#[allow(clippy::type_complexity)]
pub fn quinn_channel<S: Service>() -> anyhow::Result<(
RpcServer<S, server::QuinnListener<S>>,
RpcClient<S, client::QuinnConnector<S>>,
)> {
let bind_addr: std::net::SocketAddr = ([0, 0, 0, 0], 0).into();
let (server_endpoint, cert_der) = transport::quinn::make_server_endpoint(bind_addr)?;
let addr = server_endpoint.local_addr()?;
let server = server::QuinnListener::<S>::new(server_endpoint)?;
let server = RpcServer::new(server);
let client_endpoint = transport::quinn::make_client_endpoint(bind_addr, &[&cert_der])?;
let client = client::QuinnConnector::<S>::new(client_endpoint, addr, "localhost".into());
let client = RpcClient::new(client);
Ok((server, client))
}
Loading

0 comments on commit a0ee996

Please sign in to comment.