Skip to content

Commit

Permalink
feat: add api to list collections
Browse files Browse the repository at this point in the history
This splits out listing content by blobs and collections
  • Loading branch information
dignifiedquire committed Jun 2, 2023
1 parent dfcbf7e commit 7b0a7c7
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::util::Hash;
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct Collection {
/// Links to the blobs in this collection
blobs: Vec<Blob>,
pub(crate) blobs: Vec<Blob>,
/// The total size of the raw_data referred to by all links
total_blobs_size: u64,
pub(crate) total_blobs_size: u64,
}

impl Collection {
Expand Down
50 changes: 41 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use iroh::protocol::{GetRequest, RangeSpecSeq};
use iroh::provider::{Database, Provider, Ticket};
use iroh::rpc_protocol::*;
use iroh::rpc_protocol::{
ListRequest, ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, VersionRequest,
ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, VersionRequest,
};
use quic_rpc::transport::quinn::{QuinnConnection, QuinnServerEndpoint};
use quic_rpc::{RpcClient, ServiceEndpoint};
Expand Down Expand Up @@ -123,12 +123,9 @@ enum Commands {
#[clap(long, default_value_t = ProviderRpcPort::Enabled(DEFAULT_RPC_PORT))]
rpc_port: ProviderRpcPort,
},
/// List hashes on the running provider.
List {
/// RPC port of the provider
#[clap(long, default_value_t = DEFAULT_RPC_PORT)]
rpc_port: u16,
},
/// List available content on the provider.
#[clap(subcommand)]
List(ListCommands),
/// Validate hashes on the running provider.
Validate {
/// RPC port of the provider
Expand Down Expand Up @@ -198,6 +195,22 @@ enum Commands {
},
}

// Subcommands nested under `Commands::List`: the caller chooses whether to
// list individual blobs or whole collections. Both variants carry their own
// `rpc_port`, defaulting to `DEFAULT_RPC_PORT`, used to reach the provider.
// NOTE(review): kept as a plain `//` comment on purpose — clap derive is
// understood to turn `///` doc comments into help text; confirm before
// converting this header into a doc comment.
#[derive(Subcommand, Debug, Clone)]
enum ListCommands {
/// List the available blobs on the running provider.
Blobs {
/// RPC port of the provider
#[clap(long, default_value_t = DEFAULT_RPC_PORT)]
rpc_port: u16,
},
/// List the available collections on the running provider.
Collections {
/// RPC port of the provider
#[clap(long, default_value_t = DEFAULT_RPC_PORT)]
rpc_port: u16,
},
}

// Note about writing to STDOUT vs STDERR
// Looking at https://unix.stackexchange.com/questions/331611/do-progress-reports-logging-information-belong-on-stderr-or-stdout
// it is a little complicated.
Expand Down Expand Up @@ -616,9 +629,9 @@ async fn main_impl() -> Result<()> {
drop(fut);
Ok(())
}
Commands::List { rpc_port } => {
Commands::List(ListCommands::Blobs { rpc_port }) => {
let client = make_rpc_client(rpc_port).await?;
let mut response = client.server_streaming(ListRequest).await?;
let mut response = client.server_streaming(ListBlobsRequest).await?;
while let Some(item) = response.next().await {
let item = item?;
println!(
Expand All @@ -630,6 +643,25 @@ async fn main_impl() -> Result<()> {
}
Ok(())
}
// `list collections`: ask the running provider for a stream of collection
// summaries and print one line per collection in the form
// "<cid>: <count> blob(s) (<human readable size>)".
Commands::List(ListCommands::Collections { rpc_port }) => {
    let client = make_rpc_client(rpc_port).await?;
    let mut response = client.server_streaming(ListCollectionsRequest).await?;
    while let Some(collection) = response.next().await {
        // Each stream item is itself fallible; abort on the first error.
        let collection = collection?;
        // Only a count of exactly one takes the singular form, so that an
        // empty collection reads "0 blobs" rather than "0 blob".
        let noun = if collection.total_blobs_count == 1 {
            "blob"
        } else {
            "blobs"
        };
        println!(
            "{}: {} {} ({})",
            Blake3Cid(collection.hash),
            collection.total_blobs_count,
            noun,
            HumanBytes(collection.total_blobs_size),
        );
    }
    Ok(())
}
Commands::Validate { rpc_port } => {
let client = make_rpc_client(rpc_port).await?;
let mut state = ValidateProgressState::new();
Expand Down
20 changes: 20 additions & 0 deletions src/provider/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::BlobOrCollection;
use crate::{
blobs::Collection,
rpc_protocol::ValidateProgress,
util::{validate_bao, BaoValidationError},
Hash,
Expand Down Expand Up @@ -439,6 +440,25 @@ impl Database {
items.into_iter()
}

/// Iterate over all collections in the database.
///
/// Yields `(hash, collection)` pairs for every entry stored as a collection.
/// Blob entries are skipped, and so are collection entries whose bytes
/// cannot be decoded by `Collection::from_bytes`.
pub fn collections(&self) -> impl Iterator<Item = (Hash, Collection)> + 'static {
    let guard = self.0.read().unwrap();
    let mut decoded = Vec::new();
    for (hash, entry) in guard.iter() {
        if let BlobOrCollection::Collection { data, .. } = entry {
            if let Ok(collection) = Collection::from_bytes(&data[..]) {
                decoded.push((*hash, collection));
            }
        }
    }
    // Release the read lock before handing the snapshot to the caller.
    drop(guard);
    // todo: turn this into a truly lazy iterator at some point,
    // e.g. via an immutable map or a real database that supports snapshots.
    decoded.into_iter()
}

/// Unwrap into the inner HashMap
pub fn to_inner(&self) -> HashMap<Hash, BlobOrCollection> {
self.0.read().unwrap().clone()
Expand Down
40 changes: 33 additions & 7 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ use crate::protocol::{
read_lp, write_lp, Closed, GetRequest, Handshake, RangeSpec, Request, VERSION,
};
use crate::rpc_protocol::{
AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListRequest, ListResponse, ProvideProgress,
ProvideRequest, ProviderRequest, ProviderResponse, ProviderService, ShutdownRequest,
ValidateProgress, ValidateRequest, VersionRequest, VersionResponse, WatchRequest,
WatchResponse,
AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse,
ListCollectionsRequest, ListCollectionsResponse, ProvideProgress, ProvideRequest,
ProviderRequest, ProviderResponse, ProviderService, ShutdownRequest, ValidateProgress,
ValidateRequest, VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};
use crate::tls::{self, Keypair, PeerId};
use crate::tokio_util::read_as_bytes;
Expand Down Expand Up @@ -544,12 +544,31 @@ struct RpcHandler {
}

impl RpcHandler {
fn list(self, _msg: ListRequest) -> impl Stream<Item = ListResponse> + Send + 'static {
fn list_blobs(
self,
_msg: ListBlobsRequest,
) -> impl Stream<Item = ListBlobsResponse> + Send + 'static {
let items = self
.inner
.db
.blobs()
.map(|(hash, path, size)| ListResponse { hash, path, size });
.map(|(hash, path, size)| ListBlobsResponse { hash, path, size });
futures::stream::iter(items)
}

/// RPC handler that streams one summary per collection held in the database.
///
/// The request carries no parameters. Every item reports the collection's
/// hash, the number of blobs it links to and its recorded total blob size.
fn list_collections(
    self,
    _msg: ListCollectionsRequest,
) -> impl Stream<Item = ListCollectionsResponse> + Send + 'static {
    let summaries = self.inner.db.collections().map(|(hash, collection)| {
        let total_blobs_count = collection.blobs.len();
        let total_blobs_size = collection.total_blobs_size;
        ListCollectionsResponse {
            hash,
            total_blobs_count,
            total_blobs_size,
        }
    });
    futures::stream::iter(summaries)
}

Expand Down Expand Up @@ -676,7 +695,14 @@ fn handle_rpc_request<C: ServiceEndpoint<ProviderService>>(
tokio::spawn(async move {
use ProviderRequest::*;
match msg {
List(msg) => chan.server_streaming(msg, handler, RpcHandler::list).await,
ListBlobs(msg) => {
chan.server_streaming(msg, handler, RpcHandler::list_blobs)
.await
}
ListCollections(msg) => {
chan.server_streaming(msg, handler, RpcHandler::list_collections)
.await
}
Provide(msg) => {
chan.server_streaming(msg, handler, RpcHandler::provide)
.await
Expand Down
34 changes: 27 additions & 7 deletions src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,39 @@ impl ServerStreamingMsg<ProviderService> for ValidateRequest {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;
pub struct ListBlobsRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct ListResponse {
pub struct ListBlobsResponse {
pub path: PathBuf,
pub hash: Hash,
pub size: u64,
}

impl Msg<ProviderService> for ListRequest {
impl Msg<ProviderService> for ListBlobsRequest {
type Pattern = ServerStreaming;
}

impl ServerStreamingMsg<ProviderService> for ListRequest {
type Response = ListResponse;
impl ServerStreamingMsg<ProviderService> for ListBlobsRequest {
type Response = ListBlobsResponse;
}

/// Request to list the collections available on the provider.
///
/// Carries no parameters; it is answered with a stream of
/// [`ListCollectionsResponse`] items.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListCollectionsRequest;

/// One item of the response stream for a [`ListCollectionsRequest`],
/// summarising a single collection.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListCollectionsResponse {
/// Hash under which the collection is stored by the provider.
pub hash: Hash,
/// Number of blobs the collection links to.
pub total_blobs_count: usize,
/// Total size of the raw data referred to by all blobs in the collection.
pub total_blobs_size: u64,
}

// Registers `ListCollectionsRequest` as a message of `ProviderService` that
// follows the server-streaming interaction pattern.
impl Msg<ProviderService> for ListCollectionsRequest {
type Pattern = ServerStreaming;
}

// Every item streamed back for a `ListCollectionsRequest` is a
// `ListCollectionsResponse`.
impl ServerStreamingMsg<ProviderService> for ListCollectionsRequest {
type Response = ListCollectionsResponse;
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -160,7 +178,8 @@ pub struct ProviderService;
pub enum ProviderRequest {
Watch(WatchRequest),
Version(VersionRequest),
List(ListRequest),
ListBlobs(ListBlobsRequest),
ListCollections(ListCollectionsRequest),
Provide(ProvideRequest),
Id(IdRequest),
Addrs(AddrsRequest),
Expand All @@ -173,7 +192,8 @@ pub enum ProviderRequest {
pub enum ProviderResponse {
Watch(WatchResponse),
Version(VersionResponse),
List(ListResponse),
ListBlobs(ListBlobsResponse),
ListCollections(ListCollectionsResponse),
Provide(ProvideProgress),
Id(IdResponse),
Addrs(AddrsResponse),
Expand Down

0 comments on commit 7b0a7c7

Please sign in to comment.