Skip to content

Commit

Permalink
fix: make sure to never remove busy nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Apr 18, 2024
1 parent 0367468 commit 51e251d
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
}
Some(expired) = self.goodbye_nodes_queue.next() => {
let node = expired.into_inner();
self.connected_nodes.remove(&node);
trace!(node=%node.fmt_short(), "tick: goodbye node");
self.disconnect_idle_node(node, "idle expired");
}
}

Expand Down Expand Up @@ -781,6 +781,10 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,

/// Handle receiving a new connection.
fn on_connection_ready(&mut self, node: NodeId, result: anyhow::Result<D::Connection>) {
debug_assert!(
!self.connected_nodes.contains_key(&node),
"newly connected node is not yet connected"
);
match result {
Ok(connection) => {
trace!(node=%node.fmt_short(), "connected to node");
Expand All @@ -790,7 +794,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
}
Err(err) => {
debug!(%node, %err, "connection to node failed");
self.try_queue_node_retry(node);
self.disconnect_and_retry(node);
}
}
}
Expand Down Expand Up @@ -850,7 +854,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
Err(FailureAction::RetryLater(reason)) => {
debug!(%kind, node=%node.fmt_short(), %reason, "download failed: retry later");
if node_info.is_idle() {
self.try_queue_node_retry(node);
self.disconnect_and_retry(node);
}
}
};
Expand Down Expand Up @@ -950,7 +954,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
}
NextStep::DialQueuedDisconnect(node, key) => {
let idle_node = self.goodbye_nodes_queue.remove(&key).into_inner();
self.connected_nodes.remove(&idle_node);
self.disconnect_idle_node(idle_node, "drop idle for new dial");
debug!(%kind, node=%node.fmt_short(), idle_node=%idle_node.fmt_short(), "dial node, disconnect idle node)");
self.dialer.queue_dial(node);
}
Expand All @@ -968,8 +972,9 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
}
}

fn try_queue_node_retry(&mut self, node: NodeId) {
self.connected_nodes.remove(&node);
/// Drop the connection to a node and insert it into the the retry queue.
fn disconnect_and_retry(&mut self, node: NodeId) {
self.disconnect_idle_node(node, "queue retry");
let retry_state = self.retry_node_state.entry(node).or_default();
retry_state.retry_count += 1;
if retry_state.retry_count <= self.retry_config.max_retries_per_node {
Expand Down Expand Up @@ -1160,24 +1165,30 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
self.in_progress_downloads.spawn_local(fut);
}

fn remove_node(&mut self, node: NodeId, reason: &'static str) {
debug!(node = %node.fmt_short(), %reason, "remove node");

// ensure that the node is idle or disconnected
fn disconnect_idle_node(&mut self, node: NodeId, reason: &'static str) -> bool {
if let Some(info) = self.connected_nodes.remove(&node) {
match info.state {
ConnectedState::Idle { drop_key } => {
self.goodbye_nodes_queue.try_remove(&drop_key);
true
}
ConnectedState::Busy { .. } => {
debug_assert!(false,"expected removed node to be idle, but is busy (removal reason: {reason:?})");
warn!("expected removed node to be idle, but is busy (removal reason: {reason:?})");
self.connected_nodes.insert(node, info);
return;
false
}
}
} else {
true
}
}

fn remove_node(&mut self, node: NodeId, reason: &'static str) {
debug!(node = %node.fmt_short(), %reason, "remove node");
if self.disconnect_idle_node(node, reason) {
self.providers.remove_node(&node);
self.retry_node_state.remove(&node);
}
self.providers.remove_node(&node);
self.retry_node_state.remove(&node);
}

fn node_state<'a>(&'a self, node: &NodeId) -> NodeState<'a, D::Connection> {
Expand Down

0 comments on commit 51e251d

Please sign in to comment.