diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index beff870a45..f6bb0d5f9f 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -226,12 +226,15 @@ impl + PublicKeyStore + 'static> Replica { /// received from in a loop. If not receiving, local and remote inserts will hang waiting for /// the receiver to be received from. // TODO: Allow to clear a previous subscription? - pub fn subscribe(&self) -> Option> { + pub fn subscribe( + &self, + channel_cap: usize, + ) -> Option> { let mut on_insert_sender = self.on_insert_sender.write(); match &*on_insert_sender { Some(_sender) => None, None => { - let (s, r) = flume::bounded(16); // TODO: should this be configurable? + let (s, r) = flume::bounded(channel_cap); *on_insert_sender = Some(s); Some(r) } diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs index 6c9c2f377b..5061679e58 100644 --- a/iroh/src/sync_engine.rs +++ b/iroh/src/sync_engine.rs @@ -38,7 +38,7 @@ impl SyncEngine { /// engine with [`Self::start_sync`], then new entries inserted locally will be sent to peers /// through iroh-gossip. /// - /// The engine will also register for [`Replica::subscribe`] events to download content for new + /// The engine will also subscribe to replica events to download content for new /// entries from peers. pub fn spawn( rt: Handle, diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 6a8ab50822..f8b2eb7c7d 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -41,7 +41,10 @@ use tracing::{debug, error, error_span, warn, Instrument}; pub use iroh_sync::ContentStatus; -const CHANNEL_CAP: usize = 8; +/// Capacity of the channel for the [`ToActor`] messages. +const ACTOR_CHANNEL_CAP: usize = 8; +/// Capacity of the subscription channel for replica events. +const REPLICA_SUBSCRIBE_CHANNEL_CAP: usize = 64; /// An iroh-sync operation /// @@ -194,7 +197,7 @@ impl LiveSync { bao_store: B, downloader: Downloader, ) -> Self { - let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP); + let (to_actor_tx, to_actor_rx) = mpsc::channel(ACTOR_CHANNEL_CAP); let me = endpoint.peer_id().fmt_short(); let mut actor = Actor::new( endpoint, @@ -627,7 +630,7 @@ impl Actor { // setup event subscription. let events = replica - .subscribe() + .subscribe(REPLICA_SUBSCRIBE_CHANNEL_CAP) .ok_or_else(|| anyhow::anyhow!("trying to subscribe twice to the same replica"))?; self.replica_events.push(events.into_stream()); diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index a20d7502d9..0141a81b64 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -24,7 +24,8 @@ use iroh_sync::{ AuthorId, ContentStatus, Entry, NamespaceId, }; -const LIMIT: Duration = Duration::from_secs(15); +/// Time limit for event collection in sync tests. +const LIMIT: Duration = Duration::from_secs(30); /// Pick up the tokio runtime from the thread local and add a /// thread per core runtime. @@ -339,7 +340,7 @@ async fn sync_big() -> Result<()> { let rt = test_runtime(); let n_nodes = std::env::var("NODES") .map(|v| v.parse().expect("NODES must be a number")) - .unwrap_or(3); + .unwrap_or(20); let n_entries_init = 1; // let n_entries_live = 2; // let n_entries_phase2 = 5;