diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 4b3c65617e..447598005d 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -1,13 +1,13 @@ use std::path::{Path, PathBuf}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{ensure, Context, Result}; use clap::Parser; -use tokio_util::task::LocalPoolHandle; +use iroh::client::quic::Iroh as IrohRpc; use crate::config::{ConsoleEnv, NodeConfig}; use self::blob::{BlobAddOptions, BlobSource}; -use self::rpc::{RpcCommands, RpcStatus}; +use self::rpc::RpcCommands; use self::start::RunType; pub(crate) mod author; @@ -84,14 +84,13 @@ pub(crate) enum Commands { } impl Cli { - pub(crate) async fn run(self, rt: LocalPoolHandle, data_dir: &Path) -> Result<()> { + pub(crate) async fn run(self, data_dir: &Path) -> Result<()> { match self.command { Commands::Console => { let env = ConsoleEnv::for_console(data_dir)?; if self.start { let config = NodeConfig::from_env(self.config.as_deref())?; start::run_with_command( - &rt, &config, data_dir, RunType::SingleCommandNoAbort, @@ -99,7 +98,7 @@ impl Cli { ) .await } else { - let iroh = iroh_quic_connect(data_dir).await.context("rpc connect")?; + let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?; console::run(&iroh, &env).await } } @@ -108,7 +107,6 @@ impl Cli { if self.start { let config = NodeConfig::from_env(self.config.as_deref())?; start::run_with_command( - &rt, &config, data_dir, RunType::SingleCommandAbortable, @@ -116,7 +114,7 @@ impl Cli { ) .await } else { - let iroh = iroh_quic_connect(data_dir).await.context("rpc connect")?; + let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?; command.run(&iroh, &env).await } } @@ -137,7 +135,6 @@ impl Cli { }); start::run_with_command( - &rt, &config, data_dir, RunType::UntilStopped, @@ -157,18 +154,3 @@ impl Cli { } } } - -async fn iroh_quic_connect(root: &Path) -> Result { - let rpc_status = RpcStatus::load(root).await?; - match rpc_status { - RpcStatus::Stopped => { - bail!("iroh is not running, please start it"); - } - RpcStatus::Running(rpc_port) => { - let iroh = iroh::client::quic::connect(rpc_port) - .await - .context("quic::connect")?; - Ok(iroh) - } - } -} diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index c56e2a0355..c04439d6b5 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -966,15 +966,14 @@ mod tests { let data_dir = tempfile::tempdir()?; - let lp = tokio_util::task::LocalPoolHandle::new(1); - let node = crate::commands::start::start_node(&lp, data_dir.path(), None).await?; + let node = crate::commands::start::start_node(data_dir.path(), None).await?; let client = node.client(); let doc = client.docs.create().await.context("doc create")?; let author = client.authors.create().await.context("author create")?; // set up command, getting iroh node let cli = ConsoleEnv::for_console(data_dir.path()).context("ConsoleEnv")?; - let iroh = crate::commands::iroh_quic_connect(data_dir.path()) + let iroh = iroh::client::quic::Iroh::connect(data_dir.path()) .await .context("rpc connect")?; diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index c255010325..036ecf08c7 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -1,11 +1,7 @@ -use std::path::Path; - -use anyhow::{ensure, Context, Result}; +use anyhow::Result; use clap::Subcommand; -use iroh::{client::Iroh, rpc_protocol::ProviderService, util::path::IrohPaths}; +use iroh::{client::Iroh, rpc_protocol::ProviderService}; 
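// (Editorial aside, not part of the patch.) A minimal sketch of the connect flow
// the commands.rs hunks above switch to: the RpcStatus lookup now lives behind
// `iroh::client::quic::Iroh::connect`, so callers only hand over the data dir.
// The wrapper function and its name are illustrative assumptions.
async fn connect_example(data_dir: &std::path::Path) -> anyhow::Result<()> {
    use anyhow::Context;
    // Resolves the RPC port from the lock file under `data_dir` and bails with
    // "iroh is not running, please start it" when no live endpoint is found.
    let iroh = iroh::client::quic::Iroh::connect(data_dir)
        .await
        .context("rpc connect")?;
    let _ = iroh; // hand off to `console::run` / `command.run` in the real CLI
    Ok(())
}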
use quic_rpc::ServiceConnection; -use tokio::{fs, io::AsyncReadExt}; -use tracing::trace; use crate::config::ConsoleEnv; @@ -75,95 +71,3 @@ impl RpcCommands { } } } - -/// The current status of the RPC endpoint. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum RpcStatus { - /// Stopped. - Stopped, - /// Running on this port. - Running(u16), -} - -impl RpcStatus { - pub async fn load(root: impl AsRef) -> Result { - let p = IrohPaths::RpcLock.with_root(root); - trace!("loading RPC lock: {}", p.display()); - - if p.exists() { - // Lock file exists, read the port and check if we can get a connection. - let mut file = fs::File::open(&p).await.context("open rpc lock file")?; - let file_len = file - .metadata() - .await - .context("reading rpc lock file metadata")? - .len(); - if file_len == 2 { - let mut buffer = [0u8; 2]; - file.read_exact(&mut buffer) - .await - .context("read rpc lock file")?; - let running_rpc_port = u16::from_le_bytes(buffer); - if iroh::client::quic::connect(running_rpc_port).await.is_ok() { - return Ok(RpcStatus::Running(running_rpc_port)); - } - } - - // invalid or outdated rpc lock file, delete - drop(file); - fs::remove_file(&p) - .await - .context("deleting rpc lock file")?; - Ok(RpcStatus::Stopped) - } else { - // No lock file, stopped - Ok(RpcStatus::Stopped) - } - } - - /// Store the current rpc status. - pub async fn store(root: impl AsRef, rpc_port: u16) -> Result<()> { - let p = IrohPaths::RpcLock.with_root(root); - trace!("storing RPC lock: {}", p.display()); - - ensure!(!p.exists(), "iroh is already running"); - if let Some(parent) = p.parent() { - fs::create_dir_all(parent) - .await - .context("creating parent dir")?; - } - fs::write(&p, &rpc_port.to_le_bytes()) - .await - .context("writing rpc lock file")?; - Ok(()) - } - - /// Cleans up an existing rpc lock - pub async fn clear(root: impl AsRef) -> Result<()> { - let p = IrohPaths::RpcLock.with_root(root); - trace!("clearing RPC lock: {}", p.display()); - - // ignore errors - tokio::fs::remove_file(&p).await.ok(); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_rpc_lock_file() { - let dir = testdir::testdir!(); - - let rpc_port = 7778; - RpcStatus::store(&dir, rpc_port).await.unwrap(); - let status = RpcStatus::load(&dir).await.unwrap(); - assert_eq!(status, RpcStatus::Stopped); - let p = IrohPaths::RpcLock.with_root(&dir); - let exists = fs::try_exists(&p).await.unwrap(); - assert!(!exists, "should be deleted as not running"); - } -} diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index 9448b6fe3c..e1cd6c12d6 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -1,35 +1,18 @@ -use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{net::SocketAddr, path::Path, time::Duration}; -use anyhow::{Context, Result}; +use anyhow::Result; use colored::Colorize; use futures::Future; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; -use iroh::net::{ - derp::{DerpMap, DerpMode}, - key::SecretKey, -}; +use iroh::node::Node; use iroh::{ - client::quic::RPC_ALPN, - node::Node, - rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, - util::{fs::load_secret_key, path::IrohPaths}, + net::derp::{DerpMap, DerpMode}, + node::RpcStatus, }; -use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint}; -use tokio_util::task::LocalPoolHandle; use tracing::{info_span, Instrument}; use crate::config::NodeConfig; -use 
super::rpc::RpcStatus; - -const DEFAULT_RPC_PORT: u16 = 0x1337; -const MAX_RPC_CONNECTIONS: u32 = 16; -const MAX_RPC_STREAMS: u32 = 1024; - /// Whether to stop the node after running a command or run forever until stopped. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum RunType { @@ -46,7 +29,6 @@ pub enum RunType { pub struct AlreadyRunningError(u16); pub async fn run_with_command( - rt: &LocalPoolHandle, config: &NodeConfig, iroh_data_root: &Path, run_type: RunType, @@ -58,7 +40,7 @@ where { let metrics_fut = start_metrics_server(config.metrics_addr); - let res = run_with_command_inner(rt, config, iroh_data_root, run_type, command).await; + let res = run_with_command_inner(config, iroh_data_root, run_type, command).await; if let Some(metrics_fut) = metrics_fut { metrics_fut.abort(); @@ -81,7 +63,6 @@ where } async fn run_with_command_inner( - rt: &LocalPoolHandle, config: &NodeConfig, iroh_data_root: &Path, run_type: RunType, @@ -94,14 +75,14 @@ where let derp_map = config.derp_map()?; let spinner = create_spinner("Iroh booting..."); - let node = start_node(rt, iroh_data_root, derp_map).await?; + let node = start_node(iroh_data_root, derp_map).await?; drop(spinner); eprintln!("{}", welcome_message(&node)?); let client = node.client(); - let mut command_task = rt.spawn_pinned(move || { + let mut command_task = node.local_pool_handle().spawn_pinned(move || { async move { match command(client).await { Err(err) => Err(err), @@ -141,56 +122,13 @@ where Ok(()) } -/// Migrate the flat store from v0 to v1. This can not be done in the store itself, since the -/// constructor of the store now only takes a single directory. -fn migrate_flat_store_v0_v1(iroh_data_root: PathBuf) -> anyhow::Result<()> { - let complete_v0 = iroh_data_root.join("blobs.v0"); - let partial_v0 = iroh_data_root.join("blobs-partial.v0"); - let meta_v0 = iroh_data_root.join("blobs-meta.v0"); - let complete_v1 = IrohPaths::BaoFlatStoreDir - .with_root(&iroh_data_root) - .join("complete"); - let partial_v1 = IrohPaths::BaoFlatStoreDir - .with_root(&iroh_data_root) - .join("partial"); - let meta_v1 = IrohPaths::BaoFlatStoreDir - .with_root(&iroh_data_root) - .join("meta"); - if complete_v0.exists() && !complete_v1.exists() { - tracing::info!( - "moving complete files from {} to {}", - complete_v0.display(), - complete_v1.display() - ); - std::fs::rename(complete_v0, complete_v1).context("migrating complete store failed")?; - } - if partial_v0.exists() && !partial_v1.exists() { - tracing::info!( - "moving partial files from {} to {}", - partial_v0.display(), - partial_v1.display() - ); - std::fs::rename(partial_v0, partial_v1).context("migrating partial store failed")?; - } - if meta_v0.exists() && !meta_v1.exists() { - tracing::info!( - "moving meta files from {} to {}", - meta_v0.display(), - meta_v1.display() - ); - std::fs::rename(meta_v0, meta_v1).context("migrating meta store failed")?; - } - Ok(()) -} - pub(crate) async fn start_node( - rt: &LocalPoolHandle, iroh_data_root: &Path, derp_map: Option, ) -> Result> { let rpc_status = RpcStatus::load(iroh_data_root).await?; match rpc_status { - RpcStatus::Running(port) => { + RpcStatus::Running { port, .. 
} => { return Err(AlreadyRunningError(port).into()); } RpcStatus::Stopped => { @@ -198,31 +136,16 @@ pub(crate) async fn start_node( } } - let blob_dir = IrohPaths::BaoFlatStoreDir.with_root(iroh_data_root); - let peers_data_path = IrohPaths::PeerData.with_root(iroh_data_root); - tokio::fs::create_dir_all(&blob_dir).await?; - let root = iroh_data_root.to_path_buf(); - tokio::task::spawn_blocking(|| migrate_flat_store_v0_v1(root)).await??; - let bao_store = iroh::bytes::store::flat::Store::load(&blob_dir) - .await - .with_context(|| format!("Failed to load iroh database from {}", blob_dir.display()))?; - let secret_key_path = Some(IrohPaths::SecretKey.with_root(iroh_data_root)); - let doc_store = - iroh::sync::store::fs::Store::new(IrohPaths::DocsDatabase.with_root(iroh_data_root))?; - - let secret_key = get_secret_key(secret_key_path).await?; - let rpc_endpoint = make_rpc_endpoint(&secret_key, DEFAULT_RPC_PORT, iroh_data_root).await?; let derp_mode = match derp_map { None => DerpMode::Default, Some(derp_map) => DerpMode::Custom(derp_map), }; - Node::builder(bao_store, doc_store) + Node::persistent(iroh_data_root) + .await? .derp_mode(derp_mode) - .peers_data_path(peers_data_path) - .local_pool(rt) - .rpc_endpoint(rpc_endpoint) - .secret_key(secret_key) + .enable_rpc() + .await? .spawn() .await } @@ -237,65 +160,6 @@ fn welcome_message(node: &Node) -> Result) -> Result { - match key { - Some(key_path) => load_secret_key(key_path).await, - None => { - // No path provided, just generate one - Ok(SecretKey::generate()) - } - } -} - -/// Makes a an RPC endpoint that uses a QUIC transport -async fn make_rpc_endpoint( - secret_key: &SecretKey, - rpc_port: u16, - iroh_data_root: &Path, -) -> Result> { - let rpc_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_port); - let mut transport_config = quinn::TransportConfig::default(); - transport_config - .max_concurrent_bidi_streams(MAX_RPC_STREAMS.into()) - .max_concurrent_uni_streams(0u32.into()); - let mut server_config = iroh::net::magic_endpoint::make_server_config( - secret_key, - vec![RPC_ALPN.to_vec()], - Some(transport_config), - false, - )?; - server_config.concurrent_connections(MAX_RPC_CONNECTIONS); - - let rpc_quinn_endpoint = quinn::Endpoint::server(server_config.clone(), rpc_addr.into()); - let rpc_quinn_endpoint = match rpc_quinn_endpoint { - Ok(ep) => ep, - Err(err) => { - if err.kind() == std::io::ErrorKind::AddrInUse { - tracing::warn!( - "RPC port {} already in use, switching to random port", - rpc_port - ); - // Use a random port - quinn::Endpoint::server( - server_config, - SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into(), - )? - } else { - return Err(err.into()); - } - } - }; - - let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port(); - let rpc_endpoint = - QuinnServerEndpoint::::new(rpc_quinn_endpoint)?; - - // store rpc endpoint - RpcStatus::store(iroh_data_root, actual_rpc_port).await?; - - Ok(rpc_endpoint) -} - /// Create a nice spinner. 
fn create_spinner(msg: &'static str) -> ProgressBar { let pb = ProgressBar::new_spinner(); @@ -331,6 +195,7 @@ pub fn start_metrics_server( mod tests { use super::*; use anyhow::bail; + use iroh::util::path::IrohPaths; #[tokio::test] async fn test_run_rpc_lock_file() -> Result<()> { @@ -340,15 +205,12 @@ mod tests { .join(IrohPaths::RpcLock.with_root(data_dir.path())); let data_dir_path = data_dir.path().to_path_buf(); - let rt1 = LocalPoolHandle::new(1); - let rt2 = LocalPoolHandle::new(1); let (ready_s, ready_r) = tokio::sync::oneshot::channel(); let (close_s, close_r) = tokio::sync::oneshot::channel(); // run the first start command, using channels to coordinate so we know when the node has fully booted up, and when we need to shut the node down let start = tokio::spawn(async move { run_with_command( - &rt1, &NodeConfig::default(), &data_dir_path, RunType::SingleCommandAbortable, @@ -381,7 +243,6 @@ mod tests { // run the second command, this should fail if run_with_command( - &rt2, &NodeConfig::default(), data_dir.path(), RunType::SingleCommandAbortable, diff --git a/iroh-cli/src/main.rs b/iroh-cli/src/main.rs index de10bcafef..bbc3a7ae53 100644 --- a/iroh-cli/src/main.rs +++ b/iroh-cli/src/main.rs @@ -24,7 +24,6 @@ fn main() -> Result<()> { } async fn main_impl() -> Result<()> { - let lp = tokio_util::task::LocalPoolHandle::new(num_cpus::get()); let data_dir = config::iroh_data_root()?; let cli = Cli::parse(); @@ -47,14 +46,14 @@ async fn main_impl() -> Result<()> { ) .with(EnvFilter::from_default_env()) .init(); - return cli.run(lp, &data_dir).await; + return cli.run(&data_dir).await; } tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) .with(EnvFilter::from_default_env()) .init(); - cli.run(lp, &data_dir).await + cli.run(&data_dir).await } /// Newtype for `ManuallyDrop` so we can impl a foreign trait. 
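// (Editorial aside, not part of the patch.) The main.rs hunk above removes the
// CLI-owned LocalPoolHandle: the node now creates its own pool internally and
// exposes it through `local_pool_handle()`, which start.rs uses for
// `spawn_pinned`. A sketch under that assumption, with a placeholder task body:
async fn pool_example() -> anyhow::Result<()> {
    let node = iroh::node::Node::memory().spawn().await?;
    // !Send futures are spawned on the node's internal pool rather than on a
    // handle threaded through every call site.
    let task = node
        .local_pool_handle()
        .spawn_pinned(|| async move { Ok::<(), anyhow::Error>(()) });
    task.await??;
    Ok(())
}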
diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs index 649bc1bd6e..fe60940a08 100644 --- a/iroh/examples/client.rs +++ b/iroh/examples/client.rs @@ -13,9 +13,7 @@ use tokio_stream::StreamExt; #[tokio::main] async fn main() -> anyhow::Result<()> { - let db = iroh_bytes::store::mem::Store::new(); - let store = iroh_sync::store::memory::Store::default(); - let node = Node::builder(db.clone(), store).spawn().await?; + let node = Node::memory().spawn().await?; let client = node.client(); let doc = client.docs.create().await?; let author = client.authors.create().await?; diff --git a/iroh/examples/collection-fetch.rs b/iroh/examples/collection-fetch.rs index 99e11b3f57..5f76cbd3a8 100644 --- a/iroh/examples/collection-fetch.rs +++ b/iroh/examples/collection-fetch.rs @@ -8,7 +8,6 @@ use iroh::{client::BlobDownloadProgress, rpc_protocol::BlobDownloadRequest}; use iroh_bytes::BlobFormat; use std::env; use std::str::FromStr; -use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -35,17 +34,8 @@ async fn main() -> Result<()> { let ticket = iroh::ticket::BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example collection-provide`")?; - // create a new, empty in memory database - let db = iroh_bytes::store::mem::Store::default(); - // create an in-memory doc store (not used in the example) - let doc_store = iroh_sync::store::memory::Store::default(); - // create a new iroh runtime with 1 worker thread - let lp = LocalPoolHandle::new(1); // create a new node - let node = iroh::node::Node::builder(db, doc_store) - .local_pool(&lp) - .spawn() - .await?; + let node = iroh::node::Node::memory().spawn().await?; // create a client that allows us to interact with the running node let client = node.client(); diff --git a/iroh/examples/collection-provide.rs b/iroh/examples/collection-provide.rs index f59d054fb6..1f1edc63d7 100644 --- a/iroh/examples/collection-provide.rs +++ b/iroh/examples/collection-provide.rs @@ -6,8 +6,9 @@ //! This is using an in memory database and a random node id. //! run this example from the project root: //! 
$ cargo run --example collection-provide -use iroh_bytes::{format::collection::Collection, BlobFormat, Hash}; -use tokio_util::task::LocalPoolHandle; +use bytes::Bytes; +use iroh::rpc_protocol::SetTagOption; +use iroh_bytes::{format::collection::Collection, BlobFormat}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -23,30 +24,40 @@ pub fn setup_logging() { async fn main() -> anyhow::Result<()> { setup_logging(); println!("\ncollection provide example!"); - // create a new database and add two blobs - let (mut db, names) = iroh_bytes::store::readonly_mem::Store::new([ - ("blob1", b"the first blob of bytes".to_vec()), - ("blob2", b"the second blob of bytes".to_vec()), - ]); - // create blobs from the data - let collection: Collection = names + // create a new node + // we must configure the iroh collection parser so the node understands iroh collections + let node = iroh::node::Node::memory().spawn().await?; + + // Add two blobs + let blob1 = node + .client() + .blobs + .add_bytes( + Bytes::from_static(b"the first blob of bytes"), + SetTagOption::Auto, + ) + .await?; + let blob2 = node + .client() + .blobs + .add_bytes( + Bytes::from_static(b"the second blob of bytes"), + SetTagOption::Auto, + ) + .await?; + + // Create blobs from the data + let collection: Collection = [("blob1", blob1.hash), ("blob2", blob2.hash)] .into_iter() - .map(|(name, hash)| (name, Hash::from(hash))) .collect(); - // create a collection and add it to the db as well - let hash = db.insert_many(collection.to_blobs()).unwrap(); - // create a new local pool handle with 1 worker thread - let lp = LocalPoolHandle::new(1); - - // create an in-memory doc store for iroh sync (not used here) - let doc_store = iroh_sync::store::memory::Store::default(); - // create a new node - // we must configure the iroh collection parser so the node understands iroh collections - let node = iroh::node::Node::builder(db, doc_store) - .local_pool(&lp) - .spawn() + // Create a collection + let (hash, _) = node + .client() + .blobs + .create_collection(collection, SetTagOption::Auto, Default::default()) .await?; + // create a ticket // tickets wrap all details needed to get a collection let ticket = node.ticket(hash, BlobFormat::HashSeq).await?; diff --git a/iroh/examples/hello-world-fetch.rs b/iroh/examples/hello-world-fetch.rs index 30cdb916ee..337d3900e4 100644 --- a/iroh/examples/hello-world-fetch.rs +++ b/iroh/examples/hello-world-fetch.rs @@ -8,7 +8,6 @@ use iroh::{client::BlobDownloadProgress, rpc_protocol::BlobDownloadRequest}; use iroh_bytes::BlobFormat; use std::env; use std::str::FromStr; -use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -35,17 +34,8 @@ async fn main() -> Result<()> { let ticket = iroh::ticket::BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`")?; - // create a new, empty in memory database - let db = iroh_bytes::store::mem::Store::default(); - // create an in-memory doc store (not used in the example) - let doc_store = iroh_sync::store::memory::Store::default(); - // create a new iroh runtime with 1 worker thread - let lp = LocalPoolHandle::new(1); // create a new node - let node = iroh::node::Node::builder(db, doc_store) - .local_pool(&lp) - .spawn() - .await?; + let node = 
iroh::node::Node::memory().spawn().await?; // create a client that allows us to interact with the running node let client = node.client(); diff --git a/iroh/examples/hello-world-provide.rs b/iroh/examples/hello-world-provide.rs index 525aab2817..3509909930 100644 --- a/iroh/examples/hello-world-provide.rs +++ b/iroh/examples/hello-world-provide.rs @@ -3,8 +3,9 @@ //! This is using an in memory database and a random node id. //! run this example from the project root: //! $ cargo run --example hello-world-provide +use bytes::Bytes; +use iroh::rpc_protocol::SetTagOption; use iroh_bytes::BlobFormat; -use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -20,19 +21,18 @@ pub fn setup_logging() { async fn main() -> anyhow::Result<()> { setup_logging(); println!("'Hello World' provide example!"); - // create a new, empty in memory database - let mut db = iroh_bytes::store::readonly_mem::Store::default(); - // create an in-memory doc store (not used in the example) - let doc_store = iroh_sync::store::memory::Store::default(); - // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime - let lp = LocalPoolHandle::new(1); - // add some data and remember the hash - let hash = db.insert(b"Hello, world!"); + // create a new node - let node = iroh::node::Node::builder(db, doc_store) - .local_pool(&lp) - .spawn() - .await?; + let node = iroh::node::Node::memory().spawn().await?; + + // add some data and remember the hash + let hash = node + .client() + .blobs + .add_bytes(Bytes::from_static(b"Hello, world!"), SetTagOption::Auto) + .await? + .hash; + // create a ticket let ticket = node.ticket(hash, BlobFormat::Raw).await?; // print some info about the node diff --git a/iroh/examples/rpc.rs b/iroh/examples/rpc.rs index 6a9d4060bb..f92d4f1221 100644 --- a/iroh/examples/rpc.rs +++ b/iroh/examples/rpc.rs @@ -6,16 +6,10 @@ //! cargo as well: //! $ cargo run node stats //! 
The `node stats` command will reach out over RPC to the node constructed in the example -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use clap::Parser; -use iroh::rpc_protocol::ProviderService; -use iroh::rpc_protocol::{ProviderRequest, ProviderResponse}; +use iroh::node::StorageConfig; use iroh_bytes::store::Store; -use iroh_net::key::SecretKey; -use quic_rpc::transport::quinn::QuinnServerEndpoint; -use quic_rpc::ServiceEndpoint; -use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -27,46 +21,15 @@ pub fn setup_logging() { .ok(); } -const DEFAULT_RPC_PORT: u16 = 0x1337; -const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; - -/// Makes a an RPC endpoint that uses a QUIC transport -fn make_rpc_endpoint( - secret_key: &SecretKey, -) -> anyhow::Result> { - let rpc_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT)); - let mut transport_config = quinn::TransportConfig::default(); - transport_config.max_concurrent_bidi_streams(8u32.into()); - let mut config = iroh_net::magic_endpoint::make_server_config( - secret_key, - vec![RPC_ALPN.to_vec()], - Some(transport_config), - false, - )?; - config.concurrent_connections(1024); - let rpc_quinn_endpoint = quinn::Endpoint::server(config, rpc_addr)?; - let rpc_endpoint = - QuinnServerEndpoint::::new(rpc_quinn_endpoint)?; - Ok(rpc_endpoint) -} - -async fn run(db: impl Store) -> anyhow::Result<()> { - // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime - let lp = LocalPoolHandle::new(1); - // create a random secret key - let secret_key = SecretKey::generate(); - // create a rpc endpoint - let rpc_endpoint = make_rpc_endpoint(&secret_key)?; - +async fn run(blobs_store: impl Store, config: StorageConfig) -> anyhow::Result<()> { + let docs_store = iroh_sync::store::memory::Store::default(); // create a new node - // we must configure the iroh collection parser so the node understands iroh collections - let doc_store = iroh_sync::store::memory::Store::default(); - let node = iroh::node::Node::builder(db, doc_store) - .secret_key(secret_key) - .local_pool(&lp) - .rpc_endpoint(rpc_endpoint) + let node = iroh::node::Builder::with_db_and_store(blobs_store, docs_store, config) + .enable_rpc() + .await? 
// enable the RPC endpoint .spawn() .await?; + // print some info about the node let peer = node.node_id(); let addrs = node.local_endpoint_addresses().await?; @@ -99,11 +62,11 @@ async fn main() -> anyhow::Result<()> { Some(path) => { tokio::fs::create_dir_all(&path).await?; let db = iroh_bytes::store::flat::Store::load(&path).await?; - run(db).await + run(db, StorageConfig::Persistent(path.into())).await } None => { let db = iroh_bytes::store::mem::Store::new(); - run(db).await + run(db, StorageConfig::Mem).await } } } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index b5d18b545d..4a882cc86c 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -1354,19 +1354,12 @@ mod tests { use rand::RngCore; use tokio::io::AsyncWriteExt; - use tokio_util::task::LocalPoolHandle; #[tokio::test] async fn test_drop_doc_client_sync() -> Result<()> { let _guard = iroh_test::logging::setup(); - let db = iroh_bytes::store::readonly_mem::Store::default(); - let doc_store = iroh_sync::store::memory::Store::default(); - let lp = LocalPoolHandle::new(1); - let node = crate::node::Node::builder(db, doc_store) - .local_pool(&lp) - .spawn() - .await?; + let node = crate::node::Node::memory().spawn().await?; let client = node.client(); let doc = client.docs.create().await?; @@ -1387,9 +1380,7 @@ mod tests { async fn test_doc_import_export() -> Result<()> { let _guard = iroh_test::logging::setup(); - let doc_store = iroh_sync::store::memory::Store::default(); - let db = iroh_bytes::store::mem::Store::new(); - let node = crate::node::Node::builder(db, doc_store).spawn().await?; + let node = crate::node::Node::memory().spawn().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1461,9 +1452,7 @@ mod tests { async fn test_blob_create_collection() -> Result<()> { let _guard = iroh_test::logging::setup(); - let doc_store = iroh_sync::store::memory::Store::default(); - let db = iroh_bytes::store::mem::Store::new(); - let node = crate::node::Node::builder(db, doc_store).spawn().await?; + let node = crate::node::Node::memory().spawn().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1549,9 +1538,7 @@ mod tests { async fn test_blob_read_at() -> Result<()> { // let _guard = iroh_test::logging::setup(); - let doc_store = iroh_sync::store::memory::Store::default(); - let db = iroh_bytes::store::mem::Store::new(); - let node = crate::node::Node::builder(db, doc_store).spawn().await?; + let node = crate::node::Node::memory().spawn().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1648,9 +1635,7 @@ mod tests { async fn test_blob_get_collection() -> Result<()> { let _guard = iroh_test::logging::setup(); - let doc_store = iroh_sync::store::memory::Store::default(); - let db = iroh_bytes::store::mem::Store::new(); - let node = crate::node::Node::builder(db, doc_store).spawn().await?; + let node = crate::node::Node::memory().spawn().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1718,9 +1703,7 @@ mod tests { async fn test_blob_share() -> Result<()> { let _guard = iroh_test::logging::setup(); - let doc_store = iroh_sync::store::memory::Store::default(); - let db = iroh_bytes::store::mem::Store::new(); - let node = crate::node::Node::builder(db, doc_store).spawn().await?; + let node = crate::node::Node::memory().spawn().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 
1d0aec87da..066a858f59 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -2,14 +2,18 @@ use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + path::Path, sync::Arc, time::Duration, }; -use anyhow::Context; +use anyhow::{bail, Context}; use quic_rpc::transport::quinn::QuinnConnection; -use crate::rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService}; +use crate::{ + node::RpcStatus, + rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService}, +}; /// TODO: Change to "/iroh-rpc/1" pub const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; @@ -20,16 +24,23 @@ pub type RpcClient = /// Client to an iroh node running in a separate process. /// -/// This is obtained from [`connect`]. +/// This is obtained from [`Iroh::connect`]. pub type Iroh = super::Iroh>; /// RPC document client to an iroh node running in a separate process. pub type Doc = super::Doc>; -/// Connect to an iroh node running on the same computer, but in a different process. -pub async fn connect(rpc_port: u16) -> anyhow::Result { - let client = connect_raw(rpc_port).await?; - Ok(Iroh::new(client)) +impl Iroh { + /// Connect to an iroh node running on the same computer, but in a different process. + pub async fn connect(root: impl AsRef) -> anyhow::Result { + let rpc_status = RpcStatus::load(root).await?; + match rpc_status { + RpcStatus::Stopped => { + bail!("iroh is not running, please start it"); + } + RpcStatus::Running { client, .. } => Ok(Iroh::new(client)), + } + } } /// Create a raw RPC client to an iroh node running on the same computer, but in a different diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 4bae7ebd9b..b84ad28fa3 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -7,541 +7,43 @@ //! To shut down the node, call [`Node::shutdown`]. 
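// (Editorial aside, not part of the patch.) The lifecycle the module docs above
// describe, written against the new `Node::memory` constructor from this PR;
// treating the running node as a future follows those docs, not a verified
// signature.
async fn lifecycle_example() -> anyhow::Result<()> {
    let node = iroh::node::Node::memory().spawn().await?;
    node.shutdown(); // request graceful termination
    let _ = node.await; // the Node is awaitable; resolves once its task ends
    Ok(())
}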
use std::future::Future; use std::net::SocketAddr; -use std::path::PathBuf; +use std::path::Path; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; -use std::time::Duration; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Result}; use futures::future::{BoxFuture, Shared}; -use futures::{FutureExt, StreamExt, TryFutureExt}; -use iroh_bytes::downloader::Downloader; -use iroh_bytes::store::{GcMarkEvent, GcSweepEvent, Map, ReadableStore, Store as BaoStore}; +use futures::{FutureExt, StreamExt}; +use iroh_bytes::store::ReadableStore; use iroh_bytes::BlobFormat; -use iroh_bytes::{protocol::Closed, Hash}; -use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; +use iroh_bytes::Hash; use iroh_net::derp::DerpUrl; -use iroh_net::magic_endpoint::get_alpn; use iroh_net::magicsock::LocalEndpointsStream; use iroh_net::util::AbortingJoinHandle; use iroh_net::{ - derp::DerpMode, key::{PublicKey, SecretKey}, MagicEndpoint, NodeAddr, }; use iroh_sync::store::Store as DocStore; use quic_rpc::transport::flume::FlumeConnection; -use quic_rpc::transport::misc::DummyServerEndpoint; -use quic_rpc::{RpcClient, RpcServer, ServiceEndpoint}; -use serde::{Deserialize, Serialize}; +use quic_rpc::RpcClient; use tokio::sync::{mpsc, RwLock}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; -use tracing::{debug, error, error_span, info, trace, warn, Instrument}; +use tracing::debug; -use crate::rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}; -use crate::sync_engine::{SyncEngine, SYNC_ALPN}; +use crate::rpc_protocol::{ProviderRequest, ProviderResponse}; +use crate::sync_engine::SyncEngine; use crate::ticket::BlobTicket; +mod builder; mod rpc; +mod rpc_status; -const MAX_CONNECTIONS: u32 = 1024; -const MAX_STREAMS: u64 = 10; -const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); - -/// Default bind address for the node. -/// 11204 is "iroh" in leetspeak -pub const DEFAULT_BIND_PORT: u16 = 11204; - -/// How long we wait at most for some endpoints to be discovered. -const ENDPOINT_WAIT: Duration = Duration::from_secs(5); - -/// Chunk size for getting blobs over RPC -const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; -/// Channel cap for getting blobs over RPC -const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; -/// Default interval between GC runs. -const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); - -/// Policy for garbage collection. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum GcPolicy { - /// Garbage collection is disabled. - Disabled, - /// Garbage collection is run at the given interval. - Interval(Duration), -} - -impl Default for GcPolicy { - fn default() -> Self { - Self::Interval(DEFAULT_GC_INTERVAL) - } -} - -/// Builder for the [`Node`]. -/// -/// You must supply a blob store and a document store. -/// -/// Blob store implementations are available in [`iroh_bytes::store`]. -/// Document store implementations are available in [`iroh_sync::store`]. -/// -/// Everything else is optional. -/// -/// Finally you can create and run the node by calling [`Builder::spawn`]. -/// -/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated -/// using [`Node::shutdown`]. -#[derive(Debug)] -pub struct Builder -where - D: Map, - S: DocStore, - E: ServiceEndpoint, -{ - bind_port: u16, - secret_key: SecretKey, - rpc_endpoint: E, - db: D, - keylog: bool, - derp_mode: DerpMode, - gc_policy: GcPolicy, - rt: Option, - docs: S, - /// Path to store peer data. 
If `None`, peer data will not be persisted. - peers_data_path: Option, -} - -const PROTOCOLS: [&[u8]; 3] = [&iroh_bytes::protocol::ALPN, GOSSIP_ALPN, SYNC_ALPN]; - -impl Builder { - /// Creates a new builder for [`Node`] using the given database. - fn with_db_and_store(db: D, docs: S) -> Self { - Self { - bind_port: DEFAULT_BIND_PORT, - secret_key: SecretKey::generate(), - db, - keylog: false, - derp_mode: DerpMode::Default, - rpc_endpoint: Default::default(), - gc_policy: GcPolicy::Disabled, - rt: None, - docs, - peers_data_path: None, - } - } -} - -impl Builder -where - D: BaoStore, - S: DocStore, - E: ServiceEndpoint, -{ - /// Configure rpc endpoint, changing the type of the builder to the new endpoint type. - pub fn rpc_endpoint>( - self, - value: E2, - ) -> Builder { - // we can't use ..self here because the return type is different - Builder { - bind_port: self.bind_port, - secret_key: self.secret_key, - db: self.db, - keylog: self.keylog, - rpc_endpoint: value, - derp_mode: self.derp_mode, - gc_policy: self.gc_policy, - rt: self.rt, - docs: self.docs, - peers_data_path: self.peers_data_path, - } - } - - /// Sets the garbage collection policy. - /// - /// By default garbage collection is disabled. - pub fn gc_policy(mut self, gc_policy: GcPolicy) -> Self { - self.gc_policy = gc_policy; - self - } - - /// Sets the DERP servers to assist in establishing connectivity. - /// - /// DERP servers are used to discover other nodes by [`PublicKey`] and also help - /// establish connections between peers by being an initial relay for traffic while - /// assisting in holepunching to establish a direct connection between peers. - /// - /// When using [DerpMode::Custom], the provided `derp_map` must contain at least one - /// configured derp node. If an invalid [`iroh_net::derp::DerpMap`] - /// is provided [`Self::spawn`] will result in an error. - pub fn derp_mode(mut self, dm: DerpMode) -> Self { - self.derp_mode = dm; - self - } - - /// Binds the node service to a different socket. - /// - /// By default it binds to `127.0.0.1:11204`. - pub fn bind_port(mut self, port: u16) -> Self { - self.bind_port = port; - self - } - - /// Uses the given [`SecretKey`] for the [`PublicKey`] instead of a newly generated one. - pub fn secret_key(mut self, secret_key: SecretKey) -> Self { - self.secret_key = secret_key; - self - } - - /// Whether to log the SSL pre-master key. - /// - /// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this - /// file will be used to log the SSL pre-master key. This is useful to inspect captured - /// traffic. - pub fn keylog(mut self, keylog: bool) -> Self { - self.keylog = keylog; - self - } - - /// Set the path where known peer data is loaded on start-up and later persisted. - pub fn peers_data_path(mut self, path: PathBuf) -> Self { - self.peers_data_path = Some(path); - self - } - - /// Sets the tokio runtime to use. - /// - /// If not set, the current runtime will be picked up. - pub fn local_pool(mut self, rt: &LocalPoolHandle) -> Self { - self.rt = Some(rt.clone()); - self - } - - /// Spawns the [`Node`] in a tokio task. - /// - /// This will create the underlying network server and spawn a tokio task accepting - /// connections. The returned [`Node`] can be used to control the task as well as - /// get information about it. - pub async fn spawn(self) -> Result> { - trace!("spawning node"); - let lp = self - .rt - .unwrap_or_else(|| LocalPoolHandle::new(num_cpus::get())); - // Initialize the metrics collection. 
- // - // The metrics are global per process. Subsequent calls do not change the metrics - // collection and will return an error. We ignore this error. This means that if you'd - // spawn multiple Iroh nodes in the same process, the metrics would be shared between the - // nodes. - #[cfg(feature = "metrics")] - crate::metrics::try_init_metrics_collection().ok(); - - let mut transport_config = quinn::TransportConfig::default(); - transport_config - .max_concurrent_bidi_streams(MAX_STREAMS.try_into()?) - .max_concurrent_uni_streams(0u32.into()); - - let endpoint = MagicEndpoint::builder() - .secret_key(self.secret_key.clone()) - .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) - .keylog(self.keylog) - .transport_config(transport_config) - .concurrent_connections(MAX_CONNECTIONS) - .derp_mode(self.derp_mode); - let endpoint = match self.peers_data_path { - Some(path) => endpoint.peers_data_path(path), - None => endpoint, - }; - let endpoint = endpoint.bind(self.bind_port).await?; - trace!("created quinn endpoint"); - - let (cb_sender, cb_receiver) = mpsc::channel(8); - let cancel_token = CancellationToken::new(); - - debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr()); - - let addr = endpoint.my_addr().await?; - - // initialize the gossip protocol - let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info); - - // spawn the sync engine - let downloader = Downloader::new(self.db.clone(), endpoint.clone(), lp.clone()); - let ds = self.docs.clone(); - let sync = SyncEngine::spawn( - endpoint.clone(), - gossip.clone(), - self.docs, - self.db.clone(), - downloader, - ); - - let callbacks = Callbacks::default(); - let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy { - tracing::info!("Starting GC task with interval {:?}", gc_period); - let db = self.db.clone(); - let callbacks = callbacks.clone(); - let task = lp.spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks)); - Some(AbortingJoinHandle(task)) - } else { - None - }; - let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); - let inner = Arc::new(NodeInner { - db: self.db, - endpoint: endpoint.clone(), - secret_key: self.secret_key, - controller, - cancel_token, - callbacks: callbacks.clone(), - cb_sender, - gc_task, - rt: lp.clone(), - sync, - }); - let task = { - let gossip = gossip.clone(); - let handler = rpc::Handler { - inner: inner.clone(), - }; - let me = endpoint.node_id().fmt_short(); - let ep = endpoint.clone(); - tokio::task::spawn( - async move { - Self::run( - ep, - callbacks, - cb_receiver, - handler, - self.rpc_endpoint, - internal_rpc, - gossip, - ) - .await - } - .instrument(error_span!("node", %me)), - ) - }; - let node = Node { - inner, - task: task.map_err(Arc::new).boxed().shared(), - }; - - // spawn a task that updates the gossip endpoints. - // TODO: track task - let mut stream = endpoint.local_endpoints(); - tokio::task::spawn(async move { - while let Some(eps) = stream.next().await { - if let Err(err) = gossip.update_endpoints(&eps) { - warn!("Failed to update gossip endpoints: {err:?}"); - } - } - warn!("failed to retrieve local endpoints"); - }); - - // Wait for a single endpoint update, to make sure - // we found some endpoints - tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next()) - .await - .context("waiting for endpoint")? 
- .context("no endpoints")?; - - Ok(node) - } - - #[allow(clippy::too_many_arguments)] - async fn run( - server: MagicEndpoint, - callbacks: Callbacks, - mut cb_receiver: mpsc::Receiver, - handler: rpc::Handler, - rpc: E, - internal_rpc: impl ServiceEndpoint, - gossip: Gossip, - ) { - let rpc = RpcServer::new(rpc); - let internal_rpc = RpcServer::new(internal_rpc); - if let Ok((ipv4, ipv6)) = server.local_addr() { - debug!( - "listening at: {}{}", - ipv4, - ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default() - ); - } - let cancel_token = handler.inner.cancel_token.clone(); - - // forward our initial endpoints to the gossip protocol - // it may happen the the first endpoint update callback is missed because the gossip cell - // is only initialized once the endpoint is fully bound - if let Some(local_endpoints) = server.local_endpoints().next().await { - debug!(me = ?server.node_id(), "gossip initial update: {local_endpoints:?}"); - gossip.update_endpoints(&local_endpoints).ok(); - } - - loop { - tokio::select! { - biased; - _ = cancel_token.cancelled() => break, - // handle rpc requests. This will do nothing if rpc is not configured, since - // accept is just a pending future. - request = rpc.accept() => { - match request { - Ok((msg, chan)) => { - handler.handle_rpc_request(msg, chan); - } - Err(e) => { - info!("rpc request error: {:?}", e); - } - } - }, - // handle internal rpc requests. - request = internal_rpc.accept() => { - match request { - Ok((msg, chan)) => { - handler.handle_rpc_request(msg, chan); - } - Err(_) => { - info!("last controller dropped, shutting down"); - break; - } - } - }, - // handle incoming p2p connections - Some(mut connecting) = server.accept() => { - let alpn = match get_alpn(&mut connecting).await { - Ok(alpn) => alpn, - Err(err) => { - error!("invalid handshake: {:?}", err); - continue; - } - }; - let gossip = gossip.clone(); - let inner = handler.inner.clone(); - let sync = handler.inner.sync.clone(); - tokio::task::spawn(async move { - if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await { - warn!("Handling incoming connection ended with error: {err}"); - } - }); - }, - // Handle new callbacks - Some(cb) = cb_receiver.recv() => { - callbacks.push(cb).await; - } - else => break, - } - } - - // Closing the Endpoint is the equivalent of calling Connection::close on all - // connections: Operations will immediately fail with - // ConnectionError::LocallyClosed. All streams are interrupted, this is not - // graceful. 
- let error_code = Closed::ProviderTerminating; - server - .close(error_code.into(), error_code.reason()) - .await - .ok(); - } - - async fn gc_loop(db: D, ds: S, gc_period: Duration, callbacks: Callbacks) { - tracing::debug!("GC loop starting {:?}", gc_period); - 'outer: loop { - // do delay before the two phases of GC - tokio::time::sleep(gc_period).await; - tracing::debug!("Starting GC"); - callbacks - .send(Event::Db(iroh_bytes::store::Event::GcStarted)) - .await; - db.clear_live().await; - let doc_hashes = match ds.content_hashes() { - Ok(hashes) => hashes, - Err(err) => { - tracing::error!("Error getting doc hashes: {}", err); - continue 'outer; - } - }; - let mut doc_db_error = false; - let doc_hashes = doc_hashes.filter_map(|e| match e { - Ok(hash) => Some(hash), - Err(err) => { - tracing::error!("Error getting doc hash: {}", err); - doc_db_error = true; - None - } - }); - db.add_live(doc_hashes).await; - if doc_db_error { - tracing::error!("Error getting doc hashes, skipping GC to be safe"); - continue 'outer; - } - - tracing::debug!("Starting GC mark phase"); - let mut stream = db.gc_mark(None); - while let Some(item) = stream.next().await { - match item { - GcMarkEvent::CustomDebug(text) => { - tracing::debug!("{}", text); - } - GcMarkEvent::CustomWarning(text, _) => { - tracing::warn!("{}", text); - } - GcMarkEvent::Error(err) => { - tracing::error!("Fatal error during GC mark {}", err); - continue 'outer; - } - } - } - - tracing::debug!("Starting GC sweep phase"); - let mut stream = db.gc_sweep(); - while let Some(item) = stream.next().await { - match item { - GcSweepEvent::CustomDebug(text) => { - tracing::debug!("{}", text); - } - GcSweepEvent::CustomWarning(text, _) => { - tracing::warn!("{}", text); - } - GcSweepEvent::Error(err) => { - tracing::error!("Fatal error during GC mark {}", err); - continue 'outer; - } - } - } - callbacks - .send(Event::Db(iroh_bytes::store::Event::GcCompleted)) - .await; - } - } -} - -// TODO: Restructure this code to not take all these arguments. -#[allow(clippy::too_many_arguments)] -async fn handle_connection( - connecting: quinn::Connecting, - alpn: String, - node: Arc>, - gossip: Gossip, - sync: SyncEngine, -) -> Result<()> { - match alpn.as_bytes() { - GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, - SYNC_ALPN => sync.handle_connection(connecting).await?, - alpn if alpn == iroh_bytes::protocol::ALPN => { - iroh_bytes::provider::handle_connection( - connecting, - node.db.clone(), - node.callbacks.clone(), - node.rt.clone(), - ) - .await - } - _ => bail!("ignoring connection: unsupported ALPN protocol"), - } - Ok(()) -} +pub use builder::{Builder, GcPolicy, StorageConfig}; +pub use rpc_status::RpcStatus; type EventCallback = Box BoxFuture<'static, ()> + 'static + Sync + Send>; @@ -578,8 +80,8 @@ impl iroh_bytes::provider::EventSender for Callbacks { /// /// Clients can connect to this server and requests hashes from it. /// -/// The only way to create this is by using the [`Builder::spawn`]. [`Node::builder`] -/// is a shorthand to create a suitable [`Builder`]. +/// The only way to create this is by using the [`Builder::spawn`]. You can use [`Node::memory`] +/// or [`Node::persistent`] to create a suitable [`Builder`]. /// /// This runs a tokio task which can be aborted and joined if desired. To join the task /// await the [`Node`] struct directly, it will complete when the task completes. If @@ -616,14 +118,34 @@ pub enum Event { Db(iroh_bytes::store::Event), } -impl Node { - /// Returns a new builder for the [`Node`]. 
+/// In memory node. +pub type MemNode = Node<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store>; + +/// Persistent node. +pub type FsNode = Node<iroh_bytes::store::flat::Store, iroh_sync::store::fs::Store>; + +impl MemNode { + /// Returns a new builder for the [`Node`], by default configured to run in memory. /// - /// Once the done with the builder call [`Builder::spawn`] to create the node. - pub fn builder(bao_store: D, doc_store: S) -> Builder { - Builder::with_db_and_store(bao_store, doc_store) + /// Once done with the builder call [`Builder::spawn`] to create the node. + pub fn memory() -> Builder<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store> { + Builder::default() } +} +impl FsNode { + /// Returns a new builder for the [`Node`], configured to persist all data + /// from the given path. + /// + /// Once done with the builder call [`Builder::spawn`] to create the node. + pub async fn persistent( + root: impl AsRef<Path>, + ) -> Result<Builder<iroh_bytes::store::flat::Store, iroh_sync::store::fs::Store>> { + Builder::default().persist(root).await + } +} + +impl<D: BaoStore, S: DocStore> Node<D, S> { /// Returns the [`MagicEndpoint`] of the node. /// /// This can be used to establish connections to other nodes under any @@ -684,6 +206,11 @@ impl Node { crate::client::Iroh::new(self.controller()) } + /// Returns a reference to the used `LocalPoolHandle`. + pub fn local_pool_handle(&self) -> &LocalPoolHandle { + &self.inner.rt + } + /// Return a single token containing everything needed to get a hash. /// /// See [`BlobTicket`] for more details of how it can be used. @@ -743,10 +270,13 @@ impl NodeInner { #[cfg(all(test, feature = "flat-db"))] mod tests { - use anyhow::bail; + use std::path::Path; + use std::time::Duration; + + use anyhow::{bail, Context}; + use bytes::Bytes; use futures::StreamExt; use iroh_bytes::provider::AddProgress; - use std::path::Path; use crate::rpc_protocol::{BlobAddPathRequest, BlobAddPathResponse, SetTagOption, WrapOption}; @@ -756,16 +286,18 @@ mod tests { async fn test_ticket_multiple_addrs() { let _guard = iroh_test::logging::setup(); - let lp = LocalPoolHandle::new(1); - let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", b"hello")]); - let doc_store = iroh_sync::store::memory::Store::default(); - let hash = hashes["test"].into(); - let node = Node::builder(db, doc_store) - .bind_port(0) - .local_pool(&lp) - .spawn() + let node = Node::memory().spawn().await.unwrap(); + let hash = node + .client() + .blobs + .add_bytes( + Bytes::from_static(b"hello"), + SetTagOption::Named("test".into()), + ) .await - .unwrap(); + .unwrap() + .hash; + let _drop_guard = node.cancel_token().drop_guard(); let ticket = node.ticket(hash, BlobFormat::Raw).await.unwrap(); println!("addrs: {:?}", ticket.node_addr().info); @@ -777,13 +309,7 @@ let _guard = iroh_test::logging::setup(); use std::io::Cursor; - let db = iroh_bytes::store::mem::Store::new(); - let doc_store = iroh_sync::store::memory::Store::default(); - let node = Node::builder(db, doc_store) - .bind_port(0) - .local_pool(&LocalPoolHandle::new(1)) - .spawn() - .await?; + let node = Node::memory().bind_port(0).spawn().await?; let _drop_guard = node.cancel_token().drop_guard(); let client = node.client(); @@ -801,9 +327,7 @@ async fn test_node_add_tagged_blob_event() -> Result<()> { let _guard = iroh_test::logging::setup(); - let db = iroh_bytes::store::mem::Store::new(); - let doc_store = iroh_sync::store::memory::Store::default(); - let node = Node::builder(db, doc_store).bind_port(0).spawn().await?; + let node = Node::memory().bind_port(0).spawn().await?; let _drop_guard = node.cancel_token().drop_guard(); diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs new file mode 100644 index 0000000000..7408811a13 --- /dev/null
+++ b/iroh/src/node/builder.rs @@ -0,0 +1,683 @@ +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; + +use anyhow::{bail, Context, Result}; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use iroh_base::key::SecretKey; +use iroh_bytes::{ + downloader::Downloader, + protocol::Closed, + store::{GcMarkEvent, GcSweepEvent, Map, Store as BaoStore}, +}; +use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; +use iroh_net::{derp::DerpMode, magic_endpoint::get_alpn, util::AbortingJoinHandle, MagicEndpoint}; +use iroh_sync::net::SYNC_ALPN; +use quic_rpc::{ + transport::{misc::DummyServerEndpoint, quinn::QuinnServerEndpoint}, + RpcServer, ServiceEndpoint, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use tokio_util::{sync::CancellationToken, task::LocalPoolHandle}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument}; + +use crate::{ + client::quic::RPC_ALPN, + node::{Event, NodeInner}, + rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, + sync_engine::SyncEngine, + util::{fs::load_secret_key, path::IrohPaths}, +}; + +use super::{rpc, Callbacks, DocStore, EventCallback, Node, RpcStatus}; + +pub const PROTOCOLS: [&[u8]; 3] = [&iroh_bytes::protocol::ALPN, GOSSIP_ALPN, SYNC_ALPN]; + +/// Default bind address for the node. +/// 11204 is "iroh" in leetspeak +pub const DEFAULT_BIND_PORT: u16 = 11204; + +/// How long we wait at most for some endpoints to be discovered. +const ENDPOINT_WAIT: Duration = Duration::from_secs(5); + +/// Default interval between GC runs. +const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); + +const MAX_CONNECTIONS: u32 = 1024; +const MAX_STREAMS: u64 = 10; + +/// Builder for the [`Node`]. +/// +/// You must supply a blob store and a document store. +/// +/// Blob store implementations are available in [`iroh_bytes::store`]. +/// Document store implementations are available in [`iroh_sync::store`]. +/// +/// Everything else is optional. +/// +/// Finally you can create and run the node by calling [`Builder::spawn`]. +/// +/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated +/// using [`Node::shutdown`]. +#[derive(Debug)] +pub struct Builder<D, S, E = DummyServerEndpoint> +where + D: Map, + S: DocStore, + E: ServiceEndpoint<ProviderService>, +{ + storage: StorageConfig, + bind_port: Option<u16>, + secret_key: SecretKey, + rpc_endpoint: E, + blobs_store: D, + keylog: bool, + derp_mode: DerpMode, + gc_policy: GcPolicy, + docs_store: S, +} + +/// Configuration for storage. +#[derive(Debug)] +pub enum StorageConfig { + /// In memory + Mem, + /// On disk persisted, at this location. + Persistent(PathBuf), +} + +impl Default for Builder<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store> { + fn default() -> Self { + Self { + storage: StorageConfig::Mem, + bind_port: None, + secret_key: SecretKey::generate(), + blobs_store: Default::default(), + keylog: false, + derp_mode: DerpMode::Default, + rpc_endpoint: Default::default(), + gc_policy: GcPolicy::Disabled, + docs_store: Default::default(), + } + } +} + +impl<D: Map, S: DocStore> Builder<D, S> { + /// Creates a new builder for [`Node`] using the given databases.
+ pub fn with_db_and_store(blobs_store: D, docs_store: S, storage: StorageConfig) -> Self { + Self { + storage, + bind_port: None, + secret_key: SecretKey::generate(), + blobs_store, + keylog: false, + derp_mode: DerpMode::Default, + rpc_endpoint: Default::default(), + gc_policy: GcPolicy::Disabled, + docs_store, + } + } +} + +impl Builder +where + D: BaoStore, + S: DocStore, + E: ServiceEndpoint, +{ + /// Persist all node data in the provided directory. + pub async fn persist( + self, + root: impl AsRef, + ) -> Result> { + let root = root.as_ref(); + let blob_dir = IrohPaths::BaoFlatStoreDir.with_root(root); + + tokio::fs::create_dir_all(&blob_dir).await?; + let root2 = root.to_path_buf(); + tokio::task::spawn_blocking(|| migrate_flat_store_v0_v1(root2)).await??; + let blobs_store = iroh_bytes::store::flat::Store::load(&blob_dir) + .await + .with_context(|| format!("Failed to load iroh database from {}", blob_dir.display()))?; + let docs_store = iroh_sync::store::fs::Store::new(IrohPaths::DocsDatabase.with_root(root))?; + + let secret_key_path = IrohPaths::SecretKey.with_root(root); + let secret_key = load_secret_key(secret_key_path).await?; + + Ok(Builder { + storage: StorageConfig::Persistent(root.into()), + bind_port: self.bind_port, + secret_key, + blobs_store, + keylog: self.keylog, + rpc_endpoint: self.rpc_endpoint, + derp_mode: self.derp_mode, + gc_policy: self.gc_policy, + docs_store, + }) + } + + /// Configure rpc endpoint, changing the type of the builder to the new endpoint type. + pub fn rpc_endpoint>( + self, + value: E2, + ) -> Builder { + // we can't use ..self here because the return type is different + Builder { + storage: self.storage, + bind_port: self.bind_port, + secret_key: self.secret_key, + blobs_store: self.blobs_store, + keylog: self.keylog, + rpc_endpoint: value, + derp_mode: self.derp_mode, + gc_policy: self.gc_policy, + docs_store: self.docs_store, + } + } + + /// Configure the default iroh rpc endpoint. + pub async fn enable_rpc( + self, + ) -> Result>> { + let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?; + if let StorageConfig::Persistent(ref root) = self.storage { + // store rpc endpoint + RpcStatus::store(root, actual_rpc_port).await?; + } + + Ok(Builder { + storage: self.storage, + bind_port: self.bind_port, + secret_key: self.secret_key, + blobs_store: self.blobs_store, + keylog: self.keylog, + rpc_endpoint: ep, + derp_mode: self.derp_mode, + gc_policy: self.gc_policy, + docs_store: self.docs_store, + }) + } + + /// Sets the garbage collection policy. + /// + /// By default garbage collection is disabled. + pub fn gc_policy(mut self, gc_policy: GcPolicy) -> Self { + self.gc_policy = gc_policy; + self + } + + /// Sets the DERP servers to assist in establishing connectivity. + /// + /// DERP servers are used to discover other nodes by `PublicKey` and also help + /// establish connections between peers by being an initial relay for traffic while + /// assisting in holepunching to establish a direct connection between peers. + /// + /// When using [DerpMode::Custom], the provided `derp_map` must contain at least one + /// configured derp node. If an invalid [`iroh_net::derp::DerpMap`] + /// is provided [`Self::spawn`] will result in an error. + pub fn derp_mode(mut self, dm: DerpMode) -> Self { + self.derp_mode = dm; + self + } + + /// Binds the node service to a different socket. + /// + /// By default it binds to `127.0.0.1:11204`. 
+    pub fn bind_port(mut self, port: u16) -> Self {
+        self.bind_port.replace(port);
+        self
+    }
+
+    /// Uses the given [`SecretKey`] for the `PublicKey` instead of a newly generated one.
+    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
+        self.secret_key = secret_key;
+        self
+    }
+
+    /// Whether to log the SSL pre-master key.
+    ///
+    /// If `true` and the `SSLKEYLOGFILE` environment variable is set to a file path, the
+    /// SSL pre-master key will be logged to that file. This is useful to inspect captured
+    /// traffic.
+    pub fn keylog(mut self, keylog: bool) -> Self {
+        self.keylog = keylog;
+        self
+    }
+
+    /// Spawns the [`Node`] in a tokio task.
+    ///
+    /// This will create the underlying network server and spawn a tokio task accepting
+    /// connections. The returned [`Node`] can be used to control the task as well as
+    /// get information about it.
+    pub async fn spawn(self) -> Result<Node<D>> {
+        trace!("spawning node");
+        let lp = LocalPoolHandle::new(num_cpus::get());
+
+        // Initialize the metrics collection.
+        //
+        // The metrics are global per process. Subsequent calls do not change the metrics
+        // collection and will return an error. We ignore this error. This means that if you'd
+        // spawn multiple Iroh nodes in the same process, the metrics would be shared between the
+        // nodes.
+        #[cfg(feature = "metrics")]
+        crate::metrics::try_init_metrics_collection().ok();
+
+        let mut transport_config = quinn::TransportConfig::default();
+        transport_config
+            .max_concurrent_bidi_streams(MAX_STREAMS.try_into()?)
+            .max_concurrent_uni_streams(0u32.into());
+
+        let endpoint = MagicEndpoint::builder()
+            .secret_key(self.secret_key.clone())
+            .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect())
+            .keylog(self.keylog)
+            .transport_config(transport_config)
+            .concurrent_connections(MAX_CONNECTIONS)
+            .derp_mode(self.derp_mode);
+        let endpoint = match self.storage {
+            StorageConfig::Persistent(ref root) => {
+                let peers_data_path = IrohPaths::PeerData.with_root(root);
+                endpoint.peers_data_path(peers_data_path)
+            }
+            StorageConfig::Mem => endpoint,
+        };
+        let bind_port = self.bind_port.unwrap_or(DEFAULT_BIND_PORT);
+        let endpoint = endpoint.bind(bind_port).await?;
+        trace!("created quinn endpoint");
+
+        let (cb_sender, cb_receiver) = mpsc::channel(8);
+        let cancel_token = CancellationToken::new();
+
+        debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());
+
+        let addr = endpoint.my_addr().await?;
+
+        // initialize the gossip protocol
+        let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info);
+
+        // spawn the sync engine
+        let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone());
+        let ds = self.docs_store.clone();
+        let sync = SyncEngine::spawn(
+            endpoint.clone(),
+            gossip.clone(),
+            self.docs_store,
+            self.blobs_store.clone(),
+            downloader,
+        );
+
+        let callbacks = Callbacks::default();
+        let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
+            tracing::info!("Starting GC task with interval {:?}", gc_period);
+            let db = self.blobs_store.clone();
+            let callbacks = callbacks.clone();
+            let task = lp.spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks));
+            Some(AbortingJoinHandle(task))
+        } else {
+            None
+        };
+        let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
+        let inner = Arc::new(NodeInner {
+            db: self.blobs_store,
+            endpoint: endpoint.clone(),
+            secret_key: self.secret_key,
+            controller,
+            cancel_token,
+            callbacks: callbacks.clone(),
+            cb_sender,
+            gc_task,
+            rt: lp.clone(),
+            sync,
+        });
+        let task = {
+            let gossip = gossip.clone();
+            let handler = rpc::Handler {
+                inner: inner.clone(),
+            };
+            let me = endpoint.node_id().fmt_short();
+            let ep = endpoint.clone();
+            tokio::task::spawn(
+                async move {
+                    Self::run(
+                        ep,
+                        callbacks,
+                        cb_receiver,
+                        handler,
+                        self.rpc_endpoint,
+                        internal_rpc,
+                        gossip,
+                    )
+                    .await
+                }
+                .instrument(error_span!("node", %me)),
+            )
+        };
+        let node = Node {
+            inner,
+            task: task.map_err(Arc::new).boxed().shared(),
+        };
+
+        // spawn a task that updates the gossip endpoints.
+        // TODO: track task
+        let mut stream = endpoint.local_endpoints();
+        tokio::task::spawn(async move {
+            while let Some(eps) = stream.next().await {
+                if let Err(err) = gossip.update_endpoints(&eps) {
+                    warn!("Failed to update gossip endpoints: {err:?}");
+                }
+            }
+            warn!("failed to retrieve local endpoints");
+        });
+
+        // Wait for a single endpoint update, to make sure
+        // we found some endpoints
+        tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next())
+            .await
+            .context("waiting for endpoint")?
+            .context("no endpoints")?;
+
+        Ok(node)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    async fn run(
+        server: MagicEndpoint,
+        callbacks: Callbacks,
+        mut cb_receiver: mpsc::Receiver<EventCallback>,
+        handler: rpc::Handler<D>,
+        rpc: E,
+        internal_rpc: impl ServiceEndpoint<ProviderService>,
+        gossip: Gossip,
+    ) {
+        let rpc = RpcServer::new(rpc);
+        let internal_rpc = RpcServer::new(internal_rpc);
+        if let Ok((ipv4, ipv6)) = server.local_addr() {
+            debug!(
+                "listening at: {}{}",
+                ipv4,
+                ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
+            );
+        }
+        let cancel_token = handler.inner.cancel_token.clone();
+
+        // forward our initial endpoints to the gossip protocol.
+        // it may happen that the first endpoint update callback is missed because the gossip cell
+        // is only initialized once the endpoint is fully bound
+        if let Some(local_endpoints) = server.local_endpoints().next().await {
+            debug!(me = ?server.node_id(), "gossip initial update: {local_endpoints:?}");
+            gossip.update_endpoints(&local_endpoints).ok();
+        }
+
+        loop {
+            tokio::select! {
+                biased;
+                _ = cancel_token.cancelled() => break,
+                // handle rpc requests. This will do nothing if rpc is not configured, since
+                // accept is just a pending future.
+                request = rpc.accept() => {
+                    match request {
+                        Ok((msg, chan)) => {
+                            handler.handle_rpc_request(msg, chan);
+                        }
+                        Err(e) => {
+                            info!("rpc request error: {:?}", e);
+                        }
+                    }
+                },
+                // handle internal rpc requests.
+                request = internal_rpc.accept() => {
+                    match request {
+                        Ok((msg, chan)) => {
+                            handler.handle_rpc_request(msg, chan);
+                        }
+                        Err(_) => {
+                            info!("last controller dropped, shutting down");
+                            break;
+                        }
+                    }
+                },
+                // handle incoming p2p connections
+                Some(mut connecting) = server.accept() => {
+                    let alpn = match get_alpn(&mut connecting).await {
+                        Ok(alpn) => alpn,
+                        Err(err) => {
+                            error!("invalid handshake: {:?}", err);
+                            continue;
+                        }
+                    };
+                    let gossip = gossip.clone();
+                    let inner = handler.inner.clone();
+                    let sync = handler.inner.sync.clone();
+                    tokio::task::spawn(async move {
+                        if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await {
+                            warn!("Handling incoming connection ended with error: {err}");
+                        }
+                    });
+                },
+                // Handle new callbacks
+                Some(cb) = cb_receiver.recv() => {
+                    callbacks.push(cb).await;
+                }
+                else => break,
+            }
+        }
+
+        // Closing the Endpoint is the equivalent of calling Connection::close on all
+        // connections: Operations will immediately fail with
+        // ConnectionError::LocallyClosed. All streams are interrupted, this is not
+        // graceful.
+        let error_code = Closed::ProviderTerminating;
+        server
+            .close(error_code.into(), error_code.reason())
+            .await
+            .ok();
+    }
+
+    async fn gc_loop(db: D, ds: S, gc_period: Duration, callbacks: Callbacks) {
+        tracing::debug!("GC loop starting {:?}", gc_period);
+        'outer: loop {
+            // wait before the two phases of GC
+            tokio::time::sleep(gc_period).await;
+            tracing::debug!("Starting GC");
+            callbacks
+                .send(Event::Db(iroh_bytes::store::Event::GcStarted))
+                .await;
+            db.clear_live().await;
+            let doc_hashes = match ds.content_hashes() {
+                Ok(hashes) => hashes,
+                Err(err) => {
+                    tracing::error!("Error getting doc hashes: {}", err);
+                    continue 'outer;
+                }
+            };
+            let mut doc_db_error = false;
+            let doc_hashes = doc_hashes.filter_map(|e| match e {
+                Ok(hash) => Some(hash),
+                Err(err) => {
+                    tracing::error!("Error getting doc hash: {}", err);
+                    doc_db_error = true;
+                    None
+                }
+            });
+            db.add_live(doc_hashes).await;
+            if doc_db_error {
+                tracing::error!("Error getting doc hashes, skipping GC to be safe");
+                continue 'outer;
+            }
+
+            tracing::debug!("Starting GC mark phase");
+            let mut stream = db.gc_mark(None);
+            while let Some(item) = stream.next().await {
+                match item {
+                    GcMarkEvent::CustomDebug(text) => {
+                        tracing::debug!("{}", text);
+                    }
+                    GcMarkEvent::CustomWarning(text, _) => {
+                        tracing::warn!("{}", text);
+                    }
+                    GcMarkEvent::Error(err) => {
+                        tracing::error!("Fatal error during GC mark {}", err);
+                        continue 'outer;
+                    }
+                }
+            }
+
+            tracing::debug!("Starting GC sweep phase");
+            let mut stream = db.gc_sweep();
+            while let Some(item) = stream.next().await {
+                match item {
+                    GcSweepEvent::CustomDebug(text) => {
+                        tracing::debug!("{}", text);
+                    }
+                    GcSweepEvent::CustomWarning(text, _) => {
+                        tracing::warn!("{}", text);
+                    }
+                    GcSweepEvent::Error(err) => {
+                        tracing::error!("Fatal error during GC sweep {}", err);
+                        continue 'outer;
+                    }
+                }
+            }
+            callbacks
+                .send(Event::Db(iroh_bytes::store::Event::GcCompleted))
+                .await;
+        }
+    }
+}
+
+/// Policy for garbage collection.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum GcPolicy {
+    /// Garbage collection is disabled.
+    Disabled,
+    /// Garbage collection is run at the given interval.
+    Interval(Duration),
+}
+
+impl Default for GcPolicy {
+    fn default() -> Self {
+        Self::Interval(DEFAULT_GC_INTERVAL)
+    }
+}
+
+// TODO: Restructure this code to not take all these arguments.
+#[allow(clippy::too_many_arguments)]
+async fn handle_connection<D: BaoStore>(
+    connecting: quinn::Connecting,
+    alpn: String,
+    node: Arc<NodeInner<D>>,
+    gossip: Gossip,
+    sync: SyncEngine,
+) -> Result<()> {
+    match alpn.as_bytes() {
+        GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?,
+        SYNC_ALPN => sync.handle_connection(connecting).await?,
+        alpn if alpn == iroh_bytes::protocol::ALPN => {
+            iroh_bytes::provider::handle_connection(
+                connecting,
+                node.db.clone(),
+                node.callbacks.clone(),
+                node.rt.clone(),
+            )
+            .await
+        }
+        _ => bail!("ignoring connection: unsupported ALPN protocol"),
+    }
+    Ok(())
+}
+
+const DEFAULT_RPC_PORT: u16 = 0x1337;
+const MAX_RPC_CONNECTIONS: u32 = 16;
+const MAX_RPC_STREAMS: u32 = 1024;
+
+/// Makes an RPC endpoint that uses a QUIC transport.
+fn make_rpc_endpoint(
+    secret_key: &SecretKey,
+    rpc_port: u16,
+) -> Result<(QuinnServerEndpoint<ProviderRequest, ProviderResponse>, u16)> {
+    let rpc_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_port);
+    let mut transport_config = quinn::TransportConfig::default();
+    transport_config
+        .max_concurrent_bidi_streams(MAX_RPC_STREAMS.into())
+        .max_concurrent_uni_streams(0u32.into());
+    let mut server_config = iroh_net::magic_endpoint::make_server_config(
+        secret_key,
+        vec![RPC_ALPN.to_vec()],
+        Some(transport_config),
+        false,
+    )?;
+    server_config.concurrent_connections(MAX_RPC_CONNECTIONS);
+
+    let rpc_quinn_endpoint = quinn::Endpoint::server(server_config.clone(), rpc_addr.into());
+    let rpc_quinn_endpoint = match rpc_quinn_endpoint {
+        Ok(ep) => ep,
+        Err(err) => {
+            if err.kind() == std::io::ErrorKind::AddrInUse {
+                tracing::warn!(
+                    "RPC port {} already in use, switching to random port",
+                    rpc_port
+                );
+                // Use a random port
+                quinn::Endpoint::server(
+                    server_config,
+                    SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into(),
+                )?
+            } else {
+                return Err(err.into());
+            }
+        }
+    };
+
+    let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port();
+    let rpc_endpoint =
+        QuinnServerEndpoint::<ProviderRequest, ProviderResponse>::new(rpc_quinn_endpoint)?;
+
+    Ok((rpc_endpoint, actual_rpc_port))
+}
+
+/// Migrate the flat store from v0 to v1. This cannot be done in the store itself, since the
+/// constructor of the store now only takes a single directory.
+fn migrate_flat_store_v0_v1(iroh_data_root: PathBuf) -> anyhow::Result<()> {
+    let complete_v0 = iroh_data_root.join("blobs.v0");
+    let partial_v0 = iroh_data_root.join("blobs-partial.v0");
+    let meta_v0 = iroh_data_root.join("blobs-meta.v0");
+    let complete_v1 = IrohPaths::BaoFlatStoreDir
+        .with_root(&iroh_data_root)
+        .join("complete");
+    let partial_v1 = IrohPaths::BaoFlatStoreDir
+        .with_root(&iroh_data_root)
+        .join("partial");
+    let meta_v1 = IrohPaths::BaoFlatStoreDir
+        .with_root(&iroh_data_root)
+        .join("meta");
+    if complete_v0.exists() && !complete_v1.exists() {
+        tracing::info!(
+            "moving complete files from {} to {}",
+            complete_v0.display(),
+            complete_v1.display()
+        );
+        std::fs::rename(complete_v0, complete_v1).context("migrating complete store failed")?;
+    }
+    if partial_v0.exists() && !partial_v1.exists() {
+        tracing::info!(
+            "moving partial files from {} to {}",
+            partial_v0.display(),
+            partial_v1.display()
+        );
+        std::fs::rename(partial_v0, partial_v1).context("migrating partial store failed")?;
+    }
+    if meta_v0.exists() && !meta_v1.exists() {
+        tracing::info!(
+            "moving meta files from {} to {}",
+            meta_v0.display(),
+            meta_v1.display()
+        );
+        std::fs::rename(meta_v0, meta_v1).context("migrating meta store failed")?;
+    }
+    Ok(())
+}
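For orientation, here is a hypothetical end-to-end use of the builder API added in this file. It only chains calls that appear in the patch (`persist`, `enable_rpc`, `spawn`); starting from `Builder::default()` and the shape of the data-directory argument are illustrative assumptions, not something this diff prescribes.

```rust
use std::path::Path;

// Sketch only: assumes `Builder::default()` (in-memory stores) as the
// starting point; `persist` swaps in the flat blob store and fs doc store.
async fn spawn_persistent_node(data_dir: &Path) -> anyhow::Result<()> {
    let node = iroh::node::Builder::default()
        .persist(data_dir)
        .await? // loads the stores and the secret key from the data dir
        .enable_rpc()
        .await? // binds the QUIC RPC endpoint and records the port in the RPC lock file
        .spawn()
        .await?;
    println!("node spawned: {}", node.node_id());
    node.shutdown();
    Ok(())
}
```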
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index c301cae124..ab3b58e638 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -1,6 +1,7 @@
 use std::fmt::Debug;
 use std::io;
 use std::sync::{Arc, Mutex};
+use std::time::Duration;
 
 use anyhow::{anyhow, Result};
 use futures::{Future, FutureExt, Stream, StreamExt};
@@ -28,7 +29,6 @@ use tokio::sync::mpsc;
 use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, info};
 
-use crate::node::{Event, RPC_BLOB_GET_CHANNEL_CAP, RPC_BLOB_GET_CHUNK_SIZE};
 use crate::rpc_protocol::{
     BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
     BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse,
@@ -44,7 +44,13 @@ use crate::rpc_protocol::{
     ProviderService, SetTagOption,
 };
 
-use super::{NodeInner, HEALTH_POLL_WAIT};
+use super::{Event, NodeInner};
+
+const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
+/// Chunk size for getting blobs over RPC
+const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64;
+/// Channel cap for getting blobs over RPC
+const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
 
 #[derive(Debug, Clone)]
 pub(crate) struct Handler<D> {
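The two constants moved into rpc.rs bound memory use when serving blobs over RPC: data flows in 64 KiB chunks through a channel that holds at most two of them, so a slow consumer stalls the producer instead of buffering the whole blob. A generic sketch of that backpressure pattern in plain tokio (this is not iroh's actual RPC plumbing, just the idea the constants encode):

```rust
use tokio::sync::mpsc;

const CHUNK_SIZE: usize = 1024 * 64; // mirrors RPC_BLOB_GET_CHUNK_SIZE
const CHANNEL_CAP: usize = 2; // mirrors RPC_BLOB_GET_CHANNEL_CAP

async fn stream_blob(blob: Vec<u8>) -> Vec<u8> {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAP);
    let producer = tokio::spawn(async move {
        for chunk in blob.chunks(CHUNK_SIZE) {
            // `send` applies backpressure once CHANNEL_CAP chunks are in flight.
            if tx.send(chunk.to_vec()).await.is_err() {
                break; // receiver dropped
            }
        }
    });
    let mut out = Vec::new();
    while let Some(chunk) = rx.recv().await {
        out.extend_from_slice(&chunk);
    }
    producer.await.unwrap();
    out
}
```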
diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs
new file mode 100644
index 0000000000..f5ea1b0981
--- /dev/null
+++ b/iroh/src/node/rpc_status.rs
@@ -0,0 +1,108 @@
+use std::path::Path;
+
+use anyhow::{ensure, Context, Result};
+use tokio::{fs, io::AsyncReadExt};
+use tracing::trace;
+
+use crate::util::path::IrohPaths;
+
+/// The current status of the RPC endpoint.
+#[derive(Debug, Clone)]
+pub enum RpcStatus {
+    /// Stopped.
+    Stopped,
+    /// Running on this port.
+    Running {
+        /// The port we are connected on.
+        port: u16,
+        /// Actual connected RPC client.
+        client: crate::client::quic::RpcClient,
+    },
+}
+
+impl RpcStatus {
+    /// Load the current RPC status from the given location.
+    pub async fn load(root: impl AsRef<Path>) -> Result<Self> {
+        let p = IrohPaths::RpcLock.with_root(root);
+        trace!("loading RPC lock: {}", p.display());
+
+        if p.exists() {
+            // Lock file exists, read the port and check if we can get a connection.
+            let mut file = fs::File::open(&p).await.context("open rpc lock file")?;
+            let file_len = file
+                .metadata()
+                .await
+                .context("reading rpc lock file metadata")?
+                .len();
+            if file_len == 2 {
+                let mut buffer = [0u8; 2];
+                file.read_exact(&mut buffer)
+                    .await
+                    .context("read rpc lock file")?;
+                let running_rpc_port = u16::from_le_bytes(buffer);
+                if let Ok(client) = crate::client::quic::connect_raw(running_rpc_port).await {
+                    return Ok(RpcStatus::Running {
+                        port: running_rpc_port,
+                        client,
+                    });
+                }
+            }
+
+            // invalid or outdated rpc lock file, delete
+            drop(file);
+            fs::remove_file(&p)
+                .await
+                .context("deleting rpc lock file")?;
+            Ok(RpcStatus::Stopped)
+        } else {
+            // No lock file, stopped
+            Ok(RpcStatus::Stopped)
+        }
+    }
+
+    /// Store the current rpc status.
+    pub async fn store(root: impl AsRef<Path>, rpc_port: u16) -> Result<()> {
+        let p = IrohPaths::RpcLock.with_root(root);
+        trace!("storing RPC lock: {}", p.display());
+
+        ensure!(!p.exists(), "iroh is already running");
+        if let Some(parent) = p.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("creating parent dir")?;
+        }
+        fs::write(&p, &rpc_port.to_le_bytes())
+            .await
+            .context("writing rpc lock file")?;
+        Ok(())
+    }
+
+    /// Cleans up an existing rpc lock.
+    pub async fn clear(root: impl AsRef<Path>) -> Result<()> {
+        let p = IrohPaths::RpcLock.with_root(root);
+        trace!("clearing RPC lock: {}", p.display());
+
+        // ignore errors
+        tokio::fs::remove_file(&p).await.ok();
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_rpc_lock_file() {
+        let dir = testdir::testdir!();
+
+        let rpc_port = 7778;
+        RpcStatus::store(&dir, rpc_port).await.unwrap();
+        let status = RpcStatus::load(&dir).await.unwrap();
+        assert!(matches!(status, RpcStatus::Stopped));
+        let p = IrohPaths::RpcLock.with_root(&dir);
+        let exists = fs::try_exists(&p).await.unwrap();
+        assert!(!exists, "should be deleted as not running");
+    }
+}
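The lock-file format above is simply the RPC port encoded as two little-endian bytes; `load` treats any other file length as stale and deletes it. A minimal standalone sketch of that round trip (function names and paths here are illustrative, not part of the patch):

```rust
use std::{fs, io, path::Path};

fn write_rpc_lock(path: &Path, port: u16) -> io::Result<()> {
    // The entire file is the two little-endian bytes of the port.
    fs::write(path, port.to_le_bytes())
}

fn read_rpc_lock(path: &Path) -> io::Result<Option<u16>> {
    let bytes = fs::read(path)?;
    // Anything other than exactly two bytes is treated as invalid,
    // mirroring the `file_len == 2` check in `RpcStatus::load`.
    match <[u8; 2]>::try_from(bytes.as_slice()) {
        Ok(buf) => Ok(Some(u16::from_le_bytes(buf))),
        Err(_) => Ok(None),
    }
}
```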
diff --git a/iroh/src/util/fs.rs b/iroh/src/util/fs.rs
index 63059ff238..e4a1b4a6b0 100644
--- a/iroh/src/util/fs.rs
+++ b/iroh/src/util/fs.rs
@@ -119,7 +119,8 @@ pub fn relative_canonicalized_path_to_string(path: impl AsRef<Path>) -> anyhow::Result<String> {
     canonicalized_path_to_string(path, true)
 }
 
-/// Loads a [`SecretKey`] from the provided file.
+/// Loads a [`SecretKey`] from the provided file, or stores a newly generated one
+/// at the given location.
 pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<SecretKey> {
     if key_path.exists() {
         let keystr = tokio::fs::read(key_path).await?;
diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs
index 7bd09d7119..2f1bb2cf0a 100644
--- a/iroh/tests/gc.rs
+++ b/iroh/tests/gc.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
 use anyhow::Result;
 use bytes::Bytes;
 use futures::FutureExt;
-use iroh::node::Node;
+use iroh::node::{self, Node};
 use rand::RngCore;
 
 use iroh_bytes::{
@@ -12,7 +12,6 @@ use iroh_bytes::{
     util::Tag,
     BlobFormat, HashAndFormat,
 };
-use tokio_util::task::LocalPoolHandle;
 
 fn create_test_data(n: usize) -> Bytes {
     let mut rng = rand::thread_rng();
@@ -27,9 +26,8 @@ where
     S: iroh_bytes::store::Store,
 {
     let doc_store = iroh_sync::store::memory::Store::default();
-    Node::builder(bao_store, doc_store)
+    node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem)
         .gc_policy(iroh::node::GcPolicy::Interval(gc_period))
-        .local_pool(&LocalPoolHandle::new(1))
        .spawn()
        .await
        .unwrap()
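Following the pattern the gc test now uses, here is a short sketch of spawning an in-memory node with a fast GC interval. The store constructors and builder calls are the ones visible in the diff; the function itself is illustrative:

```rust
use std::time::Duration;

// Sketch: an in-memory node with a 100ms GC interval, as the tests configure.
async fn gc_test_node() -> anyhow::Result<()> {
    let bao_store = iroh_bytes::store::mem::Store::new();
    let doc_store = iroh_sync::store::memory::Store::default();
    let node = iroh::node::Builder::with_db_and_store(
        bao_store,
        doc_store,
        iroh::node::StorageConfig::Mem,
    )
    .gc_policy(iroh::node::GcPolicy::Interval(Duration::from_millis(100)))
    .spawn()
    .await?;
    node.shutdown();
    Ok(())
}
```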
diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs
index d1bd03a5b2..e849065261 100644
--- a/iroh/tests/provide.rs
+++ b/iroh/tests/provide.rs
@@ -10,7 +10,7 @@ use bytes::Bytes;
 use futures::FutureExt;
 use iroh::{
     dial::Options,
-    node::{Builder, Event, Node},
+    node::{Builder, Event},
 };
 use iroh_net::{key::SecretKey, NodeId};
 use quic_rpc::transport::misc::DummyServerEndpoint;
@@ -31,36 +31,23 @@ use iroh_bytes::{
     BlobFormat, Hash,
 };
 use iroh_sync::store;
-use tokio_util::task::LocalPoolHandle;
-
-/// Pick up the tokio runtime from the thread local and add a
-/// thread per core runtime.
-fn test_local_pool() -> LocalPoolHandle {
-    LocalPoolHandle::new(1)
-}
 
 fn test_node<D: iroh_bytes::store::Store>(db: D) -> Builder<D, store::memory::Store> {
     let store = iroh_sync::store::memory::Store::default();
-    Node::builder(db, store).bind_port(0)
+    iroh::node::Builder::with_db_and_store(db, store, iroh::node::StorageConfig::Mem).bind_port(0)
 }
 
 #[tokio::test]
 #[ignore = "flaky"]
 async fn basics() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let lp = test_local_pool();
-    transfer_data(
-        vec![("hello_world", "hello world!".as_bytes().to_vec())],
-        &lp,
-    )
-    .await
+    transfer_data(vec![("hello_world", "hello world!".as_bytes().to_vec())]).await
 }
 
 #[tokio::test]
 #[ignore = "flaky"]
 async fn multi_file() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let lp = test_local_pool();
 
     let file_opts = vec![
         ("1", 10),
@@ -69,14 +56,13 @@ async fn multi_file() -> Result<()> {
         // overkill, but it works! Just annoying to wait for
         // ("4", 1024 * 1024 * 90),
     ];
-    transfer_random_data(file_opts, &lp).await
+    transfer_random_data(file_opts).await
 }
 
 #[tokio::test]
 #[ignore = "flaky"]
 async fn many_files() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let lp = test_local_pool();
     let num_files = [10, 100];
     for num in num_files {
         println!("NUM_FILES: {num}");
@@ -87,7 +73,7 @@ async fn many_files() -> Result<()> {
             (name, 10)
         })
         .collect();
-        transfer_random_data(file_opts, &lp).await?;
+        transfer_random_data(file_opts).await?;
     }
     Ok(())
 }
@@ -96,7 +82,6 @@ async fn many_files() -> Result<()> {
 #[ignore = "flaky"]
 async fn sizes() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let lp = test_local_pool();
 
     let sizes = [
         0,
@@ -112,7 +97,7 @@ async fn sizes() -> Result<()> {
 
     for size in sizes {
         let now = Instant::now();
-        transfer_random_data(vec![("hello_world", size)], &lp).await?;
+        transfer_random_data(vec![("hello_world", size)]).await?;
         println!(" took {}ms", now.elapsed().as_millis());
     }
 
@@ -122,7 +107,6 @@
 #[tokio::test]
 #[ignore = "flaky"]
 async fn empty_files() -> Result<()> {
-    let lp = test_local_pool();
     // try to transfer as many files as possible without hitting a limit
     // booo 400 is too small :(
     let num_files = 400;
@@ -130,7 +114,7 @@ async fn empty_files() -> Result<()> {
     for i in 0..num_files {
         file_opts.push((i.to_string(), 0));
     }
-    transfer_random_data(file_opts, &lp).await
+    transfer_random_data(file_opts).await
 }
 
 /// Create new get options with the given node id and addresses, using a
@@ -159,8 +143,7 @@ async fn multiple_clients() -> Result<()> {
     let expect_name = "hello_world";
     let collection = Collection::from_iter([(expect_name, expect_hash)]);
     let hash = db.insert_many(collection.to_blobs()).unwrap();
-    let lp = test_local_pool();
-    let node = test_node(db).local_pool(&lp).spawn().await?;
+    let node = test_node(db).spawn().await?;
     let mut tasks = Vec::new();
     for _i in 0..3 {
         let file_hash: Hash = expect_hash;
@@ -169,7 +152,7 @@ async fn multiple_clients() -> Result<()> {
             let peer_id = node.node_id();
             let content = content.to_vec();
 
-            tasks.push(lp.spawn_pinned(move || {
+            tasks.push(node.local_pool_handle().spawn_pinned(move || {
                 async move {
                     let opts = get_options(peer_id, addrs);
                     let expected_data = &content;
@@ -193,7 +176,7 @@ async fn multiple_clients() -> Result<()> {
 
 // Run the test creating random data for each blob, using the size specified by the file
 // options
-async fn transfer_random_data<S>(file_opts: Vec<(S, usize)>, rt: &LocalPoolHandle) -> Result<()>
+async fn transfer_random_data<S>(file_opts: Vec<(S, usize)>) -> Result<()>
 where
     S: Into<String> + std::fmt::Debug + std::cmp::PartialEq + Clone,
 {
@@ -205,11 +188,11 @@ where
         (name, content)
     })
     .collect();
-    transfer_data(file_opts, rt).await
+    transfer_data(file_opts).await
 }
 
 // Run the test for a vec of filenames and blob data
-async fn transfer_data<S>(file_opts: Vec<(S, Vec<u8>)>, rt: &LocalPoolHandle) -> Result<()>
+async fn transfer_data<S>(file_opts: Vec<(S, Vec<u8>)>) -> Result<()>
 where
     S: Into<String> + std::fmt::Debug + std::cmp::PartialEq + Clone,
 {
@@ -236,7 +219,7 @@ where
     let collection_orig = Collection::from_iter(blobs);
     let collection_hash = mdb.insert_many(collection_orig.to_blobs()).unwrap();
 
-    let node = test_node(mdb.clone()).local_pool(rt).spawn().await?;
+    let node = test_node(mdb.clone()).spawn().await?;
 
     let (events_sender, mut events_recv) = mpsc::unbounded_channel();
 
@@ -326,14 +309,13 @@ fn assert_events(events: Vec<Event>, num_blobs: usize) {
 #[tokio::test]
 async fn test_server_close() {
-    let lp = test_local_pool();
     // Prepare a Provider transferring a file.
     let _guard = iroh_test::logging::setup();
     let mut db = iroh_bytes::store::readonly_mem::Store::default();
     let child_hash = db.insert(b"hello there");
     let collection = Collection::from_iter([("hello", child_hash)]);
     let hash = db.insert_many(collection.to_blobs()).unwrap();
-    let mut node = test_node(db).local_pool(&lp).spawn().await.unwrap();
+    let mut node = test_node(db).spawn().await.unwrap();
 
     let node_addr = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
@@ -393,10 +375,9 @@ fn create_test_db(
 #[ignore = "flaky"]
 async fn test_ipv6() {
     let _guard = iroh_test::logging::setup();
-    let lp = test_local_pool();
 
     let (db, hash) = create_test_db([("test", b"hello")]);
-    let node = match test_node(db).local_pool(&lp).spawn().await {
+    let node = match test_node(db).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -421,11 +402,10 @@ async fn test_ipv6() {
 #[ignore = "flaky"]
 async fn test_not_found() {
     let _ = iroh_test::logging::setup();
-    let lp = test_local_pool();
 
     let db = iroh_bytes::store::readonly_mem::Store::default();
     let hash = blake3::hash(b"hello").into();
-    let node = match test_node(db).local_pool(&lp).spawn().await {
+    let node = match test_node(db).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -463,13 +443,12 @@ async fn test_not_found() {
 #[ignore = "flaky"]
 async fn test_chunk_not_found_1() {
     let _ = iroh_test::logging::setup();
-    let lp = test_local_pool();
 
     let db = iroh_bytes::store::mem::Store::new();
     let data = (0..1024 * 64).map(|i| i as u8).collect::<Vec<u8>>();
     let hash = blake3::hash(&data).into();
     let _entry = db.get_or_create(hash, data.len() as u64).await.unwrap();
-    let node = match test_node(db).local_pool(&lp).spawn().await {
+    let node = match test_node(db).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -504,9 +483,8 @@ async fn test_chunk_not_found_1() {
 
 #[tokio::test]
 async fn test_run_ticket() {
-    let lp = test_local_pool();
     let (db, hash) = create_test_db([("test", b"hello")]);
-    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
+    let node = test_node(db).spawn().await.unwrap();
     let _drop_guard = node.cancel_token().drop_guard();
 
     let ticket = node.ticket(hash, BlobFormat::HashSeq).await.unwrap();
@@ -556,9 +534,8 @@ async fn run_collection_get_request(
 #[tokio::test]
 #[ignore = "flaky"]
 async fn test_run_fsm() {
-    let lp = test_local_pool();
     let (db, hash) = create_test_db([("a", b"hello"), ("b", b"world")]);
-    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
+    let node = test_node(db).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
@@ -602,12 +579,11 @@ fn make_test_data(n: usize) -> Vec<u8> {
 /// The verified last chunk also verifies the size.
 #[tokio::test]
 async fn test_size_request_blob() {
-    let lp = test_local_pool();
     let expected = make_test_data(1024 * 64 + 1234);
     let last_chunk = last_chunk(&expected);
     let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", &expected)]);
     let hash = Hash::from(*hashes.values().next().unwrap());
-    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
+    let node = test_node(db).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
@@ -631,11 +607,10 @@ async fn test_size_request_blob() {
 #[tokio::test]
 #[ignore = "flaky"]
 async fn test_collection_stat() {
-    let lp = test_local_pool();
     let child1 = make_test_data(123456);
     let child2 = make_test_data(345678);
     let (db, hash) = create_test_db([("a", &child1), ("b", &child2)]);
-    let node = test_node(db.clone()).local_pool(&lp).spawn().await.unwrap();
+    let node = test_node(db.clone()).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
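All of these test changes follow one pattern: instead of threading a caller-owned `LocalPoolHandle` through every helper, tasks are spawned on the pool the node itself created. A sketch of that usage (the `local_pool_handle` accessor is the one the `multiple_clients` test calls above; the closure body and trait bound are illustrative):

```rust
// Sketch: borrow the node's own local pool to run a pinned (!Send) task.
async fn run_on_node_pool<D: iroh_bytes::store::Store>(node: &iroh::node::Node<D>) {
    let handle = node.local_pool_handle();
    let task = handle.spawn_pinned(|| async move {
        // local, possibly !Send work runs here
        42
    });
    assert_eq!(task.await.unwrap(), 42);
}
```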
diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs
index de3bed7186..7683ee3193 100644
--- a/iroh/tests/sync.rs
+++ b/iroh/tests/sync.rs
@@ -16,7 +16,6 @@ use iroh::{
 use iroh_net::key::{PublicKey, SecretKey};
 use quic_rpc::transport::misc::DummyServerEndpoint;
 use rand::{CryptoRng, Rng, SeedableRng};
-use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, info};
 use tracing_subscriber::{prelude::*, EnvFilter};
 
@@ -32,10 +31,7 @@ const TIMEOUT: Duration = Duration::from_secs(60);
 fn test_node(
     secret_key: SecretKey,
 ) -> Builder<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store, DummyServerEndpoint> {
-    let db = iroh_bytes::store::mem::Store::new();
-    let store = iroh_sync::store::memory::Store::default();
-    Node::builder(db, store)
-        .local_pool(&LocalPoolHandle::new(1))
+    Node::memory()
         .secret_key(secret_key)
         .derp_mode(DerpMode::Disabled)
 }
@@ -859,9 +855,7 @@ impl PartialEq for (Entry, Bytes) {
 #[tokio::test]
 async fn doc_delete() -> Result<()> {
-    let db = iroh_bytes::store::mem::Store::new();
-    let store = iroh_sync::store::memory::Store::default();
-    let node = Node::builder(db, store)
+    let node = Node::memory()
         .gc_policy(iroh::node::GcPolicy::Interval(Duration::from_millis(100)))
         .spawn()
         .await?;
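Finally, the sync tests collapse their store setup into `Node::memory()`, which by the shape of this diff wraps `Builder::with_db_and_store` with in-memory defaults. A sketch of the resulting test-node construction, using only calls visible in the patch (the enclosing function is illustrative):

```rust
use iroh_net::{derp::DerpMode, key::SecretKey};

// Sketch: an in-memory node with a fixed key and DERP disabled, as in test_node.
async fn memory_node(secret_key: SecretKey) -> anyhow::Result<()> {
    let node = iroh::node::Node::memory()
        .secret_key(secret_key)
        .derp_mode(DerpMode::Disabled)
        .spawn()
        .await?;
    node.shutdown();
    Ok(())
}
```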