From 7b0a7c7b7ef9aab4b12970d91e615c74eeb792be Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 2 Jun 2023 12:02:55 +0200 Subject: [PATCH] feat: add api to list collections This splits out listing content by blobs and collections --- src/blobs.rs | 4 ++-- src/main.rs | 50 ++++++++++++++++++++++++++++++++-------- src/provider/database.rs | 20 ++++++++++++++++ src/provider/mod.rs | 40 ++++++++++++++++++++++++++------ src/rpc_protocol.rs | 34 +++++++++++++++++++++------ 5 files changed, 123 insertions(+), 25 deletions(-) diff --git a/src/blobs.rs b/src/blobs.rs index 1bfdbf4205..84e2b7e161 100644 --- a/src/blobs.rs +++ b/src/blobs.rs @@ -8,9 +8,9 @@ use crate::util::Hash; #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct Collection { /// Links to the blobs in this collection - blobs: Vec, + pub(crate) blobs: Vec, /// The total size of the raw_data referred to by all links - total_blobs_size: u64, + pub(crate) total_blobs_size: u64, } impl Collection { diff --git a/src/main.rs b/src/main.rs index 7311b6eb1e..41339e5aec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use iroh::protocol::{GetRequest, RangeSpecSeq}; use iroh::provider::{Database, Provider, Ticket}; use iroh::rpc_protocol::*; use iroh::rpc_protocol::{ - ListRequest, ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, VersionRequest, + ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, VersionRequest, }; use quic_rpc::transport::quinn::{QuinnConnection, QuinnServerEndpoint}; use quic_rpc::{RpcClient, ServiceEndpoint}; @@ -123,12 +123,9 @@ enum Commands { #[clap(long, default_value_t = ProviderRpcPort::Enabled(DEFAULT_RPC_PORT))] rpc_port: ProviderRpcPort, }, - /// List hashes on the running provider. - List { - /// RPC port of the provider - #[clap(long, default_value_t = DEFAULT_RPC_PORT)] - rpc_port: u16, - }, + /// List availble content on the provider. + #[clap(subcommand)] + List(ListCommands), /// Validate hashes on the running provider. Validate { /// RPC port of the provider @@ -198,6 +195,22 @@ enum Commands { }, } +#[derive(Subcommand, Debug, Clone)] +enum ListCommands { + /// List the available blobs on the running provider. + Blobs { + /// RPC port of the provider + #[clap(long, default_value_t = DEFAULT_RPC_PORT)] + rpc_port: u16, + }, + /// List the available collections on the running provider. + Collections { + /// RPC port of the provider + #[clap(long, default_value_t = DEFAULT_RPC_PORT)] + rpc_port: u16, + }, +} + // Note about writing to STDOUT vs STDERR // Looking at https://unix.stackexchange.com/questions/331611/do-progress-reports-logging-information-belong-on-stderr-or-stdout // it is a little complicated. @@ -616,9 +629,9 @@ async fn main_impl() -> Result<()> { drop(fut); Ok(()) } - Commands::List { rpc_port } => { + Commands::List(ListCommands::Blobs { rpc_port }) => { let client = make_rpc_client(rpc_port).await?; - let mut response = client.server_streaming(ListRequest).await?; + let mut response = client.server_streaming(ListBlobsRequest).await?; while let Some(item) = response.next().await { let item = item?; println!( @@ -630,6 +643,25 @@ async fn main_impl() -> Result<()> { } Ok(()) } + Commands::List(ListCommands::Collections { rpc_port }) => { + let client = make_rpc_client(rpc_port).await?; + let mut response = client.server_streaming(ListCollectionsRequest).await?; + while let Some(collection) = response.next().await { + let collection = collection?; + println!( + "{}: {} {} ({})", + Blake3Cid(collection.hash), + collection.total_blobs_count, + if collection.total_blobs_count > 1 { + "blobs" + } else { + "blob" + }, + HumanBytes(collection.total_blobs_size), + ); + } + Ok(()) + } Commands::Validate { rpc_port } => { let client = make_rpc_client(rpc_port).await?; let mut state = ValidateProgressState::new(); diff --git a/src/provider/database.rs b/src/provider/database.rs index cbb224271d..b11fd72f26 100644 --- a/src/provider/database.rs +++ b/src/provider/database.rs @@ -1,5 +1,6 @@ use super::BlobOrCollection; use crate::{ + blobs::Collection, rpc_protocol::ValidateProgress, util::{validate_bao, BaoValidationError}, Hash, @@ -439,6 +440,25 @@ impl Database { items.into_iter() } + /// Iterate over all collections in the database. + pub fn collections(&self) -> impl Iterator + 'static { + let items = self + .0 + .read() + .unwrap() + .iter() + .filter_map(|(hash, v)| match v { + BlobOrCollection::Blob { .. } => None, + BlobOrCollection::Collection { data, .. } => { + Collection::from_bytes(&data[..]).ok().map(|c| (*hash, c)) + } + }) + .collect::>(); + // todo: make this a proper lazy iterator at some point + // e.g. by using an immutable map or a real database that supports snapshots. + items.into_iter() + } + /// Unwrap into the inner HashMap pub fn to_inner(&self) -> HashMap { self.0.read().unwrap().clone() diff --git a/src/provider/mod.rs b/src/provider/mod.rs index aeb4feb1fd..8a99bd7b65 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -42,10 +42,10 @@ use crate::protocol::{ read_lp, write_lp, Closed, GetRequest, Handshake, RangeSpec, Request, VERSION, }; use crate::rpc_protocol::{ - AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListRequest, ListResponse, ProvideProgress, - ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, ShutdownRequest, - ValidateProgress, ValidateRequest, VersionRequest, VersionResponse, WatchRequest, - WatchResponse, + AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse, + ListCollectionsRequest, ListCollectionsResponse, ProvideProgress, ProvideRequest, + ProviderRequest, ProviderResponse, ProviderService, ShutdownRequest, ValidateProgress, + ValidateRequest, VersionRequest, VersionResponse, WatchRequest, WatchResponse, }; use crate::tls::{self, Keypair, PeerId}; use crate::tokio_util::read_as_bytes; @@ -544,12 +544,31 @@ struct RpcHandler { } impl RpcHandler { - fn list(self, _msg: ListRequest) -> impl Stream + Send + 'static { + fn list_blobs( + self, + _msg: ListBlobsRequest, + ) -> impl Stream + Send + 'static { let items = self .inner .db .blobs() - .map(|(hash, path, size)| ListResponse { hash, path, size }); + .map(|(hash, path, size)| ListBlobsResponse { hash, path, size }); + futures::stream::iter(items) + } + + fn list_collections( + self, + _msg: ListCollectionsRequest, + ) -> impl Stream + Send + 'static { + let items = self + .inner + .db + .collections() + .map(|(hash, collection)| ListCollectionsResponse { + hash, + total_blobs_count: collection.blobs.len(), + total_blobs_size: collection.total_blobs_size, + }); futures::stream::iter(items) } @@ -676,7 +695,14 @@ fn handle_rpc_request>( tokio::spawn(async move { use ProviderRequest::*; match msg { - List(msg) => chan.server_streaming(msg, handler, RpcHandler::list).await, + ListBlobs(msg) => { + chan.server_streaming(msg, handler, RpcHandler::list_blobs) + .await + } + ListCollections(msg) => { + chan.server_streaming(msg, handler, RpcHandler::list_collections) + .await + } Provide(msg) => { chan.server_streaming(msg, handler, RpcHandler::provide) .await diff --git a/src/rpc_protocol.rs b/src/rpc_protocol.rs index 50ba8b50f5..2cd1b51165 100644 --- a/src/rpc_protocol.rs +++ b/src/rpc_protocol.rs @@ -71,21 +71,39 @@ impl ServerStreamingMsg for ValidateRequest { } #[derive(Debug, Serialize, Deserialize)] -pub struct ListRequest; +pub struct ListBlobsRequest; #[derive(Debug, Serialize, Deserialize)] -pub struct ListResponse { +pub struct ListBlobsResponse { pub path: PathBuf, pub hash: Hash, pub size: u64, } -impl Msg for ListRequest { +impl Msg for ListBlobsRequest { type Pattern = ServerStreaming; } -impl ServerStreamingMsg for ListRequest { - type Response = ListResponse; +impl ServerStreamingMsg for ListBlobsRequest { + type Response = ListBlobsResponse; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListCollectionsRequest; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListCollectionsResponse { + pub hash: Hash, + pub total_blobs_count: usize, + pub total_blobs_size: u64, +} + +impl Msg for ListCollectionsRequest { + type Pattern = ServerStreaming; +} + +impl ServerStreamingMsg for ListCollectionsRequest { + type Response = ListCollectionsResponse; } #[derive(Serialize, Deserialize, Debug)] @@ -160,7 +178,8 @@ pub struct ProviderService; pub enum ProviderRequest { Watch(WatchRequest), Version(VersionRequest), - List(ListRequest), + ListBlobs(ListBlobsRequest), + ListCollections(ListCollectionsRequest), Provide(ProvideRequest), Id(IdRequest), Addrs(AddrsRequest), @@ -173,7 +192,8 @@ pub enum ProviderRequest { pub enum ProviderResponse { Watch(WatchResponse), Version(VersionResponse), - List(ListResponse), + ListBlobs(ListBlobsResponse), + ListCollections(ListCollectionsResponse), Provide(ProvideProgress), Id(IdResponse), Addrs(AddrsResponse),