diff --git a/iroh-blobs/src/export.rs b/iroh-blobs/src/export.rs index 75b282fd6c..cdbda28881 100644 --- a/iroh-blobs/src/export.rs +++ b/iroh-blobs/src/export.rs @@ -46,7 +46,7 @@ pub async fn export_collection( progress: impl ProgressSender + IdGenerator, ) -> anyhow::Result<()> { tokio::fs::create_dir_all(&outpath).await?; - let collection = Collection::load(db, &hash).await?; + let collection = Collection::load_db(db, &hash).await?; for (name, hash) in collection.into_iter() { #[allow(clippy::needless_borrow)] let path = outpath.join(pathbuf_from_name(&name)); diff --git a/iroh-blobs/src/format/collection.rs b/iroh-blobs/src/format/collection.rs index ab13572cc1..cdf4448e98 100644 --- a/iroh-blobs/src/format/collection.rs +++ b/iroh-blobs/src/format/collection.rs @@ -1,5 +1,5 @@ //! The collection type used by iroh -use std::collections::BTreeMap; +use std::{collections::BTreeMap, future::Future}; use anyhow::Context; use bao_tree::blake3; @@ -64,6 +64,12 @@ impl IntoIterator for Collection { } } +/// A simple store trait for loading blobs +pub trait SimpleStore { + /// Load a blob from the store + fn load(&self, hash: Hash) -> impl Future> + Send + '_; +} + /// Metadata for a collection /// /// This is the wire format for the metadata blob. @@ -84,7 +90,7 @@ impl Collection { /// /// To persist the collection, write all the blobs to storage, and use the /// hash of the last blob as the collection hash. - pub fn to_blobs(&self) -> impl Iterator { + pub fn to_blobs(&self) -> impl DoubleEndedIterator { let meta = CollectionMeta { header: *Self::HEADER, names: self.names(), @@ -160,11 +166,25 @@ impl Collection { Ok((collection, res, stats)) } + /// Create a new collection from a hash sequence and metadata. + pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result { + let hs = store.load(root).await?; + let hs = HashSeq::try_from(hs)?; + let meta_hash = hs.iter().next().context("empty hash seq")?; + let meta = store.load(meta_hash).await?; + let meta: CollectionMeta = postcard::from_bytes(&meta)?; + anyhow::ensure!( + meta.names.len() + 1 == hs.len(), + "names and links length mismatch" + ); + Ok(Self::from_parts(hs.into_iter(), meta)) + } + /// Load a collection from a store given a root hash /// /// This assumes that both the links and the metadata of the collection is stored in the store. /// It does not require that all child blobs are stored in the store. - pub async fn load(db: &D, root: &Hash) -> anyhow::Result + pub async fn load_db(db: &D, root: &Hash) -> anyhow::Result where D: crate::store::Map, { diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 82ea5bd4e9..cb1a9fb2e6 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -467,7 +467,7 @@ impl ListCommands { } } Self::Collections => { - let mut response = iroh.blobs.list_collections().await?; + let mut response = iroh.blobs.list_collections()?; while let Some(item) = response.next().await { let CollectionInfo { tag, diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 61d075e7fc..e1e98cae2e 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result}; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; +use genawaiter::sync::{Co, Gen}; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_blobs::{ export::ExportProgress as BytesExportProgress, - format::collection::Collection, + format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, BlobFormat, Hash, Tag, @@ -31,13 +32,12 @@ use tracing::warn; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, - BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, }; -use super::{flatten, Iroh}; +use super::{flatten, tags, Iroh}; /// Iroh blobs client. #[derive(Debug, Clone)] @@ -322,18 +322,35 @@ where /// Read the content of a collection. pub async fn get_collection(&self, hash: Hash) -> Result { - let BlobGetCollectionResponse { collection } = - self.rpc.rpc(BlobGetCollectionRequest { hash }).await??; - Ok(collection) + Collection::load(hash, self).await } /// List all collections. - pub async fn list_collections(&self) -> Result>> { - let stream = self - .rpc - .server_streaming(BlobListCollectionsRequest) - .await?; - Ok(flatten(stream)) + pub fn list_collections(&self) -> Result>> { + let this = self.clone(); + Ok(Gen::new(|co| async move { + if let Err(cause) = this.list_collections_impl(&co).await { + co.yield_(Err(cause)).await; + } + })) + } + + async fn list_collections_impl(&self, co: &Co>) -> Result<()> { + let tags = self.tags_client(); + let mut tags = tags.list_hash_seq().await?; + while let Some(tag) = tags.next().await { + let tag = tag?; + if let Ok(collection) = self.get_collection(tag.hash).await { + let info = CollectionInfo { + tag: tag.name, + hash: tag.hash, + total_blobs_count: Some(collection.len() as u64 + 1), + total_blobs_size: Some(0), + }; + co.yield_(Ok(info)).await; + } + } + Ok(()) } /// Delete a blob. @@ -366,6 +383,21 @@ where Ok(BlobStatus::Partial { size: reader.size }) } } + + fn tags_client(&self) -> tags::Client { + tags::Client { + rpc: self.rpc.clone(), + } + } +} + +impl SimpleStore for Client +where + C: ServiceConnection, +{ + async fn load(&self, hash: Hash) -> anyhow::Result { + self.read_to_bytes(hash).await + } } /// Whether to wrap the added data in a collection. @@ -929,7 +961,7 @@ mod tests { .create_collection(collection, SetTagOption::Auto, tags) .await?; - let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?; + let collections: Vec<_> = client.blobs.list_collections()?.try_collect().await?; assert_eq!(collections.len(), 1); { diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index c2d4309977..c25111e3e3 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -20,7 +20,16 @@ where { /// List all tags. pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListTagsRequest).await?; + let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// List all tags with a hash_seq format. + pub async fn list_hash_seq(&self) -> Result>> { + let stream = self + .rpc + .server_streaming(ListTagsRequest::hash_seq()) + .await?; Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index fc500b7566..56219110f1 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::BlobFormat; use iroh_blobs::{ - hashseq::parse_hash_seq, provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, @@ -33,16 +32,13 @@ use quic_rpc::{ use tokio_util::task::LocalPoolHandle; use tracing::{debug, info}; -use crate::client::blobs::{ - BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption, -}; +use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; use crate::client::tags::TagInfo; use crate::client::NodeStatus; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, - BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, @@ -95,12 +91,7 @@ impl Handler { chan.server_streaming(msg, handler, Self::blob_list_incomplete) .await } - BlobListCollections(msg) => { - chan.server_streaming(msg, handler, Self::blob_list_collections) - .await - } CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await, - BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await, ListTags(msg) => { chan.server_streaming(msg, handler, Self::blob_list_tags) .await @@ -348,39 +339,6 @@ impl Handler { Ok(()) } - async fn blob_list_collections_impl( - self, - co: &Co>, - ) -> anyhow::Result<()> { - let db = self.inner.db.clone(); - let local = self.inner.rt.clone(); - let tags = db.tags().await.unwrap(); - for item in tags { - let (name, HashAndFormat { hash, format }) = item?; - if !format.is_hash_seq() { - continue; - } - let Some(entry) = db.get(&hash).await? else { - continue; - }; - let count = local - .spawn_pinned(|| async move { - let reader = entry.data_reader().await?; - let (_collection, count) = parse_hash_seq(reader).await?; - anyhow::Ok(count) - }) - .await??; - co.yield_(Ok(CollectionInfo { - tag: name, - hash, - total_blobs_count: Some(count), - total_blobs_size: None, - })) - .await; - } - Ok(()) - } - fn blob_list( self, _msg: BlobListRequest, @@ -403,17 +361,6 @@ impl Handler { }) } - fn blob_list_collections( - self, - _msg: BlobListCollectionsRequest, - ) -> impl Stream> + Send + 'static { - Gen::new(move |co| async move { - if let Err(e) = self.blob_list_collections_impl(&co).await { - co.yield_(Err(e.into())).await; - } - }) - } - async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> { self.inner.db.set_tag(msg.name, None).await?; Ok(()) @@ -424,15 +371,16 @@ impl Handler { Ok(()) } - fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream + Send + 'static { + fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream + Send + 'static { tracing::info!("blob_list_tags"); Gen::new(|co| async move { let tags = self.inner.db.tags().await.unwrap(); #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { - tracing::info!("{:?} {} {:?}", name, hash, format); - co.yield_(TagInfo { name, hash, format }).await; + if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { + co.yield_(TagInfo { name, hash, format }).await; + } } } }) @@ -1044,21 +992,6 @@ impl Handler { Ok(CreateCollectionResponse { hash, tag }) } - - async fn blob_get_collection( - self, - req: BlobGetCollectionRequest, - ) -> RpcResult { - let hash = req.hash; - let db = self.inner.db.clone(); - let collection = self - .rt() - .spawn_pinned(move || async move { Collection::load(&db, &hash).await }) - .await - .map_err(|_| anyhow!("join failed"))??; - - Ok(BlobGetCollectionResponse { collection }) - } } async fn download( diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index ccfbc45671..8fe71e7d6a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -44,7 +44,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; use iroh_docs::engine::LiveEvent; use crate::client::{ - blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, docs::{ImportProgress, ShareMode}, tags::TagInfo, NodeStatus, @@ -205,22 +205,39 @@ impl ServerStreamingMsg for BlobListIncompleteRequest { /// /// Lists all collections that have been explicitly added to the database. #[derive(Debug, Serialize, Deserialize)] -pub struct BlobListCollectionsRequest; - -impl Msg for BlobListCollectionsRequest { - type Pattern = ServerStreaming; +pub struct ListTagsRequest { + /// List raw tags + pub raw: bool, + /// List hash seq tags + pub hash_seq: bool, +} + +impl ListTagsRequest { + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + } + } } -impl ServerStreamingMsg for BlobListCollectionsRequest { - type Response = RpcResult; -} - -/// List all collections -/// -/// Lists all collections that have been explicitly added to the database. -#[derive(Debug, Serialize, Deserialize)] -pub struct ListTagsRequest; - impl Msg for ListTagsRequest { type Pattern = ServerStreaming; } @@ -250,25 +267,6 @@ pub struct DeleteTagRequest { impl RpcMsg for DeleteTagRequest { type Response = RpcResult<()>; } - -/// Get a collection -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionRequest { - /// Hash of the collection - pub hash: Hash, -} - -impl RpcMsg for BlobGetCollectionRequest { - type Response = RpcResult; -} - -/// The response for a `BlobGetCollectionRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionResponse { - /// The collection. - pub collection: Collection, -} - /// Create a collection. #[derive(Debug, Serialize, Deserialize)] pub struct CreateCollectionRequest { @@ -1063,12 +1061,10 @@ pub enum Request { BlobExport(BlobExportRequest), BlobList(BlobListRequest), BlobListIncomplete(BlobListIncompleteRequest), - BlobListCollections(BlobListCollectionsRequest), BlobDeleteBlob(BlobDeleteBlobRequest), BlobValidate(BlobValidateRequest), BlobFsck(BlobConsistencyCheckRequest), CreateCollection(CreateCollectionRequest), - BlobGetCollection(BlobGetCollectionRequest), DeleteTag(DeleteTagRequest), ListTags(ListTagsRequest), @@ -1123,13 +1119,11 @@ pub enum Response { BlobAddPath(BlobAddPathResponse), BlobList(RpcResult), BlobListIncomplete(RpcResult), - BlobListCollections(RpcResult), BlobDownload(BlobDownloadResponse), BlobFsck(ConsistencyCheckProgress), BlobExport(BlobExportResponse), BlobValidate(ValidateProgress), CreateCollection(RpcResult), - BlobGetCollection(RpcResult), ListTags(TagInfo), DeleteTag(RpcResult<()>),