diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 53a33972b0..1f4032cebd 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -31,11 +31,11 @@ use crate::rpc_protocol::{ BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest, CounterStats, DeleteTagRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocGetManyRequest, DocGetOneRequest, DocImportRequest, DocInfoRequest, - DocLeaveRequest, DocListRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, - DocSubscribeRequest, DocTicket, GetProgress, ListTagsRequest, ListTagsResponse, - NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, - NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, NodeStatusResponse, ProviderService, - ShareMode, WrapOption, + DocLeaveRequest, DocListRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, + DocStartSyncRequest, DocSubscribeRequest, DocTicket, GetProgress, ListTagsRequest, + ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, + NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, + NodeStatusResponse, ProviderService, ShareMode, WrapOption, }; use crate::sync_engine::{LiveEvent, LiveStatus}; @@ -556,6 +556,26 @@ where Ok(res.entry.content_hash()) } + /// Set an entry on the doc via its key, hash, and size. + pub async fn set_hash( + &self, + author_id: AuthorId, + key: Vec<u8>, + hash: Hash, + size: u64, + ) -> Result<()> { + self.rpc + .rpc(DocSetHashRequest { + doc_id: self.id, + author_id, + key, + hash, + size, + }) + .await??; + Ok(()) + } + /// Read the content of an [`Entry`] as a streaming [`BlobReader`]. 
pub async fn read(&self, entry: &Entry) -> Result { BlobReader::from_rpc(&self.rpc, entry.content_hash()).await diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs index 983ef3dfc2..672f0cbfac 100644 --- a/iroh/src/commands/sync.rs +++ b/iroh/src/commands/sync.rs @@ -1,16 +1,31 @@ +use std::{ + cell::RefCell, + collections::BTreeMap, + path::{Path, PathBuf}, + rc::Rc, + time::{Duration, Instant}, +}; + use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; use colored::Colorize; use dialoguer::Confirm; -use futures::{StreamExt, TryStreamExt}; -use indicatif::HumanBytes; +use futures::{Stream, StreamExt, TryStreamExt}; +use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; +use tokio::io::AsyncReadExt; + use iroh::{ client::quic::{Doc, Iroh}, - rpc_protocol::{DocTicket, ShareMode}, + rpc_protocol::{DocTicket, ShareMode, WrapOption}, sync_engine::{LiveEvent, Origin}, + util::fs::{path_content_info, PathContent}, +}; +use iroh_bytes::{ + provider::AddProgress, + util::{SetTagOption, Tag}, + Hash, }; use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; -use tokio::io::AsyncReadExt; use crate::config::ConsoleEnv; @@ -133,6 +148,53 @@ pub enum DocCommands { #[clap(short, long, default_value_t=DisplayContentMode::Auto)] mode: DisplayContentMode, }, + /// Import data into a document + Import { + /// Document to operate on. + /// + /// Required unless the document is set through the IROH_DOC environment variable. + /// Within the Iroh console, the active document can also be set with `doc switch`. + #[clap(short, long)] + doc: Option, + /// Author of the entry. + /// + /// Required unless the author is set through the IROH_AUTHOR environment variable. + /// Within the Iroh console, the active author can also be set with `author switch`. + #[clap(short, long)] + author: Option, + /// Prefix to add to imported entries (parsed as UTF-8 string). 
Defaults to no prefix + #[clap(long)] + prefix: Option, + /// Path to a local file or directory to import + /// + /// Pathnames will be used as the document key + path: String, + /// If true, don't copy the file into iroh, reference the existing file instead + /// + /// Moving a file imported with `in-place` will result in data corruption + #[clap(short, long)] + in_place: bool, + }, + /// Export data from a document + Export { + /// Document to operate on. + /// + /// Required unless the document is set through the IROH_DOC environment variable. + /// Within the Iroh console, the active document can also be set with `doc switch`. + #[clap(short, long)] + doc: Option, + /// Author of the entry. + /// + /// Required unless the author is set through the IROH_AUTHOR environment variable. + /// Within the Iroh console, the active author can also be set with `author switch`. + #[clap(short, long)] + author: Option, + /// Key to the entry (parsed as UTF-8 string) + key: String, + /// Path to export to + #[clap(short, long)] + out: String, + }, /// Watch for changes and events on a document Watch { /// Document to operate on. 
@@ -309,6 +371,101 @@ impl DocCommands { doc.leave().await?; println!("Doc {} is now inactive", fmt_short(doc.id())); } + Self::Import { + doc, + author, + prefix, + path, + in_place, + } => { + let doc = get_doc(iroh, env, doc).await?; + let author = env.author(author)?; + let mut prefix = prefix.unwrap_or_else(|| String::from("")); + + if prefix.ends_with('/') { + prefix.pop(); + } + let root = canonicalize_path(&path)?.canonicalize()?; + let tag = tag_from_file_name(&root)?; + + let root0 = root.clone(); + println!("Preparing import..."); + // get information about the directory or file we are trying to import + // and confirm with the user that they still want to import the file + let PathContent { size, files } = + tokio::task::spawn_blocking(|| path_content_info(root0)).await??; + let prompt = format!("Import {files} files totaling {}?", HumanBytes(size)); + if !Confirm::new() + .with_prompt(prompt) + .interact() + .unwrap_or(false) + { + println!("Aborted."); + return Ok(()); + } else { + print!("\r"); + } + + let stream = iroh + .blobs + .add_from_path( + root.clone(), + in_place, + SetTagOption::Named(tag.clone()), + WrapOption::NoWrap, + ) + .await?; + let root_prefix = match root.parent() { + Some(p) => p.to_path_buf(), + None => PathBuf::new(), + }; + let start = Instant::now(); + import_coordinator(doc, author, root_prefix, prefix, stream, size, files).await?; + println!("Success! ({})", HumanDuration(start.elapsed())); + } + Self::Export { + doc, + author, + key, + out, + } => { + let doc = get_doc(iroh, env, doc).await?; + let author = env.author(author)?; + let key_str = key.clone(); + let key = key.as_bytes().to_vec(); + let path: PathBuf = canonicalize_path(&out)?; + let entry = doc + .get_one(author, key) + .await? 
+ .ok_or_else(|| anyhow!("<unable to find entry for key {key_str}>"))?; + match doc.read(&entry).await { + Ok(mut content) => { + if let Some(dir) = path.parent() { + if let Err(err) = std::fs::create_dir_all(dir) { + println!( + "<unable to create directory for {}: {err}>", + path.display() + ); + } + }; + let pb = ProgressBar::new(content.size()); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap() + .progress_chars("=>-")); + let file = tokio::fs::File::create(path.clone()).await?; + if let Err(err) = + tokio::io::copy(&mut content, &mut pb.wrap_async_write(file)).await + { + pb.finish_and_clear(); + println!("<unable to write to {}: {err}>", path.display()) + } else { + pb.finish_and_clear(); + println!("wrote '{key_str}' to {}", path.display()); + } + } + Err(err) => println!("<unable to read entry: {err}>"), + } + } Self::Watch { doc } => { let doc = get_doc(iroh, env, doc).await?; let mut stream = doc.subscribe().await?; @@ -499,3 +656,204 @@ pub fn fmt_short(hash: impl AsRef<[u8]>) -> String { text.make_ascii_lowercase(); format!("{}…", &text) } + +fn canonicalize_path(path: &str) -> anyhow::Result { + let path = PathBuf::from(shellexpand::tilde(&path).to_string()); + Ok(path) +} + +fn tag_from_file_name(path: &Path) -> anyhow::Result { + match path.file_name() { + Some(name) => name + .to_os_string() + .into_string() + .map(|t| t.into()) + .map_err(|e| anyhow!("{e:?} contains invalid Unicode")), + None => bail!("the given `path` does not have a proper directory or file name"), + } +} + +/// Takes the [`BlobsClient::add_from_path`] and coordinates adding blobs to a +/// document via the hash of the blob. +/// It also creates and powers the [`ImportProgressBar`]. 
+#[tracing::instrument(skip_all)] +async fn import_coordinator( + doc: Doc, + author_id: AuthorId, + root: PathBuf, + prefix: String, + blob_add_progress: impl Stream> + Send + Unpin + 'static, + expected_size: u64, + expected_entries: u64, +) -> Result<()> { + let imp = ImportProgressBar::new( + &root.display().to_string(), + doc.id(), + expected_size, + expected_entries, + ); + let task_imp = imp.clone(); + + let collections = Rc::new(RefCell::new(BTreeMap::< + u64, + (String, u64, Option, u64), + >::new())); + + let _stats: Vec = blob_add_progress + .filter_map(|item| async { + let item = match item.context("Error adding files") { + Err(e) => return Some(Err(e)), + Ok(item) => item, + }; + match item { + AddProgress::Found { name, id, size } => { + tracing::info!("Found({id},{name},{size})"); + imp.add_found(name.clone(), size); + collections.borrow_mut().insert(id, (name, size, None, 0)); + None + } + AddProgress::Progress { id, offset } => { + tracing::info!("Progress({id}, {offset})"); + if let Some((_, size, _, last_val)) = collections.borrow_mut().get_mut(&id) { + assert!(*last_val <= offset, "wtf"); + assert!(offset <= *size, "wtf2"); + imp.add_progress(offset - *last_val); + *last_val = offset; + } + None + } + AddProgress::Done { hash, id } => { + tracing::info!("Done({id},{hash:?})"); + match collections.borrow_mut().get_mut(&id) { + Some((path_str, size, ref mut h, last_val)) => { + imp.add_progress(*size - *last_val); + imp.import_found(path_str.clone()); + *h = Some(hash); + let key = match key_from_path_str( + root.clone(), + prefix.clone(), + path_str.clone(), + ) { + Ok(k) => k, + Err(e) => { + tracing::info!("error getting key from {}, id {id}", path_str); + return Some(Err(anyhow::anyhow!( + "Issue creating a key for entry {hash:?}: {e}" + ))); + } + }; + // send update to doc + tracing::info!( + "setting entry {} (id: {id}) to doc", + String::from_utf8(key.clone()).unwrap() + ); + Some(Ok((key, hash, *size))) + } + None => { + tracing::info!( + 
"error: got `AddProgress::Done` for unknown collection id {id}" + ); + Some(Err(anyhow::anyhow!( + "Received progress information on an unknown file." + ))) + } + } + } + AddProgress::AllDone { hash, .. } => { + imp.add_done(); + tracing::info!("AddProgress::AllDone({hash:?})"); + None + } + AddProgress::Abort(e) => { + tracing::info!("Error while adding data: {e}"); + Some(Err(anyhow::anyhow!("Error while adding files: {e}"))) + } + } + }) + .try_chunks(1024) + .map_ok(|chunks| { + futures::stream::iter(chunks.into_iter().map(|(key, hash, size)| { + let doc = doc.clone(); + let imp = task_imp.clone(); + Ok(async move { + doc.set_hash(author_id, key, hash, size).await?; + imp.import_progress(); + anyhow::Ok(size) + }) + })) + }) + .try_flatten() + .try_buffer_unordered(64) + .try_collect() + .await?; + + task_imp.all_done(); + Ok(()) +} + +/// Creates a document key from the path, removing the full canonicalized path, and adding +/// whatever prefix the user requests. +fn key_from_path_str(root: PathBuf, prefix: String, path_str: String) -> Result> { + let suffix = PathBuf::from(path_str) + .strip_prefix(root)? + .to_str() + .map(|p| p.as_bytes()) + .ok_or(anyhow!("could not convert path to bytes"))? 
+ .to_vec(); + let mut key = prefix.into_bytes().to_vec(); + key.extend(suffix); + Ok(key) +} + +#[derive(Debug, Clone)] +struct ImportProgressBar { + mp: MultiProgress, + import: ProgressBar, + add: ProgressBar, +} + +impl ImportProgressBar { + fn new(source: &str, doc_id: NamespaceId, expected_size: u64, expected_entries: u64) -> Self { + let mp = MultiProgress::new(); + let add = mp.add(ProgressBar::new(0)); + add.set_style(ProgressStyle::default_bar() + .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap() + .progress_chars("=>-")); + add.set_message(format!("Importing from {source}...")); + add.set_length(expected_size); + add.set_position(0); + add.enable_steady_tick(Duration::from_millis(500)); + + let doc_id = fmt_short(doc_id.to_bytes()); + let import = mp.add(ProgressBar::new(0)); + import.set_style(ProgressStyle::default_bar() + .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} ({per_sec}, eta {eta})").unwrap() + .progress_chars("=>-")); + import.set_message(format!("Adding to doc {doc_id}...")); + import.set_length(expected_entries); + import.set_position(0); + import.enable_steady_tick(Duration::from_millis(500)); + + Self { mp, import, add } + } + + fn add_found(&self, _name: String, _size: u64) {} + + fn import_found(&self, _name: String) {} + + fn add_progress(&self, size: u64) { + self.add.inc(size); + } + + fn import_progress(&self) { + self.import.inc(1); + } + + fn add_done(&self) { + self.add.set_position(self.add.length().unwrap_or_default()); + } + + fn all_done(self) { + self.mp.clear().ok(); + } +} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 993bd99606..2f28201662 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -1517,6 +1517,12 @@ fn handle_rpc_request { + chan.rpc(msg, handler, |handler, req| async move { + handler.inner.sync.doc_set_hash(req).await + }) + .await + } DocGet(msg) => { chan.server_streaming(msg, handler, |handler, req| { 
handler.inner.sync.doc_get_many(req) diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 053fe51999..697ef28c83 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -671,6 +671,29 @@ pub struct DocDelResponse { pub removed: usize, } +/// Set an entry in a document via its hash +#[derive(Serialize, Deserialize, Debug)] +pub struct DocSetHashRequest { + /// The document id + pub doc_id: NamespaceId, + /// Author of this entry. + pub author_id: AuthorId, + /// Key of this entry. + pub key: Vec, + /// Hash of this entry. + pub hash: Hash, + /// Size of this entry. + pub size: u64, +} + +impl RpcMsg for DocSetHashRequest { + type Response = RpcResult; +} + +/// Response to [`DocSetHashRequest`] +#[derive(Serialize, Deserialize, Debug)] +pub struct DocSetHashResponse {} + /// Get entries from a document #[derive(Serialize, Deserialize, Debug)] pub struct DocGetManyRequest { @@ -837,6 +860,7 @@ pub enum ProviderRequest { DocDrop(DocDropRequest), DocImport(DocImportRequest), DocSet(DocSetRequest), + DocSetHash(DocSetHashRequest), DocGet(DocGetManyRequest), DocGetOne(DocGetOneRequest), DocDel(DocDelRequest), @@ -879,6 +903,7 @@ pub enum ProviderResponse { DocDrop(RpcResult), DocImport(RpcResult), DocSet(RpcResult), + DocSetHash(RpcResult), DocGet(RpcResult), DocGetOne(RpcResult), DocDel(RpcResult), diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs index 2f8f9a3859..e932326916 100644 --- a/iroh/src/sync_engine/rpc.rs +++ b/iroh/src/sync_engine/rpc.rs @@ -16,10 +16,10 @@ use crate::{ DocCreateRequest, DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, DocDropResponse, DocGetManyRequest, DocGetManyResponse, DocGetOneRequest, DocGetOneResponse, DocImportRequest, DocImportResponse, DocInfoRequest, DocInfoResponse, - DocLeaveRequest, DocLeaveResponse, DocListRequest, DocListResponse, DocSetRequest, - DocSetResponse, DocShareRequest, DocShareResponse, DocStartSyncRequest, - DocStartSyncResponse, 
DocSubscribeRequest, DocSubscribeResponse, DocTicket, RpcResult, - ShareMode, + DocLeaveRequest, DocLeaveResponse, DocListRequest, DocListResponse, DocSetHashRequest, + DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, + DocStartSyncRequest, DocStartSyncResponse, DocSubscribeRequest, DocSubscribeResponse, + DocTicket, RpcResult, ShareMode, }, sync_engine::{KeepCallback, LiveStatus, SyncEngine}, }; @@ -214,6 +214,22 @@ impl SyncEngine { Ok(DocDelResponse { removed }) } + pub async fn doc_set_hash(&self, req: DocSetHashRequest) -> RpcResult { + let DocSetHashRequest { + doc_id, + author_id, + key, + hash, + size, + } = req; + let replica = self.get_replica(&doc_id)?; + let author = self.get_author(&author_id)?; + replica + .insert(key, &author, hash, size) + .map_err(anyhow::Error::from)?; + Ok(DocSetHashResponse {}) + } + pub fn doc_get_many( &self, req: DocGetManyRequest, diff --git a/iroh/src/util/fs.rs b/iroh/src/util/fs.rs index f2653bbb94..f52acfbd97 100644 --- a/iroh/src/util/fs.rs +++ b/iroh/src/util/fs.rs @@ -1,6 +1,7 @@ //! Utilities for filesystem operations. use std::{ borrow::Cow, + fs::read_dir, path::{Component, Path, PathBuf}, }; @@ -171,11 +172,101 @@ pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result { } } +/// Information about the content on a path +#[derive(Debug, Clone)] +pub struct PathContent { + /// total size of all the files in the directory + pub size: u64, + /// total number of files in the directory + pub files: u64, +} + +/// Walks the directory to get the total size and number of files in directory or file +/// +// TODO: possible combine with `scan_dir` +pub fn path_content_info(path: impl AsRef) -> anyhow::Result { + path_content_info0(path) +} + +fn path_content_info0(path: impl AsRef) -> anyhow::Result { + let mut files = 0; + let mut size = 0; + let path = path.as_ref(); + + if path.is_dir() { + for entry in read_dir(path)? 
{ + let path0 = entry?.path(); + + match path_content_info0(path0) { + Ok(path_content) => { + size += path_content.size; + files += path_content.files; + } + Err(e) => bail!(e), + } + } + } else { + match path.try_exists() { + Ok(true) => { + size = path + .metadata() + .context(format!("Error reading metadata for {path:?}"))? + .len(); + files = 1; + } + Ok(false) => { + tracing::warn!("Not including broken symlink at {path:?}"); + } + Err(e) => { + bail!(e); + } + } + } + Ok(PathContent { size, files }) +} + #[cfg(test)] mod tests { + use crate::util::fs::{path_content_info, PathContent}; #[test] fn test_canonicalize_path() { assert_eq!(super::canonicalize_path("foo/bar").unwrap(), "foo/bar"); } + + #[test] + fn test_get_path_content() { + let dir = testdir::testdir!(); + let PathContent { size, files } = path_content_info(&dir).unwrap(); + assert_eq!(0, size); + assert_eq!(0, files); + let foo = b"hello_world"; + let bar = b"ipsum lorem"; + let bat = b"happy birthday"; + let expect_size = foo.len() + bar.len() + bat.len(); + std::fs::write(dir.join("foo.txt"), foo).unwrap(); + std::fs::write(dir.join("bar.txt"), bar).unwrap(); + std::fs::write(dir.join("bat.txt"), bat).unwrap(); + let PathContent { size, files } = path_content_info(&dir).unwrap(); + assert_eq!(expect_size as u64, size); + assert_eq!(3, files); + + // create nested empty dirs + std::fs::create_dir(dir.join("1")).unwrap(); + std::fs::create_dir(dir.join("2")).unwrap(); + let dir3 = dir.join("3"); + std::fs::create_dir(&dir3).unwrap(); + + // create a nested dir w/ content + let dir4 = dir3.join("4"); + std::fs::create_dir(&dir4).unwrap(); + std::fs::write(dir4.join("foo.txt"), foo).unwrap(); + std::fs::write(dir4.join("bar.txt"), bar).unwrap(); + std::fs::write(dir4.join("bat.txt"), bat).unwrap(); + + let expect_size = expect_size * 2; + let PathContent { size, files } = path_content_info(&dir).unwrap(); + assert_eq!(expect_size as u64, size); + assert_eq!(6, files); + } }