Skip to content

Commit

Permalink
fix(iroh-net): improve backpressure handling (#2105)
Browse files Browse the repository at this point in the history
Feed backpressure in the form of `Poll::Pending` and waking up wakers
back through `poll_send` when sending to derp.

Should help with #2104, but that might have other problems as well.
  • Loading branch information
dignifiedquire authored Mar 21, 2024
1 parent 77df843 commit b98ed9d
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,17 @@ impl Inner {
let mut udp_sent = false;
let mut derp_sent = false;
let mut udp_error = None;
let mut udp_pending = false;
let mut derp_pending = false;

// send udp
if let Some(addr) = udp_addr {
// rewrite target addresses.
for t in transmits.iter_mut() {
t.destination = addr;
}
match ready!(self.poll_send_udp(addr, &transmits, cx)) {
Ok(n) => {
match self.poll_send_udp(addr, &transmits, cx) {
Poll::Ready(Ok(n)) => {
trace!(node = %public_key.fmt_short(), dst = %addr, transmit_count=n, "sent transmits over UDP");
// truncate the transmits vec to `n`. these transmits will be sent to
// Derp further below. We only want to send those transmits to Derp that were
Expand All @@ -348,18 +350,46 @@ impl Inner {
udp_sent = true;
// record metrics.
}
Err(err) => {
Poll::Ready(Err(err)) => {
error!(node = %public_key.fmt_short(), ?addr, "failed to send udp: {err:?}");
udp_error = Some(err);
}
Poll::Pending => {
udp_pending = true;
}
}
}

// send derp
if let Some(ref derp_url) = derp_url {
self.try_send_derp(derp_url, public_key, split_packets(&transmits));
transmits_sent = transmits.len();
derp_sent = true;
match self.poll_send_derp(derp_url, public_key, split_packets(&transmits)) {
Poll::Ready(sent) => {
derp_sent = sent;
transmits_sent = transmits.len();
}
Poll::Pending => {
self.network_send_wakers.lock().replace(cx.waker().clone());
derp_pending = true;
}
}
}

if udp_addr.is_none() && derp_url.is_none() {
// Handle no addresses being available
warn!(node = %public_key.fmt_short(), "failed to send: no UDP or DERP addr");
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"no UDP or Derp address available for node",
)));
}

if (udp_addr.is_none() || udp_pending) && (derp_url.is_none() || derp_pending) {
// Handle backpressure
// The explicit choice here is to only return pending, iff all available paths returned
// pending.
// This might result in one channel being backed up, without the system noticing, but
// for now this seems to be the best choice workable in the current implementation.
return Poll::Pending;
}

if !derp_sent && !udp_sent && !pings_sent {
Expand All @@ -370,17 +400,17 @@ impl Inner {
"no UDP or Derp address available for node",
)
});
Poll::Ready(Err(err))
} else {
trace!(
node = %public_key.fmt_short(),
transmit_count = %transmits_sent,
send_udp = ?udp_addr,
send_derp = ?derp_url,
"sent transmits"
);
Poll::Ready(Ok(transmits_sent))
return Poll::Ready(Err(err));
}

trace!(
node = %public_key.fmt_short(),
transmit_count = %transmits_sent,
send_udp = ?udp_addr,
send_derp = ?derp_url,
"sent transmits"
);
Poll::Ready(Ok(transmits_sent))
}
None => {
error!(dst=%dest, "no endpoint for mapped address");
Expand Down Expand Up @@ -794,12 +824,13 @@ impl Inner {
debug!(node = %dst_key.fmt_short(), %url, %msg, "send disco message (derp)");
let pkt = self.encode_disco_message(dst_key, &msg);
inc!(MagicsockMetrics, send_disco_derp);
if self.try_send_derp(url, dst_key, smallvec![pkt]) {
inc!(MagicsockMetrics, sent_disco_derp);
disco_message_sent(&msg);
true
} else {
false
match self.poll_send_derp(url, dst_key, smallvec![pkt]) {
Poll::Ready(true) => {
inc!(MagicsockMetrics, sent_disco_derp);
disco_message_sent(&msg);
true
}
_ => false,
}
}

Expand Down Expand Up @@ -900,7 +931,7 @@ impl Inner {
Poll::Ready(Ok(()))
}

fn try_send_derp(&self, url: &DerpUrl, node: PublicKey, contents: DerpContents) -> bool {
fn poll_send_derp(&self, url: &DerpUrl, node: PublicKey, contents: DerpContents) -> Poll<bool> {
trace!(node = %node.fmt_short(), derp_url = %url, count = contents.len(), len = contents.iter().map(|c| c.len()).sum::<usize>(), "send derp");
let msg = DerpActorMessage::Send {
url: url.clone(),
Expand All @@ -910,15 +941,15 @@ impl Inner {
match self.derp_actor_sender.try_send(msg) {
Ok(_) => {
trace!(node = %node.fmt_short(), derp_url = %url, "send derp: message queued");
true
Poll::Ready(true)
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!(node = %node.fmt_short(), derp_url = %url, "send derp: message dropped, channel to actor is closed");
false
Poll::Ready(false)
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(node = %node.fmt_short(), derp_url = %url, "send derp: message dropped, channel to actor is full");
false
Poll::Pending
}
}
}
Expand Down

0 comments on commit b98ed9d

Please sign in to comment.