Skip to content

Commit

Permalink
refactor: make export a seperate operation from download (#2113)
Browse files Browse the repository at this point in the history
## Description

This changes two things about how we expose Iroh in the RPC API:

* Add `iroh.blobs.export()` / `BlobExportRequest` to export a blob from
the internal blob store to the local filesystem. We prominently missed
this in our API.
* Add `blob export` to the CLI. The CLI also keeps the "export after
download" functionality, it just issues a second RPC request for the
export now.
* Remove the `out: DownloadLocation` field from `BlobDownloadRequest`
and thus the functionality to automatically export a blob after the
download finishes. Instead, users may call `iroh.blobs.export`
themselves now after the download finishes. This does not make a
difference performance-wise (apart from a single RPC roundtrip), because
the export operation was already running sequentially after the download
before this PR.

The untangling of export and download will simplify #2085 quite a bit,
because at the moment the fact that the export progress events were
emitted from outside the downloader introduced a complication in how we
can emit progress events from the downloader (we had to remap the
progress IDs to avoid conflicts). With this change, a download will only
get progress event from the download itself, so this is solved much more
simple.

## Notes & open questions

There's no progress reporting for the CLI yet. But we also didn't have
that before, so I think it's fine to do as a followup.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
Frando authored Mar 22, 2024
1 parent cbc5906 commit 488be5b
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 142 deletions.
11 changes: 5 additions & 6 deletions iroh-bytes/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::trace;

use crate::{
format::collection::Collection,
store::{BaoBlobSize, ExportMode, MapEntry, Store as BaoStore},
store::{BaoBlobSize, ExportFormat, ExportMode, MapEntry, Store as BaoStore},
util::progress::{IdGenerator, ProgressSender},
Hash,
};
Expand All @@ -27,14 +27,13 @@ pub async fn export<D: BaoStore>(
db: &D,
hash: Hash,
outpath: PathBuf,
recursive: bool,
format: ExportFormat,
mode: ExportMode,
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
if recursive {
export_collection(db, hash, outpath, mode, progress).await
} else {
export_blob(db, hash, outpath, mode, progress).await
match format {
ExportFormat::Blob => export_blob(db, hash, outpath, mode, progress).await,
ExportFormat::Collection => export_collection(db, hash, outpath, mode, progress).await,
}
}

Expand Down
20 changes: 2 additions & 18 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::hashseq::parse_hash_seq;
use crate::store::BaoBatchWriter;

use crate::{
export::ExportProgress,
get::{
self,
error::GetError,
Expand All @@ -38,8 +37,7 @@ use tracing::trace;
///
/// Progress is reported as [`DownloadProgress`] through a [`ProgressSender`]. Note that the
/// [`DownloadProgress::AllDone`] event is not emitted from here, but left to an upper layer to send,
/// if desired. The [`DownloadProgress::Export`] variant will also never be sent from this
/// function.
/// if desired.
pub async fn get_to_db<
D: BaoStore,
C: FnOnce() -> F,
Expand Down Expand Up @@ -563,26 +561,12 @@ pub enum DownloadProgress {
/// The unique id of the entry.
id: u64,
},
/// All network operations finished
NetworkDone(Stats),
/// If a download is to be exported to the local filesyste, this will report the export
/// progress.
Export(ExportProgress),
/// All operations finished.
///
/// This will be the last message in the stream.
AllDone,
AllDone(Stats),
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

impl From<ExportProgress> for DownloadProgress {
fn from(value: ExportProgress) -> Self {
match value {
ExportProgress::Abort(err) => Self::Abort(err),
value => Self::Export(value),
}
}
}
18 changes: 18 additions & 0 deletions iroh-bytes/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,24 @@ pub enum ExportMode {
TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
/// The hash refers to any blob and will be exported to a single file.
#[default]
Blob,
/// The hash refers to a [`crate::format::collection::Collection`] blob
/// and all children of the collection shall be exported to one file per child.
///
/// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
/// collection metadata, all other children of the collection will be exported to
/// a file each, with their collection name treated as a relative path to the export
/// destination path.
///
/// If the blob cannot be parsed as a collection, the operation will fail.
Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
Expand Down
130 changes: 89 additions & 41 deletions iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ use indicatif::{
use iroh::bytes::{
get::{db::DownloadProgress, Stats},
provider::AddProgress,
store::{ValidateLevel, ValidateProgress},
store::{ExportFormat, ExportMode, ValidateLevel, ValidateProgress},
BlobFormat, Hash, HashAndFormat, Tag,
};
use iroh::net::{key::PublicKey, relay::RelayUrl, NodeAddr};
use iroh::{
client::{BlobStatus, Iroh, ShareTicketOptions},
rpc_protocol::{
BlobDownloadRequest, BlobListCollectionsResponse, BlobListIncompleteResponse,
BlobListResponse, DownloadLocation, ProviderService, SetTagOption, WrapOption,
BlobListResponse, ProviderService, SetTagOption, WrapOption,
},
ticket::BlobTicket,
};
Expand Down Expand Up @@ -82,6 +82,23 @@ pub enum BlobCommands {
#[clap(long)]
tag: Option<String>,
},
/// Export a blob from the internal blob store to the local filesystem.
Export {
/// The hash to export.
hash: Hash,
/// Directory or file in which to save the file(s).
///
/// If set to `STDOUT` the output will be redirected to stdout.
out: OutputTarget,
/// Set to true if the hash refers to a collection and you want to export all children of
/// the collection.
#[clap(long, default_value_t = false)]
recursive: bool,
/// If set, the data will be moved to the output directory, and iroh will assume that it
/// will not change.
#[clap(long, default_value_t = false)]
stable: bool,
},
/// List available content on the node.
#[clap(subcommand)]
List(ListCommands),
Expand Down Expand Up @@ -216,53 +233,89 @@ impl BlobCommands {
None => SetTagOption::Auto,
};

let out_location = match out {
None => DownloadLocation::Internal,
Some(OutputTarget::Stdout) => DownloadLocation::Internal,
Some(OutputTarget::Path(ref path)) => {
let absolute = std::env::current_dir()?.join(path);
match format {
BlobFormat::HashSeq => {
// no validation necessary for now
}
BlobFormat::Raw => {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
}
tracing::info!(
"output path is {} -> {}",
path.display(),
absolute.display()
);
DownloadLocation::External {
path: absolute,
in_place: stable,
}
}
};

let mut stream = iroh
.blobs
.download(BlobDownloadRequest {
hash,
format,
peer: node_addr,
out: out_location,
tag,
})
.await?;

show_download_progress(hash, &mut stream).await?;

// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
if out == Some(OutputTarget::Stdout) {
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
match out {
None => {}
Some(OutputTarget::Stdout) => {
// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
Some(OutputTarget::Path(path)) => {
let absolute = std::env::current_dir()?.join(&path);
if matches!(format, BlobFormat::HashSeq) {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
let recursive = format == BlobFormat::HashSeq;
let mode = match stable {
true => ExportMode::TryReference,
false => ExportMode::Copy,
};
let format = match recursive {
true => ExportFormat::Collection,
false => ExportFormat::Blob,
};
tracing::info!("exporting to {} -> {}", path.display(), absolute.display());
let stream = iroh.blobs.export(hash, absolute, format, mode).await?;
// TODO: report export progress
stream.await?;
}
};

Ok(())
}
Self::Export {
hash,
out,
recursive,
stable,
} => {
match out {
OutputTarget::Stdout => {
ensure!(
!recursive,
"Recursive option is not supported when exporting to STDOUT"
);
let mut blob_read = iroh.blobs.read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
}
OutputTarget::Path(path) => {
let absolute = std::env::current_dir()?.join(&path);
if !recursive {
ensure!(!absolute.is_dir(), "output must not be a directory");
}
let mode = match stable {
true => ExportMode::TryReference,
false => ExportMode::Copy,
};
let format = match recursive {
true => ExportFormat::Collection,
false => ExportFormat::Blob,
};
tracing::info!(
"exporting {hash} to {} -> {}",
path.display(),
absolute.display()
);
let stream = iroh.blobs.export(hash, absolute, format, mode).await?;
// TODO: report export progress
stream.await?;
}
};
Ok(())
}
Self::List(cmd) => cmd.run(iroh).await,
Self::Delete(cmd) => cmd.run(iroh).await,
Self::Validate { verbose, repair } => validate(iroh, verbose, repair).await,
Expand Down Expand Up @@ -896,7 +949,7 @@ pub async fn show_download_progress(
DownloadProgress::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::NetworkDone(Stats {
DownloadProgress::AllDone(Stats {
bytes_read,
elapsed,
..
Expand All @@ -908,16 +961,11 @@ pub async fn show_download_progress(
HumanDuration(elapsed),
HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64)
);
break;
}
DownloadProgress::Abort(e) => {
bail!("download aborted: {:?}", e);
}
DownloadProgress::Export(_p) => {
// TODO: report export progress
}
DownloadProgress::AllDone => {
break;
}
}
}
Ok(())
Expand Down
3 changes: 0 additions & 3 deletions iroh/examples/collection-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ async fn main() -> Result<()> {

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,

// The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well).
out: iroh::rpc_protocol::DownloadLocation::Internal,
};

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download.
Expand Down
3 changes: 0 additions & 3 deletions iroh/examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ async fn main() -> Result<()> {

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,

// The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well).
out: iroh::rpc_protocol::DownloadLocation::Internal,
};

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download.
Expand Down
Loading

0 comments on commit 488be5b

Please sign in to comment.