diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 3fe949233d..faa7021d8a 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -322,8 +322,8 @@ impl MagicEndpoint { /// Connections are currently only pruned on user action (when we explicitly add a new address /// to the internal addressbook through [`MagicEndpoint::add_node_addr`]), so these connections /// are not necessarily active connections. - pub async fn connection_infos(&self) -> anyhow::Result> { - self.msock.tracked_endpoints().await + pub fn connection_infos(&self) -> Vec { + self.msock.tracked_endpoints() } /// Get connection information about a specific node. @@ -331,11 +331,8 @@ impl MagicEndpoint { /// Includes the node's [`PublicKey`], potential DERP Url, its addresses with any known /// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are /// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection. - pub async fn connection_info( - &self, - node_id: PublicKey, - ) -> anyhow::Result> { - self.msock.tracked_endpoint(node_id).await + pub fn connection_info(&self, node_id: PublicKey) -> Option { + self.msock.tracked_endpoint(node_id) } async fn resolve(&self, node_id: &PublicKey) -> Result { @@ -353,7 +350,7 @@ impl MagicEndpoint { node_id: &PublicKey, alpn: &[u8], ) -> Result { - let addr = match self.msock.get_mapping_addr(node_id).await { + let addr = match self.msock.get_mapping_addr(node_id) { Some(addr) => addr, None => { let info = self.resolve(node_id).await?; @@ -362,7 +359,7 @@ impl MagicEndpoint { info, }; self.add_node_addr(peer_addr)?; - self.msock.get_mapping_addr(node_id).await.ok_or_else(|| { + self.msock.get_mapping_addr(node_id).ok_or_else(|| { anyhow!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}") })? } @@ -386,7 +383,7 @@ impl MagicEndpoint { self.add_node_addr(node_addr.clone())?; let NodeAddr { node_id, info } = node_addr; - let addr = self.msock.get_mapping_addr(&node_id).await; + let addr = self.msock.get_mapping_addr(&node_id); let Some(addr) = addr else { return Err(match (info.direct_addresses.is_empty(), info.derp_url) { (true, None) => { @@ -685,7 +682,7 @@ mod tests { // first time, create a magic endpoint without peers but a peers file and add addressing // information for a peer let endpoint = new_endpoint(secret_key.clone(), path.clone()).await; - assert!(endpoint.connection_infos().await.unwrap().is_empty()); + assert!(endpoint.connection_infos().is_empty()); endpoint.add_node_addr(node_addr).unwrap(); info!("closing endpoint"); @@ -695,8 +692,7 @@ mod tests { info!("restarting endpoint"); // now restart it and check the addressing info of the peer let endpoint = new_endpoint(secret_key, path).await; - let ConnectionInfo { mut addrs, .. } = - endpoint.connection_info(peer_id).await.unwrap().unwrap(); + let ConnectionInfo { mut addrs, .. } = endpoint.connection_info(peer_id).unwrap(); let conn_addr = addrs.pop().unwrap().addr; assert_eq!(conn_addr, direct_addr); } diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 4c096e5b80..3ccc3aaa5b 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -1251,25 +1251,13 @@ impl MagicSock { } /// Retrieve connection information about nodes in the network. - pub async fn tracked_endpoints(&self) -> Result> { - let (s, r) = sync::oneshot::channel(); - self.inner - .actor_sender - .send(ActorMessage::TrackedEndpoints(s)) - .await?; - let res = r.await?; - Ok(res) + pub fn tracked_endpoints(&self) -> Vec { + self.inner.node_map.endpoint_infos(Instant::now()) } /// Retrieve connection information about a node in the network. - pub async fn tracked_endpoint(&self, node_key: PublicKey) -> Result> { - let (s, r) = sync::oneshot::channel(); - self.inner - .actor_sender - .send(ActorMessage::TrackedEndpoint(node_key, s)) - .await?; - let res = r.await?; - Ok(res) + pub fn tracked_endpoint(&self, node_key: PublicKey) -> Option { + self.inner.node_map.endpoint_info(&node_key) } /// Returns the local endpoints as a stream. @@ -1319,18 +1307,11 @@ impl MagicSock { /// /// Note this is a user-facing API and does not wrap the [`SocketAddr`] in a /// `QuicMappedAddr` as we do internally. - pub async fn get_mapping_addr(&self, node_key: &PublicKey) -> Option { - let (s, r) = tokio::sync::oneshot::channel(); - if self - .inner - .actor_sender - .send(ActorMessage::GetMappingAddr(*node_key, s)) - .await - .is_ok() - { - return r.await.ok().flatten().map(|m| m.0); - } - None + pub fn get_mapping_addr(&self, node_key: &PublicKey) -> Option { + self.inner + .node_map + .get_quic_mapped_addr_for_node_key(node_key) + .map(|a| a.0) } /// Sets the connection's preferred local port. @@ -1595,9 +1576,6 @@ impl AsyncUdpSocket for MagicSock { #[derive(Debug)] enum ActorMessage { - TrackedEndpoints(sync::oneshot::Sender>), - TrackedEndpoint(PublicKey, sync::oneshot::Sender>), - GetMappingAddr(PublicKey, sync::oneshot::Sender>), SetPreferredPort(u16, sync::oneshot::Sender<()>), RebindAll(sync::oneshot::Sender<()>), Shutdown, @@ -1756,20 +1734,6 @@ impl Actor { /// Returns `true` if it was a shutdown. async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool { match msg { - ActorMessage::TrackedEndpoints(s) => { - let eps: Vec<_> = self.inner.node_map.endpoint_infos(Instant::now()); - let _ = s.send(eps); - } - ActorMessage::TrackedEndpoint(node_key, s) => { - let _ = s.send(self.inner.node_map.endpoint_info(&node_key)); - } - ActorMessage::GetMappingAddr(node_key, s) => { - let res = self - .inner - .node_map - .get_quic_mapped_addr_for_node_key(&node_key); - let _ = s.send(res); - } ActorMessage::Shutdown => { debug!("shutting down"); @@ -2739,12 +2703,10 @@ pub(crate) mod tests { }) } - async fn tracked_endpoints(&self) -> Vec { + fn tracked_endpoints(&self) -> Vec { self.endpoint .magic_sock() .tracked_endpoints() - .await - .unwrap_or_default() .into_iter() .map(|ep| ep.public_key) .collect() @@ -2830,7 +2792,7 @@ pub(crate) mod tests { loop { let mut ready = Vec::with_capacity(stacks.len()); for ms in stacks.iter() { - let endpoints = ms.tracked_endpoints().await; + let endpoints = ms.tracked_endpoints(); let my_node_id = ms.endpoint.node_id(); let all_nodes_meshed = all_node_ids .iter() diff --git a/iroh/src/commands/doctor.rs b/iroh/src/commands/doctor.rs index f244acfec8..9ec894acec 100644 --- a/iroh/src/commands/doctor.rs +++ b/iroh/src/commands/doctor.rs @@ -315,7 +315,7 @@ impl Gui { let counter_task = AbortingJoinHandle(tokio::spawn(async move { loop { Self::update_counters(&counters2); - Self::update_connection_info(&conn_info, &endpoint, &node_id).await; + Self::update_connection_info(&conn_info, &endpoint, &node_id); tokio::time::sleep(Duration::from_millis(100)).await; } })); @@ -330,23 +330,19 @@ impl Gui { } } - async fn update_connection_info( - target: &ProgressBar, - endpoint: &MagicEndpoint, - node_id: &NodeId, - ) { + fn update_connection_info(target: &ProgressBar, endpoint: &MagicEndpoint, node_id: &NodeId) { let format_latency = |x: Option| { x.map(|x| format!("{:.6}s", x.as_secs_f64())) .unwrap_or_else(|| "unknown".to_string()) }; - let msg = match endpoint.connection_info(*node_id).await { - Ok(Some(EndpointInfo { + let msg = match endpoint.connection_info(*node_id) { + Some(EndpointInfo { derp_url, conn_type, latency, addrs, .. - })) => { + }) => { let derp_url = derp_url .map(|x| x.to_string()) .unwrap_or_else(|| "unknown".to_string()); @@ -363,8 +359,7 @@ impl Gui { derp_url, latency, conn_type, addrs ) } - Ok(None) => "connection info unavailable".to_string(), - Err(cause) => format!("error getting connection info: {}", cause), + None => "connection info unavailable".to_string(), }; target.set_message(msg); } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index fd599e7355..faf69e66fe 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -1441,30 +1441,26 @@ impl RpcHandler { ) -> impl Stream> + Send + 'static { // provide a little buffer so that we don't slow down the sender let (tx, rx) = flume::bounded(32); + let mut conn_infos = self.inner.endpoint.connection_infos(); + conn_infos.sort_by_key(|n| n.public_key.to_string()); self.rt().spawn_pinned(|| async move { - match self.inner.endpoint.connection_infos().await { - Ok(mut conn_infos) => { - conn_infos.sort_by_key(|n| n.public_key.to_string()); - for conn_info in conn_infos { - tx.send_async(Ok(NodeConnectionsResponse { conn_info })) - .await - .ok(); - } - } - Err(e) => { - tx.send_async(Err(e.into())).await.ok(); - } + for conn_info in conn_infos { + tx.send_async(Ok(NodeConnectionsResponse { conn_info })) + .await + .ok(); } }); rx.into_stream() } + // This method is called as an RPC method, which have to be async + #[allow(clippy::unused_async)] async fn node_connection_info( self, req: NodeConnectionInfoRequest, ) -> RpcResult { let NodeConnectionInfoRequest { node_id } = req; - let conn_info = self.inner.endpoint.connection_info(node_id).await?; + let conn_info = self.inner.endpoint.connection_info(node_id); Ok(NodeConnectionInfoResponse { conn_info }) }