Skip to content

Commit

Permalink
refactor(iroh)!: Eliminate the type parameter for the rpc service type (
Browse files Browse the repository at this point in the history
#2405)

## Description

Eliminate the type parameter for the rpc service type in the builder.

We are going to need more type parameters if we want to make rpc generic
and extendable, so we need to make some room... :-)

~~todo: needs new minor version of quic-rpc to make boxed(...) public~~

## Breaking Changes

- Builder loses the `E` type parameter
- ProtocolBuilder loses the `E` type parameter
- rpc_endpoint takes a `boxed::ServerEndpoint`

## Notes & open questions

Note: we now require the user to box the rpc_endpoint if they want to do
a custom one. But since that is most likely a non-mem version it should
not have a performance impact. If you want a mem rpc controller, we
already have that...

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jun 27, 2024
1 parent d30ed19 commit 52c96ba
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 28 deletions.
80 changes: 56 additions & 24 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ use iroh_net::{
relay::RelayMode,
Endpoint,
};
use quic_rpc::{
transport::{
flume::FlumeServerEndpoint, misc::DummyServerEndpoint, quinn::QuinnServerEndpoint,
},
ServiceEndpoint,
use quic_rpc::transport::{
boxed::BoxableServerEndpoint, flume::FlumeServerEndpoint, quinn::QuinnServerEndpoint,
};
use serde::{Deserialize, Serialize};
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
Expand Down Expand Up @@ -56,6 +53,11 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;

type BoxedServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>;

/// Storage backend for documents.
#[derive(Debug, Clone)]
pub enum DocsStorage {
Expand All @@ -81,15 +83,14 @@ pub enum DocsStorage {
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
/// using [`Node::shutdown`].
#[derive(derive_more::Debug)]
pub struct Builder<D, E = DummyServerEndpoint>
pub struct Builder<D>
where
D: Map,
E: ServiceEndpoint<RpcService>,
{
storage: StorageConfig,
bind_port: Option<u16>,
secret_key: SecretKey,
rpc_endpoint: E,
rpc_endpoint: BoxedServerEndpoint,
rpc_port: Option<u16>,
blobs_store: D,
keylog: bool,
Expand Down Expand Up @@ -146,6 +147,40 @@ impl From<Box<ConcurrentDiscovery>> for DiscoveryConfig {
}
}

/// A server endpoint that does nothing. Accept will never resolve.
///
/// This is used unless an external rpc endpoint is configured.
#[derive(Debug, Default)]
struct DummyServerEndpoint;

impl BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Response>
for DummyServerEndpoint
{
fn clone_box(
&self,
) -> Box<dyn BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Response>>
{
Box::new(DummyServerEndpoint)
}

fn accept_bi_boxed(
&self,
) -> quic_rpc::transport::boxed::AcceptFuture<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
> {
quic_rpc::transport::boxed::AcceptFuture::boxed(futures_lite::future::pending())
}

fn local_addr(&self) -> &[quic_rpc::transport::LocalAddr] {
&[]
}
}

fn mk_external_rpc() -> BoxedServerEndpoint {
quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint)
}

impl Default for Builder<iroh_blobs::store::mem::Store> {
fn default() -> Self {
Self {
Expand All @@ -156,7 +191,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
keylog: false,
relay_mode: RelayMode::Default,
dns_resolver: None,
rpc_endpoint: Default::default(),
rpc_endpoint: mk_external_rpc(),
rpc_port: None,
gc_policy: GcPolicy::Disabled,
docs_storage: DocsStorage::Memory,
Expand All @@ -183,7 +218,7 @@ impl<D: Map> Builder<D> {
keylog: false,
relay_mode: RelayMode::Default,
dns_resolver: None,
rpc_endpoint: Default::default(),
rpc_endpoint: mk_external_rpc(),
rpc_port: None,
gc_policy: GcPolicy::Disabled,
docs_storage,
Expand All @@ -195,16 +230,15 @@ impl<D: Map> Builder<D> {
}
}

impl<D, E> Builder<D, E>
impl<D> Builder<D>
where
D: BaoStore,
E: ServiceEndpoint<RpcService>,
{
/// Persist all node data in the provided directory.
pub async fn persist(
self,
root: impl AsRef<Path>,
) -> Result<Builder<iroh_blobs::store::fs::Store, E>> {
) -> Result<Builder<iroh_blobs::store::fs::Store>> {
let root = root.as_ref();
let blob_dir = IrohPaths::BaoStoreDir.with_root(root);

Expand Down Expand Up @@ -260,11 +294,7 @@ where
}

/// Configure rpc endpoint, changing the type of the builder to the new endpoint type.
pub fn rpc_endpoint<E2: ServiceEndpoint<RpcService>>(
self,
value: E2,
port: Option<u16>,
) -> Builder<D, E2> {
pub fn rpc_endpoint(self, value: BoxedServerEndpoint, port: Option<u16>) -> Builder<D> {
// we can't use ..self here because the return type is different
Builder {
storage: self.storage,
Expand All @@ -286,8 +316,9 @@ where
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc(self) -> Result<Builder<D, QuinnServerEndpoint<RpcService>>> {
pub async fn enable_rpc(self) -> Result<Builder<D>> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
let ep = quic_rpc::transport::boxed::ServerEndpoint::new(ep);
if let StorageConfig::Persistent(ref root) = self.storage {
// store rpc endpoint
RpcStatus::store(root, actual_rpc_port).await?;
Expand Down Expand Up @@ -416,7 +447,7 @@ where
///
/// Returns an [`ProtocolBuilder`], on which custom protocols can be registered with
/// [`ProtocolBuilder::accept`]. To spawn the node, call [`ProtocolBuilder::spawn`].
pub async fn build(self) -> Result<ProtocolBuilder<D, E>> {
pub async fn build(self) -> Result<ProtocolBuilder<D>> {
// Clone the blob store to shutdown in case of error.
let blobs_store = self.blobs_store.clone();
match self.build_inner().await {
Expand All @@ -428,7 +459,7 @@ where
}
}

async fn build_inner(self) -> Result<ProtocolBuilder<D, E>> {
async fn build_inner(self) -> Result<ProtocolBuilder<D>> {
trace!("building node");
let lp = LocalPoolHandle::new(num_cpus::get());
let endpoint = {
Expand Down Expand Up @@ -547,17 +578,18 @@ where
/// Note that RPC calls performed with client returned from [`Self::client`] will not complete
/// until the node is spawned.
#[derive(derive_more::Debug)]
pub struct ProtocolBuilder<D, E> {
pub struct ProtocolBuilder<D> {
inner: Arc<NodeInner<D>>,
internal_rpc: FlumeServerEndpoint<RpcService>,
external_rpc: E,
#[debug("external rpc")]
external_rpc: BoxedServerEndpoint,
protocols: ProtocolMap,
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
gc_policy: GcPolicy,
}

impl<D: iroh_blobs::store::Store, E: ServiceEndpoint<RpcService>> ProtocolBuilder<D, E> {
impl<D: iroh_blobs::store::Store> ProtocolBuilder<D> {
/// Registers a protocol handler for incoming connections.
///
/// Use this to register custom protocols onto the iroh node. Whenever a new connection for
Expand Down
3 changes: 1 addition & 2 deletions iroh/tests/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures_lite::FutureExt;
use iroh::node::{Builder, DocsStorage};
use iroh_base::node_addr::AddrInfoOptions;
use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::RngCore;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
Expand Down Expand Up @@ -39,7 +38,7 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result<quinn::Co
.context("failed to connect to provider")
}

fn test_node<D: Store>(db: D) -> Builder<D, DummyServerEndpoint> {
fn test_node<D: Store>(db: D) -> Builder<D> {
iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem)
.bind_port(0)
}
Expand Down
3 changes: 1 addition & 2 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use iroh::{
net::key::{PublicKey, SecretKey},
node::{Builder, Node},
};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::{CryptoRng, Rng, SeedableRng};
use tracing::{debug, error_span, info, Instrument};
use tracing_subscriber::{prelude::*, EnvFilter};
Expand All @@ -32,7 +31,7 @@ use iroh_net::relay::RelayMode;

const TIMEOUT: Duration = Duration::from_secs(60);

fn test_node(secret_key: SecretKey) -> Builder<iroh_blobs::store::mem::Store, DummyServerEndpoint> {
fn test_node(secret_key: SecretKey) -> Builder<iroh_blobs::store::mem::Store> {
Node::memory()
.secret_key(secret_key)
.relay_mode(RelayMode::Disabled)
Expand Down

0 comments on commit 52c96ba

Please sign in to comment.