Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make export a seperate operation from download #2113

Merged
merged 5 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::hashseq::parse_hash_seq;
use crate::store::BaoBatchWriter;

use crate::{
export::ExportProgress,
get::{
self,
error::GetError,
Expand All @@ -38,8 +37,7 @@ use tracing::trace;
///
/// Progress is reported as [`DownloadProgress`] through a [`ProgressSender`]. Note that the
/// [`DownloadProgress::AllDone`] event is not emitted from here, but left to an upper layer to send,
/// if desired. The [`DownloadProgress::Export`] variant will also never be sent from this
/// function.
/// if desired.
pub async fn get_to_db<
D: BaoStore,
C: FnOnce() -> F,
Expand Down Expand Up @@ -563,26 +561,12 @@ pub enum DownloadProgress {
/// The unique id of the entry.
id: u64,
},
/// All network operations finished
NetworkDone(Stats),
/// If a download is to be exported to the local filesyste, this will report the export
/// progress.
Export(ExportProgress),
/// All operations finished.
///
/// This will be the last message in the stream.
AllDone,
AllDone(Stats),
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

impl From<ExportProgress> for DownloadProgress {
fn from(value: ExportProgress) -> Self {
match value {
ExportProgress::Abort(err) => Self::Abort(err),
value => Self::Export(value),
}
}
}
122 changes: 81 additions & 41 deletions iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ use indicatif::{
use iroh::bytes::{
get::{db::DownloadProgress, Stats},
provider::AddProgress,
store::{ValidateLevel, ValidateProgress},
store::{ExportMode, ValidateLevel, ValidateProgress},
BlobFormat, Hash, HashAndFormat, Tag,
};
use iroh::net::{key::PublicKey, relay::RelayUrl, NodeAddr};
use iroh::{
client::{BlobStatus, Iroh, ShareTicketOptions},
rpc_protocol::{
BlobDownloadRequest, BlobListCollectionsResponse, BlobListIncompleteResponse,
BlobListResponse, DownloadLocation, ProviderService, SetTagOption, WrapOption,
BlobListResponse, ProviderService, SetTagOption, WrapOption,
},
ticket::BlobTicket,
};
Expand Down Expand Up @@ -82,6 +82,23 @@ pub enum BlobCommands {
#[clap(long)]
tag: Option<String>,
},
/// Export a blob from the internal blob store to the local filesystem.
Export {
/// The hash to export.
hash: Hash,
/// Directory or file in which to save the file(s).
///
/// If set to `STDOUT` the output will be redirected to stdout.
out: OutputTarget,
/// Set to true if the hash refers to a collection and you want to export all children of
/// the collection.
#[clap(long, default_value_t = false)]
recursive: bool,
/// If set, the data will be moved to the output directory, and iroh will assume that it
/// will not change.
#[clap(long, default_value_t = false)]
stable: bool,
},
/// List available content on the node.
#[clap(subcommand)]
List(ListCommands),
Expand Down Expand Up @@ -216,53 +233,81 @@ impl BlobCommands {
None => SetTagOption::Auto,
};

let out_location = match out {
None => DownloadLocation::Internal,
Some(OutputTarget::Stdout) => DownloadLocation::Internal,
Some(OutputTarget::Path(ref path)) => {
let absolute = std::env::current_dir()?.join(path);
match format {
BlobFormat::HashSeq => {
// no validation necessary for now
}
BlobFormat::Raw => {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
}
tracing::info!(
"output path is {} -> {}",
path.display(),
absolute.display()
);
DownloadLocation::External {
path: absolute,
in_place: stable,
}
}
};

let mut stream = iroh
.blobs
.download(BlobDownloadRequest {
hash,
format,
peer: node_addr,
out: out_location,
tag,
})
.await?;

show_download_progress(hash, &mut stream).await?;

// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
if out == Some(OutputTarget::Stdout) {
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
match out {
None => {}
Some(OutputTarget::Stdout) => {
// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
Some(OutputTarget::Path(path)) => {
let absolute = std::env::current_dir()?.join(&path);
if matches!(format, BlobFormat::HashSeq) {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
let recursive = format == BlobFormat::HashSeq;
let mode = match stable {
true => ExportMode::TryReference,
false => ExportMode::Copy,
};
tracing::info!("exporting to {} -> {}", path.display(), absolute.display());
let stream = iroh.blobs.export(hash, absolute, recursive, mode).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this time, but this is not a generic HashSeq export but an export that will only work for a specific type of HashSeq, a collection.

// TODO: report export progress
stream.await?;
}
};

Ok(())
}
Self::Export {
hash,
out,
recursive,
stable,
} => {
match out {
OutputTarget::Stdout => {
ensure!(
!recursive,
"Recursive option is not supported when exporting to STDOUT"
);
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
OutputTarget::Path(path) => {
let absolute = std::env::current_dir()?.join(&path);
if !recursive {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
let mode = match stable {
true => ExportMode::TryReference,
false => ExportMode::Copy,
};
tracing::info!(
"exporting {hash} to {} -> {}",
path.display(),
absolute.display()
);
let stream = iroh.blobs.export(hash, absolute, recursive, mode).await?;
// TODO: report export progress
stream.await?;
}
};
Ok(())
}
Self::List(cmd) => cmd.run(iroh).await,
Self::Delete(cmd) => cmd.run(iroh).await,
Self::Validate { verbose, repair } => validate(iroh, verbose, repair).await,
Expand Down Expand Up @@ -896,7 +941,7 @@ pub async fn show_download_progress(
DownloadProgress::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::NetworkDone(Stats {
DownloadProgress::AllDone(Stats {
bytes_read,
elapsed,
..
Expand All @@ -908,16 +953,11 @@ pub async fn show_download_progress(
HumanDuration(elapsed),
HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64)
);
break;
}
DownloadProgress::Abort(e) => {
bail!("download aborted: {:?}", e);
}
DownloadProgress::Export(_p) => {
// TODO: report export progress
}
DownloadProgress::AllDone => {
break;
}
}
}
Ok(())
Expand Down
3 changes: 0 additions & 3 deletions iroh/examples/collection-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ async fn main() -> Result<()> {

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,

// The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well).
out: iroh::rpc_protocol::DownloadLocation::Internal,
};

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download.
Expand Down
3 changes: 0 additions & 3 deletions iroh/examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ async fn main() -> Result<()> {

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,

// The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well).
out: iroh::rpc_protocol::DownloadLocation::Internal,
};

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download.
Expand Down
Loading
Loading