diff --git a/iroh-bytes/src/export.rs b/iroh-bytes/src/export.rs index 7660379ba5..75b282fd6c 100644 --- a/iroh-bytes/src/export.rs +++ b/iroh-bytes/src/export.rs @@ -10,7 +10,7 @@ use tracing::trace; use crate::{ format::collection::Collection, - store::{BaoBlobSize, ExportMode, MapEntry, Store as BaoStore}, + store::{BaoBlobSize, ExportFormat, ExportMode, MapEntry, Store as BaoStore}, util::progress::{IdGenerator, ProgressSender}, Hash, }; @@ -27,14 +27,13 @@ pub async fn export( db: &D, hash: Hash, outpath: PathBuf, - recursive: bool, + format: ExportFormat, mode: ExportMode, progress: impl ProgressSender + IdGenerator, ) -> anyhow::Result<()> { - if recursive { - export_collection(db, hash, outpath, mode, progress).await - } else { - export_blob(db, hash, outpath, mode, progress).await + match format { + ExportFormat::Blob => export_blob(db, hash, outpath, mode, progress).await, + ExportFormat::Collection => export_collection(db, hash, outpath, mode, progress).await, } } diff --git a/iroh-bytes/src/get/db.rs b/iroh-bytes/src/get/db.rs index 31a7afdad3..713d2eb310 100644 --- a/iroh-bytes/src/get/db.rs +++ b/iroh-bytes/src/get/db.rs @@ -14,7 +14,6 @@ use crate::hashseq::parse_hash_seq; use crate::store::BaoBatchWriter; use crate::{ - export::ExportProgress, get::{ self, error::GetError, @@ -38,8 +37,7 @@ use tracing::trace; /// /// Progress is reported as [`DownloadProgress`] through a [`ProgressSender`]. Note that the /// [`DownloadProgress::AllDone`] event is not emitted from here, but left to an upper layer to send, -/// if desired. The [`DownloadProgress::Export`] variant will also never be sent from this -/// function. +/// if desired. pub async fn get_to_db< D: BaoStore, C: FnOnce() -> F, @@ -563,26 +561,12 @@ pub enum DownloadProgress { /// The unique id of the entry. id: u64, }, - /// All network operations finished - NetworkDone(Stats), - /// If a download is to be exported to the local filesyste, this will report the export - /// progress. - Export(ExportProgress), /// All operations finished. /// /// This will be the last message in the stream. - AllDone, + AllDone(Stats), /// We got an error and need to abort. /// /// This will be the last message in the stream. Abort(RpcError), } - -impl From for DownloadProgress { - fn from(value: ExportProgress) -> Self { - match value { - ExportProgress::Abort(err) => Self::Abort(err), - value => Self::Export(value), - } - } -} diff --git a/iroh-bytes/src/store/traits.rs b/iroh-bytes/src/store/traits.rs index 734569f8ea..0868822181 100644 --- a/iroh-bytes/src/store/traits.rs +++ b/iroh-bytes/src/store/traits.rs @@ -629,6 +629,24 @@ pub enum ExportMode { TryReference, } +/// The expected format of a hash being exported. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub enum ExportFormat { + /// The hash refers to any blob and will be exported to a single file. + #[default] + Blob, + /// The hash refers to a [`crate::format::collection::Collection`] blob + /// and all children of the collection shall be exported to one file per child. + /// + /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains + /// collection metadata, all other children of the collection will be exported to + /// a file each, with their collection name treated as a relative path to the export + /// destination path. + /// + /// If the blob cannot be parsed as a collection, the operation will fail. + Collection, +} + #[allow(missing_docs)] #[derive(Debug)] pub enum ExportProgress { diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index d68e7525a8..d3d3c38255 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -16,7 +16,7 @@ use indicatif::{ use iroh::bytes::{ get::{db::DownloadProgress, Stats}, provider::AddProgress, - store::{ValidateLevel, ValidateProgress}, + store::{ExportFormat, ExportMode, ValidateLevel, ValidateProgress}, BlobFormat, Hash, HashAndFormat, Tag, }; use iroh::net::{key::PublicKey, relay::RelayUrl, NodeAddr}; @@ -24,7 +24,7 @@ use iroh::{ client::{BlobStatus, Iroh, ShareTicketOptions}, rpc_protocol::{ BlobDownloadRequest, BlobListCollectionsResponse, BlobListIncompleteResponse, - BlobListResponse, DownloadLocation, ProviderService, SetTagOption, WrapOption, + BlobListResponse, ProviderService, SetTagOption, WrapOption, }, ticket::BlobTicket, }; @@ -82,6 +82,23 @@ pub enum BlobCommands { #[clap(long)] tag: Option, }, + /// Export a blob from the internal blob store to the local filesystem. + Export { + /// The hash to export. + hash: Hash, + /// Directory or file in which to save the file(s). + /// + /// If set to `STDOUT` the output will be redirected to stdout. + out: OutputTarget, + /// Set to true if the hash refers to a collection and you want to export all children of + /// the collection. + #[clap(long, default_value_t = false)] + recursive: bool, + /// If set, the data will be moved to the output directory, and iroh will assume that it + /// will not change. + #[clap(long, default_value_t = false)] + stable: bool, + }, /// List available content on the node. #[clap(subcommand)] List(ListCommands), @@ -216,53 +233,89 @@ impl BlobCommands { None => SetTagOption::Auto, }; - let out_location = match out { - None => DownloadLocation::Internal, - Some(OutputTarget::Stdout) => DownloadLocation::Internal, - Some(OutputTarget::Path(ref path)) => { - let absolute = std::env::current_dir()?.join(path); - match format { - BlobFormat::HashSeq => { - // no validation necessary for now - } - BlobFormat::Raw => { - ensure!(!absolute.is_dir(), "output must not be a directory"); - } - } - tracing::info!( - "output path is {} -> {}", - path.display(), - absolute.display() - ); - DownloadLocation::External { - path: absolute, - in_place: stable, - } - } - }; - let mut stream = iroh .blobs .download(BlobDownloadRequest { hash, format, peer: node_addr, - out: out_location, tag, }) .await?; show_download_progress(hash, &mut stream).await?; - // we asserted above that `OutputTarget::Stdout` is only permitted if getting a - // single hash and not a hashseq. - if out == Some(OutputTarget::Stdout) { - let mut blob_read = iroh.blobs.read(hash).await?; - tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?; - } + match out { + None => {} + Some(OutputTarget::Stdout) => { + // we asserted above that `OutputTarget::Stdout` is only permitted if getting a + // single hash and not a hashseq. + let mut blob_read = iroh.blobs.read(hash).await?; + tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?; + } + Some(OutputTarget::Path(path)) => { + let absolute = std::env::current_dir()?.join(&path); + if matches!(format, BlobFormat::HashSeq) { + ensure!(!absolute.is_dir(), "output must not be a directory"); + } + let recursive = format == BlobFormat::HashSeq; + let mode = match stable { + true => ExportMode::TryReference, + false => ExportMode::Copy, + }; + let format = match recursive { + true => ExportFormat::Collection, + false => ExportFormat::Blob, + }; + tracing::info!("exporting to {} -> {}", path.display(), absolute.display()); + let stream = iroh.blobs.export(hash, absolute, format, mode).await?; + // TODO: report export progress + stream.await?; + } + }; Ok(()) } + Self::Export { + hash, + out, + recursive, + stable, + } => { + match out { + OutputTarget::Stdout => { + ensure!( + !recursive, + "Recursive option is not supported when exporting to STDOUT" + ); + let mut blob_read = iroh.blobs.read(hash).await?; + tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?; + } + OutputTarget::Path(path) => { + let absolute = std::env::current_dir()?.join(&path); + if !recursive { + ensure!(!absolute.is_dir(), "output must not be a directory"); + } + let mode = match stable { + true => ExportMode::TryReference, + false => ExportMode::Copy, + }; + let format = match recursive { + true => ExportFormat::Collection, + false => ExportFormat::Blob, + }; + tracing::info!( + "exporting {hash} to {} -> {}", + path.display(), + absolute.display() + ); + let stream = iroh.blobs.export(hash, absolute, format, mode).await?; + // TODO: report export progress + stream.await?; + } + }; + Ok(()) + } Self::List(cmd) => cmd.run(iroh).await, Self::Delete(cmd) => cmd.run(iroh).await, Self::Validate { verbose, repair } => validate(iroh, verbose, repair).await, @@ -896,7 +949,7 @@ pub async fn show_download_progress( DownloadProgress::Done { .. } => { ip.finish_and_clear(); } - DownloadProgress::NetworkDone(Stats { + DownloadProgress::AllDone(Stats { bytes_read, elapsed, .. @@ -908,16 +961,11 @@ pub async fn show_download_progress( HumanDuration(elapsed), HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64) ); + break; } DownloadProgress::Abort(e) => { bail!("download aborted: {:?}", e); } - DownloadProgress::Export(_p) => { - // TODO: report export progress - } - DownloadProgress::AllDone => { - break; - } } } Ok(()) diff --git a/iroh/examples/collection-fetch.rs b/iroh/examples/collection-fetch.rs index 1575a09200..e21ac30139 100644 --- a/iroh/examples/collection-fetch.rs +++ b/iroh/examples/collection-fetch.rs @@ -69,9 +69,6 @@ async fn main() -> Result<()> { // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. tag: iroh::rpc_protocol::SetTagOption::Auto, - - // The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well). - out: iroh::rpc_protocol::DownloadLocation::Internal, }; // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download. diff --git a/iroh/examples/hello-world-fetch.rs b/iroh/examples/hello-world-fetch.rs index d3b3a4de55..7294c67093 100644 --- a/iroh/examples/hello-world-fetch.rs +++ b/iroh/examples/hello-world-fetch.rs @@ -69,9 +69,6 @@ async fn main() -> Result<()> { // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. tag: iroh::rpc_protocol::SetTagOption::Auto, - - // The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well). - out: iroh::rpc_protocol::DownloadLocation::Internal, }; // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download. diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index c42e7583cf..3800d01b6b 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -11,8 +11,12 @@ use bytes::Bytes; use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt}; use iroh_base::ticket::BlobTicket; use iroh_bytes::{ - format::collection::Collection, get::db::DownloadProgress, provider::AddProgress, - store::ValidateProgress, BlobFormat, Hash, Tag, + export::ExportProgress, + format::collection::Collection, + get::db::DownloadProgress, + provider::AddProgress, + store::{ExportFormat, ExportMode, ValidateProgress}, + BlobFormat, Hash, Tag, }; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; @@ -23,7 +27,7 @@ use tracing::warn; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobDeleteBlobRequest, - BlobDownloadRequest, BlobGetCollectionRequest, BlobGetCollectionResponse, + BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, @@ -214,6 +218,32 @@ where )) } + /// Export a blob from the internal blob store to a path on the node's filesystem. + /// + /// `destination` should be an writeable, absolute path on the local node's filesystem. + /// + /// If `format` is set to [`ExportFormat::Collection`], and the `hash` refers to a collection, + /// all children of the collection will be exported. See [`ExportFormat`] for details. + /// + /// The `mode` argument defines if the blob should be copied to the target location or moved out of + /// the internal store into the target location. See [`ExportMode`] for details. + pub async fn export( + &self, + hash: Hash, + destination: PathBuf, + format: ExportFormat, + mode: ExportMode, + ) -> Result { + let req = BlobExportRequest { + hash, + path: destination, + format, + mode, + }; + let stream = self.rpc.server_streaming(req).await?; + Ok(BlobExportProgress::new(stream.map_err(anyhow::Error::from))) + } + /// List all complete blobs. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(BlobListRequest).await?; @@ -426,6 +456,8 @@ pub struct BlobDownloadOutcome { pub local_size: u64, /// The size of the data we downloaded from the network pub downloaded_size: u64, + /// Statistics about the download + pub stats: iroh_bytes::get::Stats, } /// Progress stream for blob download operations. @@ -503,10 +535,11 @@ impl Future for BlobDownloadProgress { } Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), Poll::Ready(Some(Ok(msg))) => match msg { - DownloadProgress::AllDone => { + DownloadProgress::AllDone(stats) => { let outcome = BlobDownloadOutcome { local_size: self.current_local_size.load(Ordering::Relaxed), downloaded_size: self.current_network_size.load(Ordering::Relaxed), + stats, }; return Poll::Ready(Ok(outcome)); } @@ -520,6 +553,92 @@ impl Future for BlobDownloadProgress { } } +/// Outcome of a blob export operation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BlobExportOutcome { + /// The total size of the exported data. + total_size: u64, +} + +/// Progress stream for blob export operations. +#[derive(derive_more::Debug)] +pub struct BlobExportProgress { + #[debug(skip)] + stream: Pin> + Send + Unpin + 'static>>, + current_total_size: Arc, +} + +impl BlobExportProgress { + /// Create a `BlobExportProgress` that can help you easily poll the `ExportProgress` stream from your download until it is finished or errors. + pub fn new( + stream: (impl Stream, impl Into>> + + Send + + Unpin + + 'static), + ) -> Self { + let current_total_size = Arc::new(AtomicU64::new(0)); + let total_size = current_total_size.clone(); + let stream = stream.map(move |item| match item { + Ok(item) => { + let item = item.into(); + if let ExportProgress::Found { size, .. } = &item { + let size = size.value(); + total_size.fetch_add(size, Ordering::Relaxed); + } + + Ok(item) + } + Err(err) => Err(err.into()), + }); + Self { + stream: Box::pin(stream), + current_total_size, + } + } + + /// Finish writing the stream, ignoring all intermediate progress events. + /// + /// Returns a [`BlobExportOutcome`] which contains the size of the content we exported. + pub async fn finish(self) -> Result { + self.await + } +} + +impl Stream for BlobExportProgress { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_next_unpin(cx) + } +} + +impl Future for BlobExportProgress { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match self.stream.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Ok(msg))) => match msg { + ExportProgress::AllDone => { + let outcome = BlobExportOutcome { + total_size: self.current_total_size.load(Ordering::Relaxed), + }; + return Poll::Ready(Ok(outcome)); + } + ExportProgress::Abort(err) => { + return Poll::Ready(Err(err.into())); + } + _ => {} + }, + } + } + } +} + /// Data reader for a single blob. /// /// Implements [`AsyncRead`]. diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 61a0c3db0c..8d0c8d6776 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -10,7 +10,7 @@ use iroh_base::rpc::RpcResult; use iroh_bytes::export::ExportProgress; use iroh_bytes::format::collection::Collection; use iroh_bytes::get::db::DownloadProgress; -use iroh_bytes::store::{ExportMode, ImportProgress, MapEntry}; +use iroh_bytes::store::{ExportFormat, ImportProgress, MapEntry}; use iroh_bytes::util::progress::{IdGenerator, ProgressSender}; use iroh_bytes::BlobFormat; use iroh_bytes::{ @@ -32,12 +32,12 @@ use tracing::{debug, info}; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, - BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, - BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, - BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, - DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocImportProgress, - DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse, + BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse, + BlobListCollectionsRequest, BlobListCollectionsResponse, BlobListIncompleteRequest, + BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadAtRequest, + BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, + DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, + DocImportFileResponse, DocImportProgress, DocSetHashRequest, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest, @@ -102,6 +102,7 @@ impl Handler { chan.server_streaming(msg, handler, Self::blob_download) .await } + BlobExport(msg) => chan.server_streaming(msg, handler, Self::blob_export).await, BlobValidate(msg) => { chan.server_streaming(msg, handler, Self::blob_validate) .await @@ -548,7 +549,7 @@ impl Handler { &self.inner.db, entry.content_hash(), path, - false, + ExportFormat::Blob, mode, export_progress, ) @@ -566,7 +567,6 @@ impl Handler { format, peer, tag, - out, } = msg; let db = self.inner.db.clone(); @@ -584,22 +584,40 @@ impl Handler { self.inner.rt.spawn_pinned(move || async move { if let Err(err) = - download_and_export(db, get_conn, hash_and_format, out, tag, progress.clone()).await + download_blob(db, get_conn, hash_and_format, tag, progress.clone()).await { progress .send(DownloadProgress::Abort(err.into())) .await .ok(); - drop(temp_pin); - } else { - drop(temp_pin); - progress.send(DownloadProgress::AllDone).await.ok(); } + drop(temp_pin); }); receiver.into_stream().map(BlobDownloadResponse) } + fn blob_export(self, msg: BlobExportRequest) -> impl Stream { + let (tx, rx) = flume::bounded(1024); + let progress = FlumeProgressSender::new(tx); + self.rt().spawn_pinned(move || async move { + let res = iroh_bytes::export::export( + &self.inner.db, + msg.hash, + msg.path, + msg.format, + msg.mode, + progress.clone(), + ) + .await; + match res { + Ok(()) => progress.send(ExportProgress::AllDone).await.ok(), + Err(err) => progress.send(ExportProgress::Abort(err.into())).await.ok(), + } + }); + rx.into_stream().map(BlobExportResponse) + } + async fn blob_add_from_path0( self, msg: BlobAddPathRequest, @@ -992,11 +1010,10 @@ impl Handler { } } -async fn download_and_export( +async fn download_blob( db: D, get_conn: C, hash_and_format: HashAndFormat, - out: DownloadLocation, tag: SetTagOption, progress: impl ProgressSender + IdGenerator, ) -> Result<()> @@ -1008,35 +1025,6 @@ where let stats = iroh_bytes::get::db::get_to_db(&db, get_conn, &hash_and_format, progress.clone()).await?; - progress - .send(DownloadProgress::NetworkDone(stats)) - .await - .ok(); - - match out { - DownloadLocation::External { path, in_place } => { - let mode = match in_place { - true => ExportMode::TryReference, - false => ExportMode::Copy, - }; - - let export_progress = progress.clone().with_map(DownloadProgress::Export); - iroh_bytes::export::export( - &db, - hash_and_format.hash, - path, - hash_and_format.format.is_hash_seq(), - mode, - export_progress, - ) - .await?; - } - - DownloadLocation::Internal => { - // nothing to do - } - } - match tag { SetTagOption::Named(tag) => { db.set_tag(tag, Some(hash_and_format)).await?; @@ -1046,5 +1034,7 @@ where } } + progress.send(DownloadProgress::AllDone(stats)).await.ok(); + Ok(()) } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 649468c880..0b15200918 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -30,7 +30,7 @@ use quic_rpc::{ use serde::{Deserialize, Serialize}; pub use iroh_base::rpc::{RpcError, RpcResult}; -use iroh_bytes::store::ExportMode; +use iroh_bytes::store::{ExportFormat, ExportMode}; pub use iroh_bytes::{provider::AddProgress, store::ValidateProgress}; use crate::sync_engine::LiveEvent; @@ -104,27 +104,6 @@ pub struct BlobDownloadRequest { pub peer: NodeAddr, /// Optional tag to tag the data with. pub tag: SetTagOption, - /// This field contains the location to store the data at. - pub out: DownloadLocation, -} - -/// Location to store a downloaded blob at. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum DownloadLocation { - /// Store in the node's blob storage directory. - Internal, - /// Store at the provided path. - External { - /// The path to store the data at. - path: PathBuf, - /// If this flag is true, the data is shared in place, i.e. it is moved to the - /// out path instead of being copied. The database itself contains only a - /// reference to the out path of the file. - /// - /// If the data is modified in the location specified by the out path, - /// download attempts for the associated hash will fail. - in_place: bool, - }, } impl Msg for BlobDownloadRequest { @@ -139,6 +118,37 @@ impl ServerStreamingMsg for BlobDownloadRequest { #[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] pub struct BlobDownloadResponse(pub DownloadProgress); +/// A request to the node to download and share the data specified by the hash. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlobExportRequest { + /// The hash of the blob to export. + pub hash: Hash, + /// The filepath to where the data should be saved + /// + /// This should be an absolute path valid for the file system on which + /// the node runs. + pub path: PathBuf, + /// Set to [`ExportFormat::Collection`] if the `hash` refers to a [`Collection`] and you want + /// to export all children of the collection into individual files. + pub format: ExportFormat, + /// The mode of exporting. + /// + /// The default is [`ExportMode::Copy`]. See [`ExportMode`] for details. + pub mode: ExportMode, +} + +impl Msg for BlobExportRequest { + type Pattern = ServerStreaming; +} + +impl ServerStreamingMsg for BlobExportRequest { + type Response = BlobExportResponse; +} + +/// Progress resposne for [`BlobExportRequest`] +#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct BlobExportResponse(pub ExportProgress); + /// A request to the node to validate the integrity of all provided data #[derive(Debug, Serialize, Deserialize)] pub struct BlobValidateRequest { @@ -1056,6 +1066,7 @@ pub enum ProviderRequest { BlobAddStreamUpdate(BlobAddStreamUpdate), BlobAddPath(BlobAddPathRequest), BlobDownload(BlobDownloadRequest), + BlobExport(BlobExportRequest), BlobList(BlobListRequest), BlobListIncomplete(BlobListIncompleteRequest), BlobListCollections(BlobListCollectionsRequest), @@ -1112,6 +1123,7 @@ pub enum ProviderResponse { BlobListIncomplete(RpcResult), BlobListCollections(RpcResult), BlobDownload(BlobDownloadResponse), + BlobExport(BlobExportResponse), BlobValidate(ValidateProgress), CreateCollection(RpcResult), BlobGetCollection(RpcResult),