Skip to content

Commit

Permalink
feat(provider): add 'CollectionAdded' Provider event
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Jun 23, 2023
1 parent de43b59 commit 5a16586
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 2 deletions.
5 changes: 5 additions & 0 deletions iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ pub use ticket::Ticket;
/// Events emitted by the provider informing about the current status.
#[derive(Debug, Clone)]
pub enum Event {
/// A new collection has been added via an RPC call
CollectionAdded {
/// The hash of the added collection
hash: Hash,
},
/// A new client connected to the node.
ClientConnected {
/// An unique connection id.
Expand Down
1 change: 1 addition & 0 deletions iroh/src/commands/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ async fn get_keypair(key: Option<PathBuf>) -> Result<Keypair> {
}
}

/// Makes a an RPC endpoint that uses a QUIC transport
fn make_rpc_endpoint(
keypair: &Keypair,
rpc_port: u16,
Expand Down
80 changes: 78 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use quic_rpc::{RpcClient, RpcServer, ServiceConnection, ServiceEndpoint};
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

use crate::rpc_protocol::{
AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse,
Expand Down Expand Up @@ -585,13 +585,19 @@ impl RpcHandler {
let data_sources = iroh_bytes::provider::create_data_sources(root)?;
// create the collection
// todo: provide feedback for progress
let (db, _) = iroh_bytes::provider::collection::create_collection(
let (db, hash) = iroh_bytes::provider::collection::create_collection(
data_sources,
Progress::new(progress),
)
.await?;
self.inner.db.union_with(db);

if let Err(e) = self.inner.events.send(Event::ByteProvide(
iroh_bytes::provider::Event::CollectionAdded { hash },
)) {
warn!("failed to send CollectionAdded event: {:?}", e);
};

Ok(())
}
async fn version(self, _: VersionRequest) -> VersionResponse {
Expand Down Expand Up @@ -699,6 +705,8 @@ pub fn make_server_config(

#[cfg(test)]
mod tests {
use futures::StreamExt;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::path::Path;

Expand Down Expand Up @@ -728,4 +736,72 @@ mod tests {
println!("addrs: {:?}", ticket.addrs());
assert!(!ticket.addrs().is_empty());
}

#[tokio::test]
async fn test_node_add_collection_event() {
let db = Database::from(HashMap::new());
let node = Builder::with_db(db)
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
.runtime(&test_runtime())
.spawn()
.await
.unwrap();

let _drop_guard = node.cancel_token().drop_guard();

let mut events = node.subscribe();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let mut got_hash = None;
while let Ok(msg) = events.recv().await {
match msg {
Event::ByteProvide(e) => {
if let iroh_bytes::provider::Event::CollectionAdded { hash } = e {
got_hash = Some(hash);
break;
}
}
}
}
tx.send(got_hash.unwrap()).unwrap();
});

let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
let stream = node
.controller()
.server_streaming(ProvideRequest {
path: Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
})
.await
.unwrap();

let mut stream = stream;

while let Some(item) = stream.next().await {
match item.unwrap() {
ProvideProgress::AllDone { hash } => {
return Ok(hash);
}
ProvideProgress::Abort(e) => {
anyhow::bail!("Error while adding data: {e}");
}
_ => {}
}
}
anyhow::bail!("stream ended without providing data");
})
.await
.expect("timeout")
.expect("get failed");

match rx.await {
Ok(event_hash) => {
// assert!(Some(AuthToken::from(vec![1, 2, 3, 4, 5, 6])) == token);
assert_eq!(got_hash, event_hash);
}
Err(e) => {
panic!("error receiving token: {:?}", e);
}
}
}
}

0 comments on commit 5a16586

Please sign in to comment.