Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net): persist known peer info #1488

Merged
merged 30 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5b5902b
refactor(iroh-net): cleanup Node
divagant-martian Sep 17, 2023
0e4cec6
completely remove `iroh-net::config::Node`
divagant-martian Sep 17, 2023
e35ebba
fix spelling
divagant-martian Sep 17, 2023
0e9d3fa
add peers path to the magic endpoint builder
divagant-martian Sep 18, 2023
e36012f
connect peers_path
divagant-martian Sep 18, 2023
5d948ad
add addr_info to the Endpoint to retrieve addressing information
divagant-martian Sep 18, 2023
fae6979
convert between KnownPeers and PeerMap
divagant-martian Sep 18, 2023
a296afa
Merge branch 'remove-unused-node-fields' into persist-peer-map
divagant-martian Sep 18, 2023
2e59718
convert between KnownPeers and PeerMap passing the sender
divagant-martian Sep 18, 2023
a9c1c4d
save on shutdown
divagant-martian Sep 18, 2023
69e85b6
load peer data
divagant-martian Sep 18, 2023
d9313a2
remove the NodeAddr type
divagant-martian Sep 18, 2023
4276a20
unify anb make comparable
divagant-martian Sep 18, 2023
a597d08
make AddrInfo comparisons stable
divagant-martian Sep 18, 2023
718056e
offerings to clippy
divagant-martian Sep 18, 2023
0be1d39
Merge branch 'main' into persist-peer-map
divagant-martian Sep 19, 2023
c8439e2
connect `peers_data_path` up to `bin`
divagant-martian Sep 19, 2023
cf6262a
do better handling of the peers file not being present
divagant-martian Sep 19, 2023
84825ef
save on a timer, improve loading
divagant-martian Sep 19, 2023
d6fe89d
cleanup
divagant-martian Sep 19, 2023
afa69d0
remove unnecessary alias
divagant-martian Sep 20, 2023
d036c3c
expand docs
divagant-martian Sep 20, 2023
a59c5cd
Merge branch 'main' into persist-peer-map
divagant-martian Sep 22, 2023
0a54603
post merge corrections
divagant-martian Sep 22, 2023
ee2d35e
use hashset in addrinfo
divagant-martian Sep 22, 2023
7d2246b
stream peer data contents
divagant-martian Sep 22, 2023
68582b7
clippy
divagant-martian Sep 22, 2023
36e5c98
docs and stuff
divagant-martian Sep 22, 2023
d312516
do not mess around with files if there is no data to be saved
divagant-martian Sep 24, 2023
a099524
Merge branch 'main' into persist-peer-map
divagant-martian Sep 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 8 additions & 34 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
//! Networking for the `iroh-gossip` protocol

use std::{
collections::HashMap, fmt, future::Future, net::SocketAddr, sync::Arc, task::Poll,
time::Instant,
};

use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use futures::{stream::Stream, FutureExt};
use genawaiter::sync::{Co, Gen};
use iroh_net::magic_endpoint::AddrInfo as IrohInfo;
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint};
use rand::rngs::StdRng;
use rand_core::SeedableRng;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt, future::Future, sync::Arc, task::Poll, time::Instant};
use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
Expand Down Expand Up @@ -253,32 +249,6 @@ impl Future for JoinTopicFut {
}
}

/// Addressing information for peers.
///
/// This struct is serialized and transmitted to peers in `Join` and `ForwardJoin` messages.
/// It contains the information needed by `iroh-net` to connect to peers.
///
/// TODO: Replace with type from iroh-net
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct IrohInfo {
addrs: Vec<SocketAddr>,
derp_region: Option<u16>,
}

impl IrohInfo {
async fn from_endpoint(endpoint: &MagicEndpoint) -> anyhow::Result<Self> {
Ok(Self {
addrs: endpoint
.local_endpoints()
.await?
.iter()
.map(|ep| ep.addr)
.collect(),
derp_region: endpoint.my_derp().await,
})
}
}

/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
#[derive(Debug)]
enum ConnOrigin {
Expand Down Expand Up @@ -370,7 +340,7 @@ impl Actor {
}
},
_ = self.on_endpoints_rx.changed() => {
let info = IrohInfo::from_endpoint(&self.endpoint).await?;
let info = self.endpoint.my_addr_info().await?;
let peer_data = postcard::to_stdvec(&info)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?;
}
Expand Down Expand Up @@ -537,9 +507,13 @@ impl Actor {
Err(err) => warn!("Failed to decode PeerData from {peer}: {err}"),
Ok(info) => {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
let IrohInfo {
derp_region,
endpoints,
} = info;
if let Err(err) = self
.endpoint
.add_known_addrs(peer, info.derp_region, &info.addrs)
.add_known_addrs(peer, derp_region, &endpoints)
.await
{
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}");
Expand Down
93 changes: 62 additions & 31 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! An endpoint that leverages a [quinn::Endpoint] backed by a [magicsock::MagicSock].

use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::{anyhow, ensure, Context, Result};
use quinn_proto::VarInt;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};

use crate::{
Expand All @@ -17,18 +18,32 @@ use crate::{

pub use super::magicsock::EndpointInfo as ConnectionInfo;

/// Address information for a node.
#[derive(Debug)]
pub struct NodeAddr {
/// The node's public key.
pub node_id: PublicKey,
/// Addressing information to connect to a peer.
#[derive(Debug, Serialize, Deserialize, Eq)]
pub struct AddrInfo {
/// The node's home DERP region.
pub derp_region: Option<u16>,
/// Socket addresses where this node might be reached directly.
pub endpoints: Vec<SocketAddr>,
}

/// Builder for [MagicEndpoint]
impl PartialEq for AddrInfo {
fn eq(&self, other: &Self) -> bool {
if self.derp_region != other.derp_region {
return false;
}
if self.endpoints.len() != other.endpoints.len() {
return false;
}
let mut self_eps = self.endpoints.clone();
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
self_eps.sort();
let mut others_eps = self.endpoints.clone();
others_eps.sort();
self_eps == others_eps
}
}

/// Builder for [`MagicEndpoint`]
#[derive(Debug)]
pub struct MagicEndpointBuilder {
secret_key: Option<SecretKey>,
Expand All @@ -38,6 +53,8 @@ pub struct MagicEndpointBuilder {
concurrent_connections: Option<u32>,
keylog: bool,
callbacks: Callbacks,
/// Path for known peers. See [`MagicEndpointBuilder::peers_data_path`].
peers_path: Option<PathBuf>,
}

impl Default for MagicEndpointBuilder {
Expand All @@ -50,6 +67,7 @@ impl Default for MagicEndpointBuilder {
concurrent_connections: Default::default(),
keylog: Default::default(),
callbacks: Default::default(),
peers_path: None,
}
}
}
Expand Down Expand Up @@ -154,6 +172,14 @@ impl MagicEndpointBuilder {
self
}

/// Optionally set the path where peer info should be stored.
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
///
/// If the file exists, it will be used to populate an initial set of peers.
pub fn peers_data_path(mut self, path: PathBuf) -> Self {
self.peers_path = Some(path);
self
}

/// Bind the magic endpoint on the specified socket address.
///
/// The *bind_port* is the port that should be bound locally.
Expand All @@ -178,15 +204,14 @@ impl MagicEndpointBuilder {
if let Some(c) = self.concurrent_connections {
server_config.concurrent_connections(c);
}
MagicEndpoint::bind(
let msock_opts = magicsock::Options {
port: bind_port,
secret_key,
bind_port,
Some(server_config),
self.derp_map.unwrap_or_default(),
Some(self.callbacks),
self.keylog,
)
.await
derp_map: self.derp_map.unwrap_or_default(),
callbacks: self.callbacks,
peers_path: self.peers_path,
};
MagicEndpoint::bind(Some(server_config), msock_opts, self.keylog).await
}
}

Expand Down Expand Up @@ -219,23 +244,15 @@ impl MagicEndpoint {

/// Create a quinn endpoint backed by a magicsock.
///
/// This is for internal use, the public interface is the [MagicEndpointBuilder] obtained from
/// This is for internal use, the public interface is the [`MagicEndpointBuilder`] obtained from
/// [Self::builder]. See the methods on the builder for documentation of the parameters.
async fn bind(
secret_key: SecretKey,
bind_port: u16,
server_config: Option<quinn::ServerConfig>,
derp_map: DerpMap,
callbacks: Option<Callbacks>,
msock_opts: magicsock::Options,
keylog: bool,
) -> Result<Self> {
let msock = magicsock::MagicSock::new(magicsock::Options {
port: bind_port,
derp_map,
secret_key: secret_key.clone(),
callbacks: callbacks.unwrap_or_default(),
})
.await?;
let secret_key = msock_opts.secret_key.clone();
let msock = magicsock::MagicSock::new(msock_opts).await?;
trace!("created magicsock");

let endpoint = quinn::Endpoint::new_with_abstract_socket(
Expand Down Expand Up @@ -391,16 +408,15 @@ impl MagicEndpoint {
/// If no UDP addresses are added, and the given `derp_region` cannot be dialed, it will error.
pub async fn add_known_addrs(
&self,
node_id: PublicKey,
peer: PublicKey,
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
derp_region: Option<u16>,
endpoints: &[SocketAddr],
) -> Result<()> {
let addr = NodeAddr {
node_id,
let info = AddrInfo {
derp_region,
endpoints: endpoints.to_vec(),
};
self.msock.add_known_addr(addr).await?;
self.msock.add_known_addr(peer, info).await?;
Ok(())
}

Expand All @@ -422,6 +438,21 @@ impl MagicEndpoint {
Ok(())
}

/// Get addressing information of this [`MagicEndpoint`].
pub async fn my_addr_info(&self) -> Result<AddrInfo> {
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
let endpoints = self
.local_endpoints()
.await?
.iter()
.map(|ep| ep.addr)
.collect();
let derp_region = self.my_derp().await;
Ok(AddrInfo {
derp_region,
endpoints,
})
}

#[cfg(test)]
pub(crate) fn magic_sock(&self) -> &MagicSock {
&self.msock
Expand Down
Loading