Skip to content

Commit

Permalink
fix: cleanup
Browse files — browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 3, 2023
1 parent f3b6e19 commit e45e12f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
15 changes: 9 additions & 6 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum Op {
/// A peer now has content available for a hash.
ContentReady(Hash),
/// We synced with another peer, here's the news.
DidSync(SyncReport),
SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
Expand Down Expand Up @@ -840,8 +840,11 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
namespace,
heads: state.progress.state_vector.clone(),
};
let op = Op::DidSync(report);
debug!(?namespace, "broadcast to neighbors: DidSync(peer={peer:?})");
let op = Op::SyncReport(report);
debug!(
?namespace,
"broadcast to neighbors: sync report from {peer:?})"
);
let msg = postcard::to_stdvec(&op)?;
// TODO: We should debounce and merge these neighbor announcements likely.
if let Err(err) = self
Expand Down Expand Up @@ -912,20 +915,20 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
.peers_have(hash, vec![(msg.delivered_from, PeerRole::Provider).into()])
.await;
}
Op::DidSync(report) => {
Op::SyncReport(report) => {
let peer = msg.delivered_from;
if self
.replica_store
.has_news_for_us(report.namespace, &report.heads)?
{
debug!(?namespace, ?peer, "Recv SyncReport: have news, sync now");
debug!(?namespace, ?peer, "recv sync report: have news, sync now");
self.sync_with_peer(
report.namespace,
peer,
SyncReason::ResyncAfterReport,
);
} else {
debug!(?namespace, ?peer, "Recv SyncReport: no news, no sync");
debug!(?namespace, ?peer, "recv sync report: no news");
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ async fn sync_big() -> Result<()> {

// join nodes together
for (i, doc) in docs.iter().enumerate().skip(1) {
info!("peer {i} {:?}: join {:?}", peer_ids[i], peer0.peer_id);
info!(me = ?peer_ids[i], peer = ?peer0.peer_id, "join");
doc.start_sync(vec![peer0.clone()]).await?;
}

Expand All @@ -418,8 +418,9 @@ async fn sync_big() -> Result<()> {
for (i, events) in events.into_iter().enumerate() {
let doc = docs[i].clone();
let expected = expected.clone();
let me = peer_ids[i];
let fut = async move {
wait_for_events(events, expected_inserts, Duration::from_secs(30), i, |e| {
wait_for_events(events, expected_inserts, Duration::from_secs(30), me, |e| {
matches!(e, LiveEvent::InsertRemote { .. })
})
.await?;
Expand All @@ -431,7 +432,7 @@ async fn sync_big() -> Result<()> {
expected.len()
))
} else {
info!("Node {i}: All done, all good");
info!(?me, "All done, all good");
Ok(())
}
};
Expand Down Expand Up @@ -492,7 +493,7 @@ async fn wait_for_events(
mut events: impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static,
expected_n: usize,
timeout_per_event: Duration,
node_id: usize,
me: PublicKey,
matcher: impl Fn(LiveEvent) -> bool,
) -> anyhow::Result<()> {
let mut i = 0;
Expand All @@ -503,7 +504,7 @@ async fn wait_for_events(
.ok_or_else(|| anyhow!("end of event stream for after {i}"))??;
if matcher(event) {
i += 1;
debug!(node = %node_id, "recv event {i} of {expected_n}");
debug!(?me, "recv event {i} of {expected_n}");
}
}
Ok(())
Expand Down

0 comments on commit e45e12f

Please sign in to comment.