
Commit

cleanups

Frando committed Oct 3, 2023
1 parent b2b53d0 commit fac22d7
Showing 4 changed files with 15 additions and 8 deletions.
7 changes: 5 additions & 2 deletions iroh-sync/src/sync.rs
@@ -226,12 +226,15 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + 'static> Replica<S> {
     /// received from in a loop. If not receiving, local and remote inserts will hang waiting for
     /// the receiver to be received from.
     // TODO: Allow to clear a previous subscription?
-    pub fn subscribe(&self) -> Option<flume::Receiver<(InsertOrigin, SignedEntry)>> {
+    pub fn subscribe(
+        &self,
+        channel_cap: usize,
+    ) -> Option<flume::Receiver<(InsertOrigin, SignedEntry)>> {
         let mut on_insert_sender = self.on_insert_sender.write();
         match &*on_insert_sender {
             Some(_sender) => None,
             None => {
-                let (s, r) = flume::bounded(16); // TODO: should this be configurable?
+                let (s, r) = flume::bounded(channel_cap);
                 *on_insert_sender = Some(s);
                 Some(r)
             }
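As an aside, a minimal usage sketch (not part of this commit) of the new `subscribe(channel_cap)` signature; the `replica` handle, the capacity of 64, and the handler loop are assumptions for illustration:

```rust
// Sketch, assuming a `replica: Replica<S>` is already open.
// `subscribe` returns `None` if a subscription already exists.
let events = replica
    .subscribe(64)
    .expect("already subscribed to this replica");

// Keep receiving in a loop; once the bounded channel fills up,
// local and remote inserts block until the receiver catches up.
std::thread::spawn(move || {
    while let Ok((_origin, _entry)) = events.recv() {
        // handle the insert event here
    }
});
```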
2 changes: 1 addition & 1 deletion iroh/src/sync_engine.rs
@@ -38,7 +38,7 @@ impl<S: Store> SyncEngine<S> {
     /// engine with [`Self::start_sync`], then new entries inserted locally will be sent to peers
     /// through iroh-gossip.
     ///
-    /// The engine will also register for [`Replica::subscribe`] events to download content for new
+    /// The engine will also subscribe to replica events to download content for new
     /// entries from peers.
     pub fn spawn<B: BaoStore>(
         rt: Handle,
9 changes: 6 additions & 3 deletions iroh/src/sync_engine/live.rs
@@ -41,7 +41,10 @@ use tracing::{debug, error, error_span, warn, Instrument};
 
 pub use iroh_sync::ContentStatus;
 
-const CHANNEL_CAP: usize = 8;
+/// Capacity of the channel for the [`ToActor`] messages.
+const ACTOR_CHANNEL_CAP: usize = 8;
+/// Capacity of the subscription channel for replica events.
+const REPLICA_SUBSCRIBE_CHANNEL_CAP: usize = 64;
 
 /// An iroh-sync operation
 ///
@@ -194,7 +197,7 @@ impl<S: store::Store> LiveSync<S> {
         bao_store: B,
         downloader: Downloader,
     ) -> Self {
-        let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP);
+        let (to_actor_tx, to_actor_rx) = mpsc::channel(ACTOR_CHANNEL_CAP);
         let me = endpoint.peer_id().fmt_short();
         let mut actor = Actor::new(
             endpoint,
@@ -627,7 +630,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
 
         // setup event subscription.
         let events = replica
-            .subscribe()
+            .subscribe(REPLICA_SUBSCRIBE_CHANNEL_CAP)
             .ok_or_else(|| anyhow::anyhow!("trying to subscribe twice to the same replica"))?;
         self.replica_events.push(events.into_stream());
 
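For context, a hedged sketch of the pattern the actor relies on here: a bounded flume receiver polled as an async stream (flume's `into_stream` needs its `async` feature; the generic `drain_events` helper and its `handle` callback are illustrative, not code from this repository):

```rust
use futures::StreamExt;

// Sketch: poll a bounded flume receiver as a stream and hand every
// event to a caller-supplied closure, mirroring how `events.into_stream()`
// is pushed into `self.replica_events` above.
async fn drain_events<T>(events: flume::Receiver<T>, mut handle: impl FnMut(T)) {
    let mut stream = events.into_stream();
    while let Some(event) = stream.next().await {
        handle(event);
    }
}
```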
5 changes: 3 additions & 2 deletions iroh/tests/sync.rs
@@ -24,7 +24,8 @@ use iroh_sync::{
     AuthorId, ContentStatus, Entry, NamespaceId,
 };
 
-const LIMIT: Duration = Duration::from_secs(15);
+/// Time limit for event collection in sync tests.
+const LIMIT: Duration = Duration::from_secs(30);
 
 /// Pick up the tokio runtime from the thread local and add a
 /// thread per core runtime.
@@ -339,7 +340,7 @@ async fn sync_big() -> Result<()> {
     let rt = test_runtime();
     let n_nodes = std::env::var("NODES")
         .map(|v| v.parse().expect("NODES must be a number"))
-        .unwrap_or(3);
+        .unwrap_or(20);
     let n_entries_init = 1;
     // let n_entries_live = 2;
     // let n_entries_phase2 = 5;
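With the new default, `sync_big` spins up 20 nodes unless `NODES` is set; an invocation along the lines of `NODES=5 cargo test -p iroh --test sync sync_big` (adjust to the workspace layout) should still scale the run back down for quicker local runs.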
