Skip to content

Commit

Permalink
refactor: move Dialer from iroh-gossip to iroh-net (#1998)
Browse files Browse the repository at this point in the history
## Description

This moves the `Dialer` from iroh-gossip to iroh-net. This has been a
TODO for a while and is a prerequisite for moving the downloader from
iroh to iroh-bytes.

## Notes & open questions

Only code moves, no other changes.

## Change checklist

- [x] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
Frando authored Feb 6, 2024
1 parent 803eac8 commit 90a5160
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 123 deletions.
5 changes: 3 additions & 2 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use bytes::{Bytes, BytesMut};
use futures::{stream::Stream, FutureExt};
use genawaiter::sync::{Co, Gen};
use iroh_net::{
key::PublicKey, magic_endpoint::get_remote_node_id, AddrInfo, MagicEndpoint, NodeAddr,
dialer::Dialer, key::PublicKey, magic_endpoint::get_remote_node_id, AddrInfo, MagicEndpoint,
NodeAddr,
};
use rand::rngs::StdRng;
use rand_core::SeedableRng;
Expand All @@ -16,7 +17,7 @@ use tokio::{
};
use tracing::{debug, error_span, trace, warn, Instrument};

use self::util::{read_message, write_message, Dialer, Timers};
use self::util::{read_message, write_message, Timers};
use crate::proto::{self, PeerData, Scope, TopicId};

pub mod util;
Expand Down
121 changes: 2 additions & 119 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
//! Utilities for iroh-gossip networking
use std::{collections::HashMap, io, pin::Pin, time::Instant};
use std::{io, pin::Pin, time::Instant};

use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use iroh_net::{key::PublicKey, MagicEndpoint, NodeAddr, NodeId};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
task::JoinSet,
time::{sleep_until, Sleep},
};
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::proto::util::TimerMap;

Expand Down Expand Up @@ -78,118 +73,6 @@ pub async fn read_lp(
Ok(Some(buffer.split_to(size).freeze()))
}

/// Future for a pending dial operation
pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result<quinn::Connection>)>;

/// Dial nodes and maintain a queue of pending dials
///
/// This wraps a [`MagicEndpoint`], connects to nodes through the endpoint, stores
/// the pending connect futures and emits finished connect results.
///
// TODO: Move to iroh-net
#[derive(Debug)]
pub struct Dialer {
endpoint: MagicEndpoint,
pending: JoinSet<(PublicKey, anyhow::Result<quinn::Connection>)>,
pending_dials: HashMap<PublicKey, CancellationToken>,
}

impl Dialer {
/// Create a new dialer for a [`MagicEndpoint`]
pub fn new(endpoint: MagicEndpoint) -> Self {
Self {
endpoint,
pending: Default::default(),
pending_dials: Default::default(),
}
}

/// Start to dial a node.
///
/// Note that the node's addresses and/or derp url must be added to the endpoint's
/// addressbook for a dial to succeed, see [`MagicEndpoint::add_node_addr`].
pub fn queue_dial(&mut self, node_id: NodeId, alpn_protocol: &'static [u8]) {
if self.is_pending(&node_id) {
return;
}
let cancel = CancellationToken::new();
self.pending_dials.insert(node_id, cancel.clone());
let endpoint = self.endpoint.clone();
self.pending.spawn(async move {
let res = tokio::select! {
biased;
_ = cancel.cancelled() => Err(anyhow!("Cancelled")),
res = endpoint.connect(NodeAddr::new(node_id), alpn_protocol) => res
};
(node_id, res)
});
}

/// Abort a pending dial
pub fn abort_dial(&mut self, node_id: &NodeId) {
if let Some(cancel) = self.pending_dials.remove(node_id) {
cancel.cancel();
}
}

/// Check if a node is currently being dialed
pub fn is_pending(&self, node: &NodeId) -> bool {
self.pending_dials.contains_key(node)
}

/// Wait for the next dial operation to complete
pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result<quinn::Connection>) {
match self.pending_dials.is_empty() {
false => {
let (node_id, res) = loop {
match self.pending.join_next().await {
Some(Ok((node_id, res))) => {
self.pending_dials.remove(&node_id);
break (node_id, res);
}
Some(Err(e)) => {
error!("next conn error: {:?}", e);
}
None => {
error!("no more pending conns available");
futures::future::pending().await
}
}
};

(node_id, res)
}
true => futures::future::pending().await,
}
}

/// Number of pending connections to be opened.
pub fn pending_count(&self) -> usize {
self.pending_dials.len()
}
}

impl futures::Stream for Dialer {
type Item = (PublicKey, anyhow::Result<quinn::Connection>);

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.pending.poll_join_next(cx) {
std::task::Poll::Ready(Some(Ok((node_id, result)))) => {
self.pending_dials.remove(&node_id);
std::task::Poll::Ready(Some((node_id, result)))
}
std::task::Poll::Ready(Some(Err(e))) => {
error!("dialer error: {:?}", e);
std::task::Poll::Pending
}
_ => std::task::Poll::Pending,
}
}
}

/// A [`TimerMap`] with an async method to wait for the next timer expiration.
#[derive(Debug)]
pub struct Timers<T> {
Expand Down
120 changes: 120 additions & 0 deletions iroh-net/src/dialer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//! A dialer to dial nodes
use std::{collections::HashMap, pin::Pin, task::Poll};

use crate::{key::PublicKey, MagicEndpoint, NodeAddr, NodeId};
use anyhow::anyhow;
use futures::future::BoxFuture;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::error;

/// Dial nodes and maintain a queue of pending dials
///
/// This wraps a [`MagicEndpoint`], connects to nodes through the endpoint, stores
/// the pending connect futures and emits finished connect results.
#[derive(Debug)]
pub struct Dialer {
endpoint: MagicEndpoint,
pending: JoinSet<(PublicKey, anyhow::Result<quinn::Connection>)>,
pending_dials: HashMap<PublicKey, CancellationToken>,
}

impl Dialer {
/// Create a new dialer for a [`MagicEndpoint`]
pub fn new(endpoint: MagicEndpoint) -> Self {
Self {
endpoint,
pending: Default::default(),
pending_dials: Default::default(),
}
}

/// Start to dial a node.
///
/// Note that the node's addresses and/or derp url must be added to the endpoint's
/// addressbook for a dial to succeed, see [`MagicEndpoint::add_node_addr`].
pub fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
if self.is_pending(&node_id) {
return;
}
let cancel = CancellationToken::new();
self.pending_dials.insert(node_id, cancel.clone());
let endpoint = self.endpoint.clone();
self.pending.spawn(async move {
let res = tokio::select! {
biased;
_ = cancel.cancelled() => Err(anyhow!("Cancelled")),
res = endpoint.connect(NodeAddr::new(node_id), alpn) => res
};
(node_id, res)
});
}

/// Abort a pending dial
pub fn abort_dial(&mut self, node_id: &NodeId) {
if let Some(cancel) = self.pending_dials.remove(node_id) {
cancel.cancel();
}
}

/// Check if a node is currently being dialed
pub fn is_pending(&self, node: &NodeId) -> bool {
self.pending_dials.contains_key(node)
}

/// Wait for the next dial operation to complete
pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result<quinn::Connection>) {
match self.pending_dials.is_empty() {
false => {
let (node_id, res) = loop {
match self.pending.join_next().await {
Some(Ok((node_id, res))) => {
self.pending_dials.remove(&node_id);
break (node_id, res);
}
Some(Err(e)) => {
error!("next conn error: {:?}", e);
}
None => {
error!("no more pending conns available");
futures::future::pending().await
}
}
};

(node_id, res)
}
true => futures::future::pending().await,
}
}

/// Number of pending connections to be opened.
pub fn pending_count(&self) -> usize {
self.pending_dials.len()
}
}

impl futures::Stream for Dialer {
type Item = (PublicKey, anyhow::Result<quinn::Connection>);

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.pending.poll_join_next(cx) {
Poll::Ready(Some(Ok((node_id, result)))) => {
self.pending_dials.remove(&node_id);
Poll::Ready(Some((node_id, result)))
}
Poll::Ready(Some(Err(e))) => {
error!("dialer error: {:?}", e);
Poll::Pending
}
_ => Poll::Pending,
}
}
}

/// Future for a pending dial operation
pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result<quinn::Connection>)>;
1 change: 1 addition & 0 deletions iroh-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
pub mod config;
pub mod defaults;
pub mod derp;
pub mod dialer;
mod disco;
mod dns;
pub mod key;
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl Downloader {
{
let me = endpoint.node_id().fmt_short();
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
let dialer = iroh_gossip::net::util::Dialer::new(endpoint);
let dialer = iroh_net::dialer::Dialer::new(endpoint);

let create_future = move || {
let concurrency_limits = ConcurrencyLimits::default();
Expand Down Expand Up @@ -1173,7 +1173,7 @@ impl ProviderMap {
}
}

impl Dialer for iroh_gossip::net::util::Dialer {
impl Dialer for iroh_net::dialer::Dialer {
type Connection = quinn::Connection;

fn queue_dial(&mut self, node_id: NodeId) {
Expand Down

0 comments on commit 90a5160

Please sign in to comment.