From b98ed9d77655e37673efcf01ae6a854f8685e3da Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 21 Mar 2024 12:07:43 +0100 Subject: [PATCH] fix(iroh-net): improve backpressure handling (#2105) Feed backpressure in the form of `Poll::Pending` and waking up wakers back through `poll_send` when sending to derp. Should help with #2104, but that might have other problems as well. --- iroh-net/src/magicsock.rs | 83 +++++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 274dd95b57..67f6b79cfd 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -327,6 +327,8 @@ impl Inner { let mut udp_sent = false; let mut derp_sent = false; let mut udp_error = None; + let mut udp_pending = false; + let mut derp_pending = false; // send udp if let Some(addr) = udp_addr { @@ -334,8 +336,8 @@ impl Inner { for t in transmits.iter_mut() { t.destination = addr; } - match ready!(self.poll_send_udp(addr, &transmits, cx)) { - Ok(n) => { + match self.poll_send_udp(addr, &transmits, cx) { + Poll::Ready(Ok(n)) => { trace!(node = %public_key.fmt_short(), dst = %addr, transmit_count=n, "sent transmits over UDP"); // truncate the transmits vec to `n`. these transmits will be sent to // Derp further below. We only want to send those transmits to Derp that were @@ -348,18 +350,46 @@ impl Inner { udp_sent = true; // record metrics. 
} - Err(err) => { + Poll::Ready(Err(err)) => { error!(node = %public_key.fmt_short(), ?addr, "failed to send udp: {err:?}"); udp_error = Some(err); } + Poll::Pending => { + udp_pending = true; + } } } // send derp if let Some(ref derp_url) = derp_url { - self.try_send_derp(derp_url, public_key, split_packets(&transmits)); - transmits_sent = transmits.len(); - derp_sent = true; + match self.poll_send_derp(derp_url, public_key, split_packets(&transmits)) { + Poll::Ready(sent) => { + derp_sent = sent; + transmits_sent = transmits.len(); + } + Poll::Pending => { + self.network_send_wakers.lock().replace(cx.waker().clone()); + derp_pending = true; + } + } + } + + if udp_addr.is_none() && derp_url.is_none() { + // Handle no addresses being available + warn!(node = %public_key.fmt_short(), "failed to send: no UDP or DERP addr"); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "no UDP or Derp address available for node", + ))); + } + + if (udp_addr.is_none() || udp_pending) && (derp_url.is_none() || derp_pending) { + // Handle backpressure + // The explicit choice here is to only return pending, iff all available paths returned + // pending. + // This might result in one channel being backed up, without the system noticing, but + // for now this seems to be the best choice workable in the current implementation. 
+ return Poll::Pending; } if !derp_sent && !udp_sent && !pings_sent { @@ -370,17 +400,17 @@ impl Inner { "no UDP or Derp address available for node", ) }); - Poll::Ready(Err(err)) - } else { - trace!( - node = %public_key.fmt_short(), - transmit_count = %transmits_sent, - send_udp = ?udp_addr, - send_derp = ?derp_url, - "sent transmits" - ); - Poll::Ready(Ok(transmits_sent)) + return Poll::Ready(Err(err)); } + + trace!( + node = %public_key.fmt_short(), + transmit_count = %transmits_sent, + send_udp = ?udp_addr, + send_derp = ?derp_url, + "sent transmits" + ); + Poll::Ready(Ok(transmits_sent)) } None => { error!(dst=%dest, "no endpoint for mapped address"); @@ -794,12 +824,13 @@ impl Inner { debug!(node = %dst_key.fmt_short(), %url, %msg, "send disco message (derp)"); let pkt = self.encode_disco_message(dst_key, &msg); inc!(MagicsockMetrics, send_disco_derp); - if self.try_send_derp(url, dst_key, smallvec![pkt]) { - inc!(MagicsockMetrics, sent_disco_derp); - disco_message_sent(&msg); - true - } else { - false + match self.poll_send_derp(url, dst_key, smallvec![pkt]) { + Poll::Ready(true) => { + inc!(MagicsockMetrics, sent_disco_derp); + disco_message_sent(&msg); + true + } + _ => false, } } @@ -900,7 +931,7 @@ impl Inner { Poll::Ready(Ok(())) } - fn try_send_derp(&self, url: &DerpUrl, node: PublicKey, contents: DerpContents) -> bool { + fn poll_send_derp(&self, url: &DerpUrl, node: PublicKey, contents: DerpContents) -> Poll<bool> { trace!(node = %node.fmt_short(), derp_url = %url, count = contents.len(), len = contents.iter().map(|c| c.len()).sum::<usize>(), "send derp"); let msg = DerpActorMessage::Send { url: url.clone(), @@ -910,15 +941,15 @@ impl Inner { match self.derp_actor_sender.try_send(msg) { Ok(_) => { trace!(node = %node.fmt_short(), derp_url = %url, "send derp: message queued"); - true + Poll::Ready(true) } Err(mpsc::error::TrySendError::Closed(_)) => { warn!(node = %node.fmt_short(), derp_url = %url, "send derp: message dropped, channel to actor is closed"); -
false + Poll::Ready(false) } Err(mpsc::error::TrySendError::Full(_)) => { warn!(node = %node.fmt_short(), derp_url = %url, "send derp: message dropped, channel to actor is full"); - false + Poll::Pending } } }