diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index a7d0b08d01..88009f8904 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -11,7 +11,7 @@ use futures_util::TryFutureExt; use iroh_metrics::inc; use iroh_net::{ dialer::Dialer, - endpoint::{get_remote_node_id, Connection}, + endpoint::{get_remote_node_id, Connection, DirectAddr}, key::PublicKey, AddrInfo, Endpoint, NodeAddr, NodeId, }; @@ -54,8 +54,6 @@ const SEND_QUEUE_CAP: usize = 64; const TO_ACTOR_CAP: usize = 64; /// Channel capacity for the InEvent message queue (single) const IN_EVENT_CAP: usize = 1024; -/// Channel capacity for endpoint change message queue (single) -const ON_ENDPOINTS_CAP: usize = 64; /// Name used for logging when new node addresses are added from gossip. const SOURCE_NAME: &str = "gossip"; @@ -90,7 +88,6 @@ type ProtoMessage = proto::Message; #[derive(Debug, Clone)] pub struct Gossip { to_actor_tx: mpsc::Sender, - on_direct_addrs_tx: mpsc::Sender>, _actor_handle: Arc>, max_message_size: usize, } @@ -108,7 +105,6 @@ impl Gossip { ); let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP); let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP); - let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP); let me = endpoint.node_id().fmt_short(); let max_message_size = state.max_message_size(); @@ -119,7 +115,6 @@ impl Gossip { to_actor_rx, in_event_rx, in_event_tx, - on_direct_addr_rx: on_endpoints_rx, timers: Timers::new(), command_rx: StreamGroup::new().keyed(), peers: Default::default(), @@ -138,7 +133,6 @@ impl Gossip { ); Self { to_actor_tx, - on_direct_addrs_tx: on_endpoints_tx, _actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)), max_message_size, } @@ -222,23 +216,6 @@ impl Gossip { .try_flatten_stream() } - /// Set info on our direct addresses. - /// - /// This will be sent to peers on Neighbor and Join requests so that they can connect directly - /// to us. 
- /// - /// This is only best effort, and will drop new events if backed up. - pub fn update_direct_addresses( - &self, - addrs: &[iroh_net::endpoint::DirectAddr], - ) -> anyhow::Result<()> { - let addrs = addrs.to_vec(); - self.on_direct_addrs_tx - .try_send(addrs) - .map_err(|_| anyhow!("endpoints channel dropped"))?; - Ok(()) - } - async fn send(&self, event: ToActor) -> anyhow::Result<()> { self.to_actor_tx .send(event) @@ -274,8 +251,6 @@ struct Actor { in_event_tx: mpsc::Sender, /// Input events to the state (emitted from the connection loops) in_event_rx: mpsc::Receiver, - /// Updates of discovered endpoint addresses - on_direct_addr_rx: mpsc::Receiver>, /// Queued timers timers: Timers, /// Map of topics to their state. @@ -292,6 +267,21 @@ struct Actor { impl Actor { pub async fn run(mut self) -> anyhow::Result<()> { + // Watch for changes in direct addresses to update our peer data. + let mut direct_addresses_stream = self.endpoint.direct_addresses(); + // Watch for changes of our home relay to update our peer data. + let mut home_relay_stream = self.endpoint.watch_home_relay(); + + // With each gossip message we provide addressing information to reach our node. + // We wait until at least one direct address is discovered. 
+ let mut current_addresses = direct_addresses_stream + .next() + .await + .ok_or_else(|| anyhow!("Failed to discover direct addresses"))?; + let peer_data = our_peer_data(&self.endpoint, &current_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()) + .await?; + let mut i = 0; loop { i += 1; @@ -314,24 +304,16 @@ impl Actor { trace!(?i, "tick: command_rx"); self.handle_command(topic, key, command).await?; }, - new_endpoints = self.on_direct_addr_rx.recv() => { + Some(new_addresses) = direct_addresses_stream.next() => { trace!(?i, "tick: new_endpoints"); - match new_endpoints { - Some(endpoints) => { - inc!(Metrics, actor_tick_endpoint); - let addr = NodeAddr::from_parts( - self.endpoint.node_id(), - self.endpoint.home_relay(), - endpoints.into_iter().map(|x| x.addr).collect(), - ); - let peer_data = encode_peer_data(&addr.info)?; - self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; - } - None => { - debug!("endpoint change handle dropped, stopping gossip actor"); - break; - } - } + inc!(Metrics, actor_tick_endpoint); + current_addresses = new_addresses; + let peer_data = our_peer_data(&self.endpoint, &current_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; + } + Some(_relay_url) = home_relay_stream.next() => { + let peer_data = our_peer_data(&self.endpoint, &current_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; } (peer_id, res) = self.dialer.next_conn() => { trace!(?i, "tick: dialer"); @@ -822,6 +804,15 @@ impl Stream for TopicCommandStream { } } +fn our_peer_data(endpoint: &Endpoint, direct_addresses: &[DirectAddr]) -> Result { + let addr = NodeAddr::from_parts( + endpoint.node_id(), + endpoint.home_relay(), + direct_addresses.iter().map(|x| x.addr).collect(), + ); + encode_peer_data(&addr.info) +} + #[cfg(test)] mod test { use std::time::Duration; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 
94163cb745..06b85e89bf 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -52,7 +52,6 @@ use iroh_blobs::protocol::Closed; use iroh_blobs::store::Store as BaoStore; use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle}; use iroh_docs::net::DOCS_ALPN; -use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::endpoint::{DirectAddrsStream, RemoteInfo}; use iroh_net::{AddrInfo, Endpoint, NodeAddr}; use protocol::BlobsProtocol; @@ -288,19 +287,6 @@ impl NodeInner { let external_rpc = RpcServer::new(external_rpc); let internal_rpc = RpcServer::new(internal_rpc); - let gossip = protocols - .get_typed::(GOSSIP_ALPN) - .expect("missing gossip"); - - // TODO(frando): I think this is not needed as we do the same in a task just below. - // forward the initial endpoints to the gossip protocol. - // it may happen the the first endpoint update callback is missed because the gossip cell - // is only initialized once the endpoint is fully bound - if let Some(direct_addresses) = self.endpoint.direct_addresses().next().await { - debug!(me = ?self.endpoint.node_id(), "gossip initial update: {direct_addresses:?}"); - gossip.update_direct_addresses(&direct_addresses).ok(); - } - // Spawn a task for the garbage collection. if let GcPolicy::Interval(gc_period) = gc_policy { let protocols = protocols.clone(); @@ -396,19 +382,6 @@ impl NodeInner { ); } - // Spawn a task that updates the gossip endpoints. - let inner = self.clone(); - join_set.spawn(async move { - let mut stream = inner.endpoint.direct_addresses(); - while let Some(eps) = stream.next().await { - if let Err(err) = gossip.update_direct_addresses(&eps) { - warn!("Failed to update direct addresses for gossip: {err:?}"); - } - } - warn!("failed to retrieve local endpoints"); - Ok(()) - }); - loop { tokio::select! { biased;