Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Mar 15, 2024
1 parent 458944a commit 96cbd60
Showing 1 changed file with 44 additions and 27 deletions.
71 changes: 44 additions & 27 deletions iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}
}

self.maybe_queue_next();
self.maybe_start_next();

#[cfg(any(test, debug_assertions))]
self.check_invariants();
Expand Down Expand Up @@ -568,7 +568,12 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
self.intents.insert(intent_id, intent);
}

// async to send abort message on progress sender
/// Cancels the download request.
///
/// This removes the registered download intent and, depending on its state, it will either
/// remove it from the scheduled requests, or cancel the future.o send abort message on progress sender
///
/// The method is async because it will send a final abort event on the progress sender.
async fn handle_cancel_download(&mut self, intent_id: IntentId, hash: Hash) {
let mut no_more_intents = false;
if let Entry::Occupied(mut occupied_entry) = self.requests.entry(hash) {
Expand Down Expand Up @@ -619,7 +624,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}

fn on_download_completed(&mut self, hash: Hash, result: TransferResult) {
// get active request info
// first remove the request
let active_request_info = self
.active_requests
.remove(&hash)
Expand Down Expand Up @@ -697,6 +702,10 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}
}

/// Finalize a download.
///
/// This triggers the intent return channels, and removes the download from the progress tracker
/// and provider map.
fn finalize_download(&mut self, hash: Hash, info: RequestInfo, result: DownloadResult) {
let intents = info
.intents
Expand All @@ -710,7 +719,15 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
self.providers.remove_hash(&hash);
}

fn maybe_queue_next(&mut self) {
/// Start the next downloads, or dial nodes, if limits permit and the queue is non-empty.
///
/// This is called after all actions. If there is nothing to do, it will return cheaply.
/// Otherwise, we will check the next hash in the queue, and:
/// * start the transfer if we are connected to a provider and limits are ok
/// * or, connect to a provider, if there is one we are not dialing yet and limits are ok
/// * or, disconnect an idle node if it would allow us to connect to a provider,
/// * or, if our limits are reached, do nothing for now
fn maybe_start_next(&mut self) {
// start as many queued downloads as allowed by the request limits.
loop {
// if request limit reached: break.
Expand All @@ -722,28 +739,22 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}

// if queue empty: break.
let Some(hash) = self.queue.pop_front() else {
let Some(hash) = self.queue.front() else {
break;
};
let hash = *hash;

match self.next_action_for_hash(&hash) {
HashAction::ShouldDial(node) => {
let can_dial = !self.at_connections_capacity()
|| {
if let Some(key) = self.goodbye_nodes_queue.peek() {
let expired = self.goodbye_nodes_queue.remove(&key);
let node = expired.into_inner();
debug!(node=%node.fmt_short(), "disconnect from idle node to make room for needed connections");
self.nodes.remove(&node);
true
} else {
false
}
};
if can_dial {
HashAction::Dial(node) => {
self.dialer.queue_dial(node);
}
HashAction::DialAfterIdleDisconnect(node) => {
if let Some(key) = self.goodbye_nodes_queue.peek() {
let expired = self.goodbye_nodes_queue.remove(&key);
let expired_node = expired.into_inner();
debug!(node=%expired_node.fmt_short(), "disconnect from idle node to make room for needed connections");
self.nodes.remove(&expired_node);
self.dialer.queue_dial(node);
} else {
break;
}
}
HashAction::OutOfProviders => {
Expand All @@ -770,7 +781,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}

let mut has_dialing = false;
let mut has_full = false;
let mut has_exhausted = false;
let mut nodes_available = vec![];
let mut node_to_dial = None;
for node in candidates {
Expand All @@ -780,7 +791,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
if has_capacity {
nodes_available.push((node, req_count));
} else {
has_full = true;
has_exhausted = true;
}
} else if self.dialer.is_pending(node) {
has_dialing = true;
Expand All @@ -795,9 +806,14 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
HashAction::StartTransfer(**node)
} else {
match node_to_dial {
// Some(node) if !self.at_connections_capacity() => HashAction::Dial(*node),
Some(node) => HashAction::ShouldDial(*node),
None => match has_dialing || has_full {
Some(node) => {
if self.at_connections_capacity() {
HashAction::DialAfterIdleDisconnect(*node)
} else {
HashAction::Dial(*node)
}
}
None => match has_dialing || has_exhausted {
true => HashAction::Waiting,
false => HashAction::OutOfProviders,
},
Expand Down Expand Up @@ -884,7 +900,8 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {

#[derive(Debug)]
enum HashAction {
ShouldDial(NodeId),
Dial(NodeId),
DialAfterIdleDisconnect(NodeId),
StartTransfer(NodeId),
OutOfProviders,
Waiting,
Expand Down

0 comments on commit 96cbd60

Please sign in to comment.