From 741170205fcfaa80ddb2c5e869da5fa6224ec72e Mon Sep 17 00:00:00 2001 From: Kasey Date: Mon, 7 Oct 2024 15:19:31 +0100 Subject: [PATCH 1/2] use `try_send` rather than `send` so we dont block the local swarm discovery service --- iroh-net/src/discovery.rs | 2 ++ iroh-net/src/discovery/local_swarm_discovery.rs | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index e3848a7db5..0cbfa285d0 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync { /// until the stream is actually polled. To avoid missing discovered nodes, /// poll the stream as soon as possible. /// + /// If you do not regularly poll the stream, you may miss discovered nodes. + /// /// Any discovery systems that only discover when explicitly resolving a /// specific [`NodeId`] do not need to implement this method. Any nodes or /// addresses that are discovered by calling `resolve` should NOT be added diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index 58ff2ccb33..c5870b55d3 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -47,7 +47,10 @@ use watchable::Watchable; use iroh_base::key::PublicKey; use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; -use tokio::{sync::mpsc, task::JoinSet}; +use tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinSet, +}; use tokio_util::task::AbortOnDropHandle; use crate::{ @@ -107,8 +110,13 @@ impl Subscribers { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped - if (subscriber.send(item.clone()).await).is_err() { - clean_up.push(i); + if let Err(err) = subscriber.try_send(item.clone()) { + match err { + TrySendError::Full(_) => { + warn!("local swarm discovery subscriber {i} is blocked, dropping item {item:?}") + } + TrySendError::Closed(_) => clean_up.push(i), + } } } for i in clean_up.into_iter().rev() { From 4361f98857147a163ed6c4b71ae0edd4ed82c127 Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 7 Oct 2024 09:54:46 -0500 Subject: [PATCH 2/2] fix clippy and use structured logging --- iroh-net/src/discovery/local_swarm_discovery.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index c5870b55d3..5b6b86dbc5 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -106,14 +106,18 @@ impl Subscribers { /// Sends the `node_id` and `item` to each subscriber. /// /// Cleans up any subscribers that have been dropped. - async fn send(&mut self, item: DiscoveryItem) { + fn send(&mut self, item: DiscoveryItem) { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped if let Err(err) = subscriber.try_send(item.clone()) { match err { TrySendError::Full(_) => { - warn!("local swarm discovery subscriber {i} is blocked, dropping item {item:?}") + warn!( + ?item, + idx = i, + "local swarm discovery subscriber is blocked, dropping item" + ) } TrySendError::Closed(_) => clean_up.push(i), } @@ -244,7 +248,7 @@ impl LocalSwarmDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(item).await; + subscribers.send(item); } } Message::Resolve(node_id, sender) => {