Skip to content

Commit

Permalink
Merge 4361f98 into 75d8019
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox authored Oct 7, 2024
2 parents 75d8019 + 4361f98 commit 8bc1dd8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
2 changes: 2 additions & 0 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
/// until the stream is actually polled. To avoid missing discovered nodes,
/// poll the stream as soon as possible.
///
/// If you do not regularly poll the stream, you may miss discovered nodes.
///
/// Any discovery systems that only discover when explicitly resolving a
/// specific [`NodeId`] do not need to implement this method. Any nodes or
/// addresses that are discovered by calling `resolve` should NOT be added
Expand Down
22 changes: 17 additions & 5 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use watchable::Watchable;

use iroh_base::key::PublicKey;
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::{sync::mpsc, task::JoinSet};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinSet,
};
use tokio_util::task::AbortOnDropHandle;

use crate::{
Expand Down Expand Up @@ -103,12 +106,21 @@ impl Subscribers {
/// Sends the `node_id` and `item` to each subscriber.
///
/// Cleans up any subscribers that have been dropped.
async fn send(&mut self, item: DiscoveryItem) {
fn send(&mut self, item: DiscoveryItem) {
let mut clean_up = vec![];
for (i, subscriber) in self.0.iter().enumerate() {
// assume subscriber was dropped
if (subscriber.send(item.clone()).await).is_err() {
clean_up.push(i);
if let Err(err) = subscriber.try_send(item.clone()) {
match err {
TrySendError::Full(_) => {
warn!(
?item,
idx = i,
"local swarm discovery subscriber is blocked, dropping item"
)
}
TrySendError::Closed(_) => clean_up.push(i),
}
}
}
for i in clean_up.into_iter().rev() {
Expand Down Expand Up @@ -236,7 +248,7 @@ impl LocalSwarmDiscovery {
// in other words, nodes sent to the `subscribers` should only be the ones that
// have been "passively" discovered
if !resolved {
subscribers.send(item).await;
subscribers.send(item);
}
}
Message::Resolve(node_id, sender) => {
Expand Down

0 comments on commit 8bc1dd8

Please sign in to comment.