Skip to content

Commit

Permalink
refactor(iroh-net): remove unneeded async interactions with the magic…
Browse files Browse the repository at this point in the history
…sock actor (#2058)

## Description

While working on #2056 I spotted that we use the actor inbox with return
channels for information that is readily available already on the shared
inner magicsock. This removes the unneeded complexity and thus
simplifies `get_mapping_addr`, `endpoint_info` and `endpoint_infos` to
return the information non-async and infallible. Yay!

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
  • Loading branch information
Frando authored Mar 6, 2024
1 parent bc1af2e commit a42c1b2
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 86 deletions.
22 changes: 9 additions & 13 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,20 +322,17 @@ impl MagicEndpoint {
/// Connections are currently only pruned on user action (when we explicitly add a new address
/// to the internal addressbook through [`MagicEndpoint::add_node_addr`]), so these connections
/// are not necessarily active connections.
pub async fn connection_infos(&self) -> anyhow::Result<Vec<ConnectionInfo>> {
self.msock.tracked_endpoints().await
pub fn connection_infos(&self) -> Vec<ConnectionInfo> {
self.msock.tracked_endpoints()
}

/// Get connection information about a specific node.
///
/// Includes the node's [`PublicKey`], potential DERP Url, its addresses with any known
/// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are
/// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection.
pub async fn connection_info(
&self,
node_id: PublicKey,
) -> anyhow::Result<Option<ConnectionInfo>> {
self.msock.tracked_endpoint(node_id).await
pub fn connection_info(&self, node_id: PublicKey) -> Option<ConnectionInfo> {
self.msock.tracked_endpoint(node_id)
}

async fn resolve(&self, node_id: &PublicKey) -> Result<AddrInfo> {
Expand All @@ -353,7 +350,7 @@ impl MagicEndpoint {
node_id: &PublicKey,
alpn: &[u8],
) -> Result<quinn::Connection> {
let addr = match self.msock.get_mapping_addr(node_id).await {
let addr = match self.msock.get_mapping_addr(node_id) {
Some(addr) => addr,
None => {
let info = self.resolve(node_id).await?;
Expand All @@ -362,7 +359,7 @@ impl MagicEndpoint {
info,
};
self.add_node_addr(peer_addr)?;
self.msock.get_mapping_addr(node_id).await.ok_or_else(|| {
self.msock.get_mapping_addr(node_id).ok_or_else(|| {
anyhow!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}")
})?
}
Expand All @@ -386,7 +383,7 @@ impl MagicEndpoint {
self.add_node_addr(node_addr.clone())?;

let NodeAddr { node_id, info } = node_addr;
let addr = self.msock.get_mapping_addr(&node_id).await;
let addr = self.msock.get_mapping_addr(&node_id);
let Some(addr) = addr else {
return Err(match (info.direct_addresses.is_empty(), info.derp_url) {
(true, None) => {
Expand Down Expand Up @@ -685,7 +682,7 @@ mod tests {
// first time, create a magic endpoint without peers but a peers file and add addressing
// information for a peer
let endpoint = new_endpoint(secret_key.clone(), path.clone()).await;
assert!(endpoint.connection_infos().await.unwrap().is_empty());
assert!(endpoint.connection_infos().is_empty());
endpoint.add_node_addr(node_addr).unwrap();

info!("closing endpoint");
Expand All @@ -695,8 +692,7 @@ mod tests {
info!("restarting endpoint");
// now restart it and check the addressing info of the peer
let endpoint = new_endpoint(secret_key, path).await;
let ConnectionInfo { mut addrs, .. } =
endpoint.connection_info(peer_id).await.unwrap().unwrap();
let ConnectionInfo { mut addrs, .. } = endpoint.connection_info(peer_id).unwrap();
let conn_addr = addrs.pop().unwrap().addr;
assert_eq!(conn_addr, direct_addr);
}
Expand Down
60 changes: 11 additions & 49 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,25 +1251,13 @@ impl MagicSock {
}

/// Retrieve connection information about nodes in the network.
pub async fn tracked_endpoints(&self) -> Result<Vec<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
.actor_sender
.send(ActorMessage::TrackedEndpoints(s))
.await?;
let res = r.await?;
Ok(res)
pub fn tracked_endpoints(&self) -> Vec<EndpointInfo> {
self.inner.node_map.endpoint_infos(Instant::now())
}

/// Retrieve connection information about a node in the network.
pub async fn tracked_endpoint(&self, node_key: PublicKey) -> Result<Option<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
.actor_sender
.send(ActorMessage::TrackedEndpoint(node_key, s))
.await?;
let res = r.await?;
Ok(res)
pub fn tracked_endpoint(&self, node_key: PublicKey) -> Option<EndpointInfo> {
self.inner.node_map.endpoint_info(&node_key)
}

/// Returns the local endpoints as a stream.
Expand Down Expand Up @@ -1319,18 +1307,11 @@ impl MagicSock {
///
/// Note this is a user-facing API and does not wrap the [`SocketAddr`] in a
/// `QuicMappedAddr` as we do internally.
pub async fn get_mapping_addr(&self, node_key: &PublicKey) -> Option<SocketAddr> {
let (s, r) = tokio::sync::oneshot::channel();
if self
.inner
.actor_sender
.send(ActorMessage::GetMappingAddr(*node_key, s))
.await
.is_ok()
{
return r.await.ok().flatten().map(|m| m.0);
}
None
pub fn get_mapping_addr(&self, node_key: &PublicKey) -> Option<SocketAddr> {
self.inner
.node_map
.get_quic_mapped_addr_for_node_key(node_key)
.map(|a| a.0)
}

/// Sets the connection's preferred local port.
Expand Down Expand Up @@ -1595,9 +1576,6 @@ impl AsyncUdpSocket for MagicSock {

#[derive(Debug)]
enum ActorMessage {
TrackedEndpoints(sync::oneshot::Sender<Vec<EndpointInfo>>),
TrackedEndpoint(PublicKey, sync::oneshot::Sender<Option<EndpointInfo>>),
GetMappingAddr(PublicKey, sync::oneshot::Sender<Option<QuicMappedAddr>>),
SetPreferredPort(u16, sync::oneshot::Sender<()>),
RebindAll(sync::oneshot::Sender<()>),
Shutdown,
Expand Down Expand Up @@ -1756,20 +1734,6 @@ impl Actor {
/// Returns `true` if it was a shutdown.
async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool {
match msg {
ActorMessage::TrackedEndpoints(s) => {
let eps: Vec<_> = self.inner.node_map.endpoint_infos(Instant::now());
let _ = s.send(eps);
}
ActorMessage::TrackedEndpoint(node_key, s) => {
let _ = s.send(self.inner.node_map.endpoint_info(&node_key));
}
ActorMessage::GetMappingAddr(node_key, s) => {
let res = self
.inner
.node_map
.get_quic_mapped_addr_for_node_key(&node_key);
let _ = s.send(res);
}
ActorMessage::Shutdown => {
debug!("shutting down");

Expand Down Expand Up @@ -2739,12 +2703,10 @@ pub(crate) mod tests {
})
}

async fn tracked_endpoints(&self) -> Vec<PublicKey> {
fn tracked_endpoints(&self) -> Vec<PublicKey> {
self.endpoint
.magic_sock()
.tracked_endpoints()
.await
.unwrap_or_default()
.into_iter()
.map(|ep| ep.public_key)
.collect()
Expand Down Expand Up @@ -2830,7 +2792,7 @@ pub(crate) mod tests {
loop {
let mut ready = Vec::with_capacity(stacks.len());
for ms in stacks.iter() {
let endpoints = ms.tracked_endpoints().await;
let endpoints = ms.tracked_endpoints();
let my_node_id = ms.endpoint.node_id();
let all_nodes_meshed = all_node_ids
.iter()
Expand Down
17 changes: 6 additions & 11 deletions iroh/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Gui {
let counter_task = AbortingJoinHandle(tokio::spawn(async move {
loop {
Self::update_counters(&counters2);
Self::update_connection_info(&conn_info, &endpoint, &node_id).await;
Self::update_connection_info(&conn_info, &endpoint, &node_id);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}));
Expand All @@ -330,23 +330,19 @@ impl Gui {
}
}

async fn update_connection_info(
target: &ProgressBar,
endpoint: &MagicEndpoint,
node_id: &NodeId,
) {
fn update_connection_info(target: &ProgressBar, endpoint: &MagicEndpoint, node_id: &NodeId) {
let format_latency = |x: Option<Duration>| {
x.map(|x| format!("{:.6}s", x.as_secs_f64()))
.unwrap_or_else(|| "unknown".to_string())
};
let msg = match endpoint.connection_info(*node_id).await {
Ok(Some(EndpointInfo {
let msg = match endpoint.connection_info(*node_id) {
Some(EndpointInfo {
derp_url,
conn_type,
latency,
addrs,
..
})) => {
}) => {
let derp_url = derp_url
.map(|x| x.to_string())
.unwrap_or_else(|| "unknown".to_string());
Expand All @@ -363,8 +359,7 @@ impl Gui {
derp_url, latency, conn_type, addrs
)
}
Ok(None) => "connection info unavailable".to_string(),
Err(cause) => format!("error getting connection info: {}", cause),
None => "connection info unavailable".to_string(),
};
target.set_message(msg);
}
Expand Down
22 changes: 9 additions & 13 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,30 +1441,26 @@ impl<D: BaoStore> RpcHandler<D> {
) -> impl Stream<Item = RpcResult<NodeConnectionsResponse>> + Send + 'static {
// provide a little buffer so that we don't slow down the sender
let (tx, rx) = flume::bounded(32);
let mut conn_infos = self.inner.endpoint.connection_infos();
conn_infos.sort_by_key(|n| n.public_key.to_string());
self.rt().spawn_pinned(|| async move {
match self.inner.endpoint.connection_infos().await {
Ok(mut conn_infos) => {
conn_infos.sort_by_key(|n| n.public_key.to_string());
for conn_info in conn_infos {
tx.send_async(Ok(NodeConnectionsResponse { conn_info }))
.await
.ok();
}
}
Err(e) => {
tx.send_async(Err(e.into())).await.ok();
}
for conn_info in conn_infos {
tx.send_async(Ok(NodeConnectionsResponse { conn_info }))
.await
.ok();
}
});
rx.into_stream()
}

// This method is called as an RPC method, which have to be async
#[allow(clippy::unused_async)]
async fn node_connection_info(
self,
req: NodeConnectionInfoRequest,
) -> RpcResult<NodeConnectionInfoResponse> {
let NodeConnectionInfoRequest { node_id } = req;
let conn_info = self.inner.endpoint.connection_info(node_id).await?;
let conn_info = self.inner.endpoint.connection_info(node_id);
Ok(NodeConnectionInfoResponse { conn_info })
}

Expand Down

0 comments on commit a42c1b2

Please sign in to comment.