fix(iroh-net): Improve direct connectivity establishment speed and reliability (#1984)

## Description

This includes various fixes and improvements in holepunching behaviour,
improving both how fast a direct connection is established and how
reliably it is established.

- Improve sending of call-me-maybe: this sometimes wasn't sent if
  there were no known addresses for an endpoint because no pings
  were sent and there was an active connection via the derp
  path.  Instead we now send it when needed, regardless of pings
  or active connections.  To avoid sending it all the time when
  no direct connection can be established we only send it once
  every heartbeat cycle, i.e. every 5s.  In the worst case (like
  when UDP pings get lost) this means a call-me-maybe will be sent
  after just under 2 heartbeat intervals, and should be regular
  every heartbeat interval afterwards.  So in the face of packet
  loss this will still establish a connection eventually.

- Not every scenario will result in a direct connection in both
  directions after each endpoint has sent one call-me-maybe
  request, because direct connections are asymmetric and each
  side has to receive their own disco pong before it will use
  the direct connection.  To alleviate this and speed up the
  direct connection on the return path in cases where only one
  side manages to establish the direct path, this now also sends
  a ping message upon receipt of a ping.  This opens the direct
  path much quicker in both directions.

- When queuing a call-me-maybe and the last netcheck report is
  deemed too old, the call-me-maybe is not immediately sent.
  Instead a new netcheck is kicked off and the queued
  call-me-maybes should be sent once the report is in, so that
  they are sent with accurate local addresses.  However they
  were never sent after the report finished.  This is now fixed
  and they are sent after the report is ready.

- When a ping timeout occurs, we were clearing the best_addr, i.e.
  the selected direct path, if the ping was for the same path as
  currently used.  However that could have been for a timeout of
  an earlier ping that was still stopped by a firewall or NAT.  It
  can happen that a later ping is sent and receives a pong before the
  earlier ping times out.  In that case we should not clear
  the direct path in use.

- We used to prune addresses even if they were only just received from a
  call-me-maybe.  This did not allow us to ping all the addresses from a
  call-me-maybe, which broke hole punching.

  We take two measures:

  - Do not prune addresses that were recently "alive", which means they
    received a ping, pong or call-me-maybe recently.

  - We increase the number of allowed inactive addresses.  The limit used to
    be 100.  If someone switches between several networks regularly, we
    would like to still know those previous networks as they might start
    working again.


- When looking for a destination address for an endpoint, the
  addr_for_send() function uses an unconfirmed UDP address if available.
  However if we were given an IPv6 address (probably via a ticket,
  effectively via add_node_addr()) we should only select the IPv6
  address if netcheck reported that we have working IPv6 ourselves.

- The netcheck report was wrongly overwriting whether an IP
  family was usable on dual stack hosts, which resulted in the magic
  socket being confused about which connectivity it had.
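
The once-per-heartbeat rule for call-me-maybe from the first item can be sketched as below. This is only an illustration under assumptions: `CallMeMaybeState`, `should_send` and the `have_direct_path` flag are hypothetical names, not the iroh-net types; only the 5s `HEARTBEAT_INTERVAL` comes from the code touched here.

```rust
use std::time::{Duration, Instant};

/// Heartbeat interval; 5s as in magicsock.rs.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// Hypothetical per-node bookkeeping (not the real iroh-net type).
#[derive(Debug, Default)]
struct CallMeMaybeState {
    /// When the last call-me-maybe was sent, if any.
    last_sent: Option<Instant>,
}

impl CallMeMaybeState {
    /// Decides whether a call-me-maybe should be sent at `now` and records the send.
    ///
    /// Assumption: one is needed whenever there is no direct path, regardless of
    /// pings or an active derp connection, but at most once per heartbeat interval.
    fn should_send(&mut self, now: Instant, have_direct_path: bool) -> bool {
        if have_direct_path {
            return false;
        }
        match self.last_sent {
            // Sent less than one heartbeat ago: stay quiet.
            Some(last) if now.duration_since(last) < HEARTBEAT_INTERVAL => false,
            _ => {
                self.last_sent = Some(now);
                true
            }
        }
    }
}

fn main() {
    let mut state = CallMeMaybeState::default();
    let t0 = Instant::now();
    assert!(state.should_send(t0, false));
    assert!(!state.should_send(t0 + Duration::from_secs(1), false));
    assert!(state.should_send(t0 + Duration::from_secs(5), false));
    println!("rate limit behaves as expected");
}
```

Taking `now` as a parameter instead of calling `Instant::now()` inside keeps the rule easy to test without sleeping.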

This also begins to clean up some terminology: "endpoint" is used for
many different things.  Here the term "path" is consistently used
when referring to a specific destination address of a node, whether
over the derper or a direct UDP socket.  This naming cleanup however
is not complete as that would obscure the fixes too much.  More to
be done later.

Closes #1955 

## Notes & open questions

This also changes the first address of our QuicMappedAddresses to
start at 1.  Not that 0 is reserved, but it is rather odd for people
who are used to IPv4 to see 0 being a real node.  And it's not like
you're short of addresses in IPv6.
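
As a rough illustration of why the starting value matters, here is a minimal sketch of a counter-backed address generator. The `fd00::/8` prefix, the layout (counter in the low 64 bits) and the helper name `next_mapped_addr` are assumptions for this example; the real `QuicMappedAddr` prefix and layout may differ.

```rust
use std::net::Ipv6Addr;
use std::sync::atomic::{AtomicU64, Ordering};

/// Counter starting at 1, as in this change, so no generated address ends in `::0`.
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Hypothetical helper: puts the counter into the low 64 bits of an address
/// under an illustrative Unique Local Address prefix (`fd00::/8`).
fn next_mapped_addr() -> Ipv6Addr {
    let counter = ADDR_COUNTER.fetch_add(1, Ordering::Relaxed);
    let mut octets = [0u8; 16];
    octets[0] = 0xfd;
    octets[8..].copy_from_slice(&counter.to_be_bytes());
    Ipv6Addr::from(octets)
}

fn main() {
    let first = next_mapped_addr();
    let second = next_mapped_addr();
    // Each call yields a fresh address and the host part is never all zeroes.
    assert_ne!(first, second);
    assert!(first.octets()[8..].iter().any(|b| *b != 0));
    println!("{first} {second}");
}
```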

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: dignifiedquire <me@dignifiedquire.com>
flub and dignifiedquire authored Feb 15, 2024
1 parent 983edcc commit b173520
Showing 8 changed files with 541 additions and 248 deletions.
3 changes: 3 additions & 0 deletions iroh-net/src/config.rs
@@ -6,6 +6,9 @@ use crate::derp::DerpUrl;

use super::portmapper;

// TODO: This re-uses "Endpoint" again, a term that already means "a quic endpoint" and "a
// magicsock endpoint". this time it means "an IP address on which our local magicsock
// endpoint is listening". Name this better.
/// An endpoint IPPort and an associated type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Endpoint {
61 changes: 44 additions & 17 deletions iroh-net/src/magicsock.rs
@@ -45,7 +45,9 @@ use tokio::{
time,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument};
use tracing::{
debug, error, error_span, info, info_span, instrument, trace, trace_span, warn, Instrument,
};
use watchable::Watchable;

use crate::{
@@ -88,6 +90,9 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How often to save node data.
const SAVE_NODES_INTERVAL: Duration = Duration::from_secs(30);

/// Maximum duration to wait for a netcheck report.
const NETCHECK_REPORT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum CurrentPortFate {
Keep,
@@ -339,7 +344,10 @@ impl Inner {

let dest = QuicMappedAddr(dest);

match self.node_map.get_send_addrs_for_quic_mapped_addr(&dest) {
match self
.node_map
.get_send_addrs_for_quic_mapped_addr(&dest, self.ipv6_reported.load(Ordering::Relaxed))
{
Some((public_key, udp_addr, derp_url, mut msgs)) => {
let mut pings_sent = false;
// If we have pings to send, we *have* to send them out first.
@@ -645,7 +653,9 @@ impl Inner {
inc!(MagicsockMetrics, recv_disco_udp);
}

trace!(message = ?dm, "receive disco message");
let span = trace_span!("handle_disco", ?dm);
let _guard = span.enter();
trace!("receive disco message");
match dm {
disco::Message::Ping(ping) => {
inc!(MagicsockMetrics, recv_disco_ping);
@@ -658,8 +668,7 @@ impl Inner {
disco::Message::CallMeMaybe(cm) => {
inc!(MagicsockMetrics, recv_disco_call_me_maybe);
if !matches!(src, DiscoMessageSource::Derp { .. }) {
// CallMeMaybe messages should only come via DERP.
debug!("[unexpected] call-me-maybe packets should only come via DERP");
warn!("call-me-maybe packets should only come via DERP");
return;
};
let ping_actions = self.node_map.handle_call_me_maybe(sender, cm);
@@ -675,15 +684,16 @@ impl Inner {
}
}
}
trace!("disco message handled");
}

/// Handle a ping message.
fn handle_ping(&self, dm: disco::Ping, sender: &PublicKey, src: DiscoMessageSource) {
// Insert the ping into the node map, and return whether a ping with this tx_id was already
// received.
let addr: SendAddr = src.clone().into();
let role = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id);
match role {
let handled = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id);
match handled.role {
PingRole::Duplicate => {
debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint already confirmed, skip");
return;
@@ -698,7 +708,8 @@ impl Inner {
}

// Send a pong.
debug!(tx = %hex::encode(dm.tx_id), "send pong");
debug!(tx = %hex::encode(dm.tx_id), %addr, dstkey = %sender.fmt_short(),
"sending pong");
let pong = disco::Message::Pong(disco::Pong {
tx_id: dm.tx_id,
src: addr.clone(),
@@ -707,6 +718,15 @@ impl Inner {
if !self.send_disco_message_queued(addr.clone(), *sender, pong) {
warn!(%addr, "failed to queue pong");
}

if let Some(ping) = handled.needs_ping_back {
debug!(
%addr,
dstkey = %sender.fmt_short(),
"sending direct ping back",
);
self.send_ping_queued(ping);
}
}

fn encode_disco_message(&self, dst_key: PublicKey, msg: &disco::Message) -> Bytes {
@@ -726,14 +746,13 @@ impl Inner {
tx_id,
node_key: self.public_key(),
});
trace!(%dst, %tx_id, ?purpose, "send ping");
let sent = match dst {
SendAddr::Udp(addr) => self.udp_disco_sender.try_send((addr, dst_key, msg)).is_ok(),
SendAddr::Derp(ref url) => self.send_disco_message_derp(url, dst_key, msg),
};
if sent {
let msg_sender = self.actor_sender.clone();
debug!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
trace!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
self.node_map
.notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
} else {
@@ -953,7 +972,10 @@ impl Inner {
let msg = endpoints.to_call_me_maybe_message();
let msg = disco::Message::CallMeMaybe(msg);
if !self.send_disco_message_derp(url, dst_key, msg) {
warn!(node = %dst_key.fmt_short(), "derp channel full, dropping call-me-maybe");
warn!(dstkey = %dst_key.fmt_short(), derpurl = ?url,
"derp channel full, dropping call-me-maybe");
} else {
debug!(dstkey = %dst_key.fmt_short(), derpurl = ?url, "call-me-maybe sent");
}
} else {
self.pending_call_me_maybes
@@ -1805,7 +1827,6 @@ impl Actor {
Ok(part) => {
if self.handle_derp_disco_message(&part, url, dm.src) {
// Message was internal, do not bubble up.
debug!(node = %dm.src.fmt_short(), "handled disco message from derp");
continue;
}

@@ -2002,8 +2023,11 @@ impl Actor {
};
discovery.publish(&info);
}
self.inner.send_queued_call_me_maybes();
}

// Regardless of whether our local endpoints changed, we now want to send any queued
// call-me-maybe messages.
self.inner.send_queued_call_me_maybes();
}

/// Called when an endpoints update is done, no matter if it was successful or not.
@@ -2069,7 +2093,7 @@ impl Actor {
Ok(rx) => {
let msg_sender = self.msg_sender.clone();
tokio::task::spawn(async move {
let report = time::timeout(Duration::from_secs(10), rx).await;
let report = time::timeout(NETCHECK_REPORT_TIMEOUT, rx).await;
let report: anyhow::Result<_> = match report {
Ok(Ok(Ok(report))) => Ok(Some(report)),
Ok(Ok(Err(err))) => Err(err),
@@ -2080,6 +2104,8 @@ impl Actor {
.send(ActorMessage::NetcheckReport(report, why))
.await
.ok();
// The receiver of the NetcheckReport message will call
// .finalize_endpoints_update().
});
}
Err(err) => {
@@ -2095,9 +2121,10 @@ impl Actor {
.ipv6_reported
.store(report.ipv6, Ordering::Relaxed);
let r = &report;
debug!(
trace!(
"setting no_v4_send {} -> {}",
self.no_v4_send, !r.ipv4_can_send
self.no_v4_send,
!r.ipv4_can_send
);
self.no_v4_send = !r.ipv4_can_send;

@@ -2484,7 +2511,7 @@ impl Iterator for PacketSplitIter {
pub(crate) struct QuicMappedAddr(SocketAddr);

/// Counter to always generate unique addresses for [`QuicMappedAddr`].
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(0);
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);

impl QuicMappedAddr {
/// The Prefix/L of our Unique Local Addresses.
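
The ping-back added to `handle_ping` above can be sketched in isolation as follows. This is a simplified model, not the iroh-net implementation: `PathState`, `Action` and the rule "ping back unless we already received our own pong on that path" are assumptions made for the example; in the real code the decision is carried in `PingHandled::needs_ping_back`.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

/// Messages to queue in response to a received ping (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum Action {
    SendPong(SocketAddr),
    SendPing(SocketAddr),
}

/// Hypothetical per-node state for this sketch.
#[derive(Debug, Default)]
struct PathState {
    /// UDP paths on which we received a pong to one of our own pings.
    confirmed: HashSet<SocketAddr>,
}

impl PathState {
    /// Always answers with a pong. If the path is not yet confirmed from our
    /// side, also pings back so the return direction opens without waiting
    /// for the next call-me-maybe.
    fn handle_ping(&self, src: SocketAddr) -> Vec<Action> {
        let mut actions = vec![Action::SendPong(src)];
        if !self.confirmed.contains(&src) {
            actions.push(Action::SendPing(src));
        }
        actions
    }
}

fn main() {
    let addr: SocketAddr = "192.0.2.1:4242".parse().unwrap();
    let mut paths = PathState::default();
    // Unconfirmed path: pong plus a ping back.
    assert_eq!(paths.handle_ping(addr).len(), 2);
    // Once our own pong arrived, only the pong is sent.
    paths.confirmed.insert(addr);
    assert_eq!(paths.handle_ping(addr), vec![Action::SendPong(addr)]);
    println!("ok");
}
```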
56 changes: 39 additions & 17 deletions iroh-net/src/magicsock/peer_map.rs
@@ -13,7 +13,7 @@ use stun_rs::TransactionId;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info, instrument, trace, warn};

use self::endpoint::{Endpoint, Options};
use self::endpoint::{Endpoint, Options, PingHandled};
use super::{
metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr,
};
@@ -141,7 +141,12 @@ impl NodeMap {

/// Insert a received ping into the node map, and return whether a ping with this tx_id was already
/// received.
pub fn handle_ping(&self, sender: PublicKey, src: SendAddr, tx_id: TransactionId) -> PingRole {
pub fn handle_ping(
&self,
sender: PublicKey,
src: SendAddr,
tx_id: TransactionId,
) -> PingHandled {
self.inner.lock().handle_ping(sender, src, tx_id)
}

@@ -158,6 +163,7 @@ impl NodeMap {
pub fn get_send_addrs_for_quic_mapped_addr(
&self,
addr: &QuicMappedAddr,
have_ipv6: bool,
) -> Option<(
PublicKey,
Option<SocketAddr>,
@@ -167,7 +173,7 @@ impl NodeMap {
let mut inner = self.inner.lock();
let ep = inner.get_mut(EndpointId::QuicMappedAddr(addr))?;
let public_key = *ep.public_key();
let (udp_addr, derp_url, msgs) = ep.get_send_addrs();
let (udp_addr, derp_url, msgs) = ep.get_send_addrs(have_ipv6);
Some((public_key, udp_addr, derp_url, msgs))
}

@@ -346,9 +352,10 @@ impl NodeMapInner {
Some((*endpoint.public_key(), *endpoint.quic_mapped_addr()))
}

#[instrument(skip_all, fields(src = %src.fmt_short()))]
fn receive_derp(&mut self, derp_url: &DerpUrl, src: &PublicKey) -> QuicMappedAddr {
let endpoint = self.get_or_insert_with(EndpointId::NodeKey(src), || {
info!(node=%src.fmt_short(), "receive_derp: packets from unknown node, insert into node map");
trace!("packets from unknown node, insert into node map");
Options {
public_key: *src,
derp_url: Some(derp_url.clone()),
@@ -384,7 +391,7 @@ impl NodeMapInner {
if let Some((src, key)) = insert {
self.set_node_key_for_ip_port(src, &key);
}
debug!(?insert, "received pong")
trace!(?insert, "received pong")
} else {
warn!("received pong: node unknown, ignore")
}
@@ -413,7 +420,12 @@ impl NodeMapInner {
}
}

fn handle_ping(&mut self, sender: PublicKey, src: SendAddr, tx_id: TransactionId) -> PingRole {
fn handle_ping(
&mut self,
sender: PublicKey,
src: SendAddr,
tx_id: TransactionId,
) -> PingHandled {
let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&sender), || {
debug!("received ping: node unknown, add to node map");
Options {
@@ -423,18 +435,22 @@ impl NodeMapInner {
}
});

let role = endpoint.handle_ping(src.clone(), tx_id);
let handled = endpoint.handle_ping(src.clone(), tx_id);
if let SendAddr::Udp(ref addr) = src {
if matches!(role, PingRole::NewEndpoint) {
if matches!(handled.role, PingRole::NewEndpoint) {
self.set_node_key_for_ip_port(*addr, &sender);
}
}
role
handled
}

/// Inserts a new endpoint into the [`NodeMap`].
fn insert_endpoint(&mut self, options: Options) -> &mut Endpoint {
info!(node = %options.public_key.fmt_short(), derp_url = ?options.derp_url, "inserting new node endpoint in NodeMap");
info!(
node = %options.public_key.fmt_short(),
derp_url = ?options.derp_url,
"inserting new node endpoint in NodeMap",
);
let id = self.next_id;
self.next_id = self.next_id.wrapping_add(1);
let ep = Endpoint::new(id, options);
@@ -520,7 +536,7 @@ impl NodeMapInner {
///
/// NOTE: storing an [`IpPort`] is safer than storing a [`SocketAddr`] because for IPv6 socket
/// addresses include fields that can't be assumed consistent even within a single connection.
#[derive(Debug, derive_more::Display, Clone, Copy, Hash, PartialEq, Eq)]
#[derive(Debug, derive_more::Display, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[display("{}", SocketAddr::from(*self))]
pub struct IpPort {
ip: IpAddr,
@@ -635,7 +651,7 @@ mod tests {
// add [`MAX_INACTIVE_DIRECT_ADDRESSES`] active direct addresses and double
// [`MAX_INACTIVE_DIRECT_ADDRESSES`] that are inactive

// active addresses
info!("Adding active addresses");
for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES {
let addr = SocketAddr::new(LOCALHOST, 5000 + i as u16);
let node_addr = NodeAddr::new(public_key).with_direct_addresses([addr]);
@@ -645,8 +661,8 @@ mod tests {
node_map.inner.lock().receive_udp(addr);
}

// offline addresses
for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES {
info!("Adding offline/inactive addresses");
for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES * 2 {
let addr = SocketAddr::new(LOCALHOST, 6000 + i as u16);
let node_addr = NodeAddr::new(public_key).with_direct_addresses([addr]);
node_map.add_node_addr(node_addr);
@@ -655,26 +671,32 @@ mod tests {
let mut node_map_inner = node_map.inner.lock();
let endpoint = node_map_inner.by_id.get_mut(&id).unwrap();

// online but inactive addresses discovered via ping
info!("Adding alive addresses");
for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES {
let addr = SendAddr::Udp(SocketAddr::new(LOCALHOST, 7000 + i as u16));
let txid = stun::TransactionId::from([i as u8; 12]);
// Note that this already invokes .prune_direct_addresses() because these are
// new UDP paths.
endpoint.handle_ping(addr, txid);
}

info!("Pruning addresses");
endpoint.prune_direct_addresses();

// Half the offline addresses should have been pruned. All the active and alive
// addresses should have been kept.
assert_eq!(
endpoint.direct_addresses().count(),
MAX_INACTIVE_DIRECT_ADDRESSES * 2
MAX_INACTIVE_DIRECT_ADDRESSES * 3
);

// We should have both offline and alive addresses which are not active.
assert_eq!(
endpoint
.direct_address_states()
.filter(|(_addr, state)| !state.is_active())
.count(),
MAX_INACTIVE_DIRECT_ADDRESSES
MAX_INACTIVE_DIRECT_ADDRESSES * 2
)
}

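
The expectations in the test above (all active and alive addresses kept, half of the offline ones pruned) are consistent with a pruning rule like the one sketched here. This is a guess at the rule, not the actual `prune_direct_addresses()` code: `PathInfo`, its fields, the port-keyed map and the small `MAX_INACTIVE` value are all made up for the example.

```rust
use std::collections::BTreeMap;

/// Illustrative limit; the real MAX_INACTIVE_DIRECT_ADDRESSES value is not shown here.
const MAX_INACTIVE: usize = 4;

/// Hypothetical per-address state, keyed by port for brevity.
#[derive(Debug, Clone, Copy)]
struct PathInfo {
    /// Recently carried payload traffic.
    active: bool,
    /// Recently saw a ping, pong or call-me-maybe.
    recently_alive: bool,
    /// Illustrative logical timestamp of the last activity.
    last_seen: u64,
}

/// Removes the oldest addresses that are neither active nor recently alive,
/// keeping at most `MAX_INACTIVE` of them.
fn prune(paths: &mut BTreeMap<u16, PathInfo>) {
    let mut prunable: Vec<(u16, u64)> = paths
        .iter()
        .filter(|(_, p)| !p.active && !p.recently_alive)
        .map(|(port, p)| (*port, p.last_seen))
        .collect();
    let excess = prunable.len().saturating_sub(MAX_INACTIVE);
    prunable.sort_by_key(|(_, last_seen)| *last_seen); // oldest first
    for (port, _) in prunable.into_iter().take(excess) {
        paths.remove(&port);
    }
}

/// Mirrors the test setup: active, offline (twice as many) and alive addresses.
fn sample_paths() -> BTreeMap<u16, PathInfo> {
    let mut paths = BTreeMap::new();
    for i in 0..MAX_INACTIVE as u16 {
        paths.insert(5000 + i, PathInfo { active: true, recently_alive: true, last_seen: 100 });
    }
    for i in 0..(MAX_INACTIVE * 2) as u16 {
        let info = PathInfo { active: false, recently_alive: false, last_seen: u64::from(i) };
        paths.insert(6000 + i, info);
    }
    for i in 0..MAX_INACTIVE as u16 {
        paths.insert(7000 + i, PathInfo { active: false, recently_alive: true, last_seen: 100 });
    }
    paths
}

fn main() {
    let mut paths = sample_paths();
    prune(&mut paths);
    assert_eq!(paths.len(), MAX_INACTIVE * 3);
    assert_eq!(paths.values().filter(|p| !p.active).count(), MAX_INACTIVE * 2);
    println!("kept {} addresses", paths.len());
}
```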