Skip to content

Commit

Permalink
feat(iroh): improved node builder (#2087)
Browse files Browse the repository at this point in the history
- Self sufficient `Node::memory()` for in memory configuration
- Self sufficient `Node::persistent(root)` for in persistent
configuration (including secret key)
- Easy way to enable rpc endpoint with `Builder::enable_rpc()`
- Remove manual setting of `local_pool_handle`
- Add type aliases for the main node configurations
  • Loading branch information
dignifiedquire authored Mar 18, 2024
1 parent e7690b9 commit 2364329
Show file tree
Hide file tree
Showing 21 changed files with 997 additions and 1,017 deletions.
30 changes: 6 additions & 24 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use anyhow::{ensure, Context, Result};
use clap::Parser;
use tokio_util::task::LocalPoolHandle;
use iroh::client::quic::Iroh as IrohRpc;

use crate::config::{ConsoleEnv, NodeConfig};

use self::blob::{BlobAddOptions, BlobSource};
use self::rpc::{RpcCommands, RpcStatus};
use self::rpc::RpcCommands;
use self::start::RunType;

pub(crate) mod author;
Expand Down Expand Up @@ -84,22 +84,21 @@ pub(crate) enum Commands {
}

impl Cli {
pub(crate) async fn run(self, rt: LocalPoolHandle, data_dir: &Path) -> Result<()> {
pub(crate) async fn run(self, data_dir: &Path) -> Result<()> {
match self.command {
Commands::Console => {
let env = ConsoleEnv::for_console(data_dir)?;
if self.start {
let config = NodeConfig::from_env(self.config.as_deref())?;
start::run_with_command(
&rt,
&config,
data_dir,
RunType::SingleCommandNoAbort,
|iroh| async move { console::run(&iroh, &env).await },
)
.await
} else {
let iroh = iroh_quic_connect(data_dir).await.context("rpc connect")?;
let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?;
console::run(&iroh, &env).await
}
}
Expand All @@ -108,15 +107,14 @@ impl Cli {
if self.start {
let config = NodeConfig::from_env(self.config.as_deref())?;
start::run_with_command(
&rt,
&config,
data_dir,
RunType::SingleCommandAbortable,
|iroh| async move { command.run(&iroh, &env).await },
)
.await
} else {
let iroh = iroh_quic_connect(data_dir).await.context("rpc connect")?;
let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?;
command.run(&iroh, &env).await
}
}
Expand All @@ -137,7 +135,6 @@ impl Cli {
});

start::run_with_command(
&rt,
&config,
data_dir,
RunType::UntilStopped,
Expand All @@ -157,18 +154,3 @@ impl Cli {
}
}
}

async fn iroh_quic_connect(root: &Path) -> Result<iroh::client::quic::Iroh> {
let rpc_status = RpcStatus::load(root).await?;
match rpc_status {
RpcStatus::Stopped => {
bail!("iroh is not running, please start it");
}
RpcStatus::Running(rpc_port) => {
let iroh = iroh::client::quic::connect(rpc_port)
.await
.context("quic::connect")?;
Ok(iroh)
}
}
}
5 changes: 2 additions & 3 deletions iroh-cli/src/commands/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,15 +966,14 @@ mod tests {

let data_dir = tempfile::tempdir()?;

let lp = tokio_util::task::LocalPoolHandle::new(1);
let node = crate::commands::start::start_node(&lp, data_dir.path(), None).await?;
let node = crate::commands::start::start_node(data_dir.path(), None).await?;
let client = node.client();
let doc = client.docs.create().await.context("doc create")?;
let author = client.authors.create().await.context("author create")?;

// set up command, getting iroh node
let cli = ConsoleEnv::for_console(data_dir.path()).context("ConsoleEnv")?;
let iroh = crate::commands::iroh_quic_connect(data_dir.path())
let iroh = iroh::client::quic::Iroh::connect(data_dir.path())
.await
.context("rpc connect")?;

Expand Down
100 changes: 2 additions & 98 deletions iroh-cli/src/commands/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::path::Path;

use anyhow::{ensure, Context, Result};
use anyhow::Result;
use clap::Subcommand;
use iroh::{client::Iroh, rpc_protocol::ProviderService, util::path::IrohPaths};
use iroh::{client::Iroh, rpc_protocol::ProviderService};
use quic_rpc::ServiceConnection;
use tokio::{fs, io::AsyncReadExt};
use tracing::trace;

use crate::config::ConsoleEnv;

Expand Down Expand Up @@ -75,95 +71,3 @@ impl RpcCommands {
}
}
}

/// The current status of the RPC endpoint.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RpcStatus {
/// Stopped.
Stopped,
/// Running on this port.
Running(u16),
}

impl RpcStatus {
pub async fn load(root: impl AsRef<Path>) -> Result<RpcStatus> {
let p = IrohPaths::RpcLock.with_root(root);
trace!("loading RPC lock: {}", p.display());

if p.exists() {
// Lock file exists, read the port and check if we can get a connection.
let mut file = fs::File::open(&p).await.context("open rpc lock file")?;
let file_len = file
.metadata()
.await
.context("reading rpc lock file metadata")?
.len();
if file_len == 2 {
let mut buffer = [0u8; 2];
file.read_exact(&mut buffer)
.await
.context("read rpc lock file")?;
let running_rpc_port = u16::from_le_bytes(buffer);
if iroh::client::quic::connect(running_rpc_port).await.is_ok() {
return Ok(RpcStatus::Running(running_rpc_port));
}
}

// invalid or outdated rpc lock file, delete
drop(file);
fs::remove_file(&p)
.await
.context("deleting rpc lock file")?;
Ok(RpcStatus::Stopped)
} else {
// No lock file, stopped
Ok(RpcStatus::Stopped)
}
}

/// Store the current rpc status.
pub async fn store(root: impl AsRef<Path>, rpc_port: u16) -> Result<()> {
let p = IrohPaths::RpcLock.with_root(root);
trace!("storing RPC lock: {}", p.display());

ensure!(!p.exists(), "iroh is already running");
if let Some(parent) = p.parent() {
fs::create_dir_all(parent)
.await
.context("creating parent dir")?;
}
fs::write(&p, &rpc_port.to_le_bytes())
.await
.context("writing rpc lock file")?;
Ok(())
}

/// Cleans up an existing rpc lock
pub async fn clear(root: impl AsRef<Path>) -> Result<()> {
let p = IrohPaths::RpcLock.with_root(root);
trace!("clearing RPC lock: {}", p.display());

// ignore errors
tokio::fs::remove_file(&p).await.ok();

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_rpc_lock_file() {
let dir = testdir::testdir!();

let rpc_port = 7778;
RpcStatus::store(&dir, rpc_port).await.unwrap();
let status = RpcStatus::load(&dir).await.unwrap();
assert_eq!(status, RpcStatus::Stopped);
let p = IrohPaths::RpcLock.with_root(&dir);
let exists = fs::try_exists(&p).await.unwrap();
assert!(!exists, "should be deleted as not running");
}
}
Loading

0 comments on commit 2364329

Please sign in to comment.