Skip to content

Commit

Permalink
implement some collection related things on the client side
Browse files Browse the repository at this point in the history
this allows us to reduce the rpc api
  • Loading branch information
rklaehn committed Jun 6, 2024
1 parent 98914ee commit a8534ef
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 122 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&outpath).await?;
let collection = Collection::load(db, &hash).await?;
let collection = Collection::load_db(db, &hash).await?;
for (name, hash) in collection.into_iter() {
#[allow(clippy::needless_borrow)]
let path = outpath.join(pathbuf_from_name(&name));
Expand Down
26 changes: 23 additions & 3 deletions iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The collection type used by iroh
use std::collections::BTreeMap;
use std::{collections::BTreeMap, future::Future};

use anyhow::Context;
use bao_tree::blake3;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl IntoIterator for Collection {
}
}

/// A simple store trait for loading blobs
pub trait SimpleStore {
/// Load a blob from the store
fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
Expand All @@ -84,7 +90,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down Expand Up @@ -160,11 +166,25 @@ impl Collection {
Ok((collection, res, stats))
}

/// Create a new collection from a hash sequence and metadata.
pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
let hs = store.load(root).await?;
let hs = HashSeq::try_from(hs)?;
let meta_hash = hs.iter().next().context("empty hash seq")?;
let meta = store.load(meta_hash).await?;
let meta: CollectionMeta = postcard::from_bytes(&meta)?;
anyhow::ensure!(
meta.names.len() + 1 == hs.len(),
"names and links length mismatch"
);
Ok(Self::from_parts(hs.into_iter(), meta))
}

/// Load a collection from a store given a root hash
///
/// This assumes that both the links and the metadata of the collection is stored in the store.
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
pub async fn load_db<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: crate::store::Map,
{
Expand Down
58 changes: 46 additions & 12 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
Expand All @@ -31,13 +32,12 @@ use tracing::warn;

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption,
};

use super::{flatten, Iroh};
use super::{flatten, tags, Iroh};

/// Iroh blobs client.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -322,18 +322,37 @@ where

/// Read the content of a collection.
pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
let BlobGetCollectionResponse { collection } =
self.rpc.rpc(BlobGetCollectionRequest { hash }).await??;
Ok(collection)
Ok(Collection::load(hash, self).await?)
}

/// List all collections.
pub async fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
let stream = self
.rpc
.server_streaming(BlobListCollectionsRequest)
.await?;
Ok(flatten(stream))
let this = self.clone();
Ok(Gen::new(|co| async move {
if let Err(cause) = this.list_collections_impl(&co).await {
co.yield_(Err(cause)).await;
}
}))
}

async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
let tags = self.tags_client();
let mut tags = tags.list().await?;
while let Some(tag) = tags.next().await {
let tag = tag?;
if tag.format == BlobFormat::HashSeq {
if let Ok(collection) = self.get_collection(tag.hash).await {
let info = CollectionInfo {
tag: tag.name,
hash: tag.hash,
total_blobs_count: Some(collection.len() as u64 + 1),
total_blobs_size: Some(0),
};
co.yield_(Ok(info)).await;
}
}
}
Ok(())
}

/// Delete a blob.
Expand Down Expand Up @@ -366,6 +385,21 @@ where
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client<C> {
tags::Client {
rpc: self.rpc.clone(),
}
}
}

impl<C> SimpleStore for Client<C>
where
C: ServiceConnection<RpcService>,
{
async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
self.read_to_bytes(hash).await
}
}

/// Whether to wrap the added data in a collection.
Expand Down
70 changes: 2 additions & 68 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress,
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::BlobFormat;
use iroh_blobs::{
hashseq::parse_hash_seq,
provider::AddProgress,
store::{Store as BaoStore, ValidateProgress},
util::progress::FlumeProgressSender,
Expand All @@ -34,15 +33,14 @@ use tokio_util::task::LocalPoolHandle;
use tracing::{debug, info};

use crate::client::blobs::{
BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
};
use crate::client::tags::TagInfo;
use crate::client::NodeStatus;
use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest,
Expand Down Expand Up @@ -95,12 +93,7 @@ impl<D: BaoStore> Handler<D> {
chan.server_streaming(msg, handler, Self::blob_list_incomplete)
.await
}
BlobListCollections(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_collections)
.await
}
CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await,
BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await,
ListTags(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_tags)
.await
Expand Down Expand Up @@ -348,39 +341,6 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

async fn blob_list_collections_impl(
self,
co: &Co<RpcResult<CollectionInfo>>,
) -> anyhow::Result<()> {
let db = self.inner.db.clone();
let local = self.inner.rt.clone();
let tags = db.tags().await.unwrap();
for item in tags {
let (name, HashAndFormat { hash, format }) = item?;
if !format.is_hash_seq() {
continue;
}
let Some(entry) = db.get(&hash).await? else {
continue;
};
let count = local
.spawn_pinned(|| async move {
let reader = entry.data_reader().await?;
let (_collection, count) = parse_hash_seq(reader).await?;
anyhow::Ok(count)
})
.await??;
co.yield_(Ok(CollectionInfo {
tag: name,
hash,
total_blobs_count: Some(count),
total_blobs_size: None,
}))
.await;
}
Ok(())
}

fn blob_list(
self,
_msg: BlobListRequest,
Expand All @@ -403,17 +363,6 @@ impl<D: BaoStore> Handler<D> {
})
}

fn blob_list_collections(
self,
_msg: BlobListCollectionsRequest,
) -> impl Stream<Item = RpcResult<CollectionInfo>> + Send + 'static {
Gen::new(move |co| async move {
if let Err(e) = self.blob_list_collections_impl(&co).await {
co.yield_(Err(e.into())).await;
}
})
}

async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
Ok(())
Expand Down Expand Up @@ -1044,21 +993,6 @@ impl<D: BaoStore> Handler<D> {

Ok(CreateCollectionResponse { hash, tag })
}

async fn blob_get_collection(
self,
req: BlobGetCollectionRequest,
) -> RpcResult<BlobGetCollectionResponse> {
let hash = req.hash;
let db = self.inner.db.clone();
let collection = self
.rt()
.spawn_pinned(move || async move { Collection::load(&db, &hash).await })
.await
.map_err(|_| anyhow!("join failed"))??;

Ok(BlobGetCollectionResponse { collection })
}
}

async fn download<D>(
Expand Down
39 changes: 1 addition & 38 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress};
use iroh_docs::engine::LiveEvent;

use crate::client::{
blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
docs::{ImportProgress, ShareMode},
tags::TagInfo,
NodeStatus,
Expand Down Expand Up @@ -201,20 +201,6 @@ impl ServerStreamingMsg<RpcService> for BlobListIncompleteRequest {
type Response = RpcResult<IncompleteBlobInfo>;
}

/// List all collections
///
/// Lists all collections that have been explicitly added to the database.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobListCollectionsRequest;

impl Msg<RpcService> for BlobListCollectionsRequest {
type Pattern = ServerStreaming;
}

impl ServerStreamingMsg<RpcService> for BlobListCollectionsRequest {
type Response = RpcResult<CollectionInfo>;
}

/// List all collections
///
/// Lists all collections that have been explicitly added to the database.
Expand Down Expand Up @@ -250,25 +236,6 @@ pub struct DeleteTagRequest {
impl RpcMsg<RpcService> for DeleteTagRequest {
type Response = RpcResult<()>;
}

/// Get a collection
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobGetCollectionRequest {
/// Hash of the collection
pub hash: Hash,
}

impl RpcMsg<RpcService> for BlobGetCollectionRequest {
type Response = RpcResult<BlobGetCollectionResponse>;
}

/// The response for a `BlobGetCollectionRequest`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobGetCollectionResponse {
/// The collection.
pub collection: Collection,
}

/// Create a collection.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateCollectionRequest {
Expand Down Expand Up @@ -1063,12 +1030,10 @@ pub enum Request {
BlobExport(BlobExportRequest),
BlobList(BlobListRequest),
BlobListIncomplete(BlobListIncompleteRequest),
BlobListCollections(BlobListCollectionsRequest),
BlobDeleteBlob(BlobDeleteBlobRequest),
BlobValidate(BlobValidateRequest),
BlobFsck(BlobConsistencyCheckRequest),
CreateCollection(CreateCollectionRequest),
BlobGetCollection(BlobGetCollectionRequest),

DeleteTag(DeleteTagRequest),
ListTags(ListTagsRequest),
Expand Down Expand Up @@ -1123,13 +1088,11 @@ pub enum Response {
BlobAddPath(BlobAddPathResponse),
BlobList(RpcResult<BlobInfo>),
BlobListIncomplete(RpcResult<IncompleteBlobInfo>),
BlobListCollections(RpcResult<CollectionInfo>),
BlobDownload(BlobDownloadResponse),
BlobFsck(ConsistencyCheckProgress),
BlobExport(BlobExportResponse),
BlobValidate(ValidateProgress),
CreateCollection(RpcResult<CreateCollectionResponse>),
BlobGetCollection(RpcResult<BlobGetCollectionResponse>),

ListTags(TagInfo),
DeleteTag(RpcResult<()>),
Expand Down

0 comments on commit a8534ef

Please sign in to comment.