Skip to content

Commit

Permalink
Get rid of MagicEndpoint::dial_peer and refactor dialing in get (#1169)
Browse files Browse the repository at this point in the history
* refactor: add keypair as explicit option for dial_peer

also document that dial_peer should not be used to establish multiple
connections.

* refactor: get rid of dial_peer footgun in MagicEndpoint

also get rid of some duplicate code paths involving the ticket

* docs: fix some doc comments

* review: rename get_options to as_get_options

* fix: make sure the token from the ticket or cli parameter is set
  • Loading branch information
rklaehn authored Jul 5, 2023
1 parent 0905155 commit 86fdbb9
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 236 deletions.
2 changes: 1 addition & 1 deletion iroh-bytes/src/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Collection {
})
}

/// Serialize this collection to a std Vec<u8>
/// Serialize this collection to a std `Vec<u8>`
pub fn to_bytes(&self) -> Result<Vec<u8>> {
Ok(postcard::to_stdvec(self)?)
}
Expand Down
94 changes: 30 additions & 64 deletions iroh-bytes/src/get.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
//! The client side API
//!
//! The main entry point is [`run`]. This function takes callbacks that will
//! be invoked when blobs or collections are received. It is up to the caller
//! to store the received data.
//! To get data, create a connection using the `dial` function or use any quinn
//! connection that was obtained in another way.
//!
//! Create a request describing the data you want to get.
//!
//! Then create a state machine using [get_response_machine::AtInitial::new] and
//! drive it to completion by calling next on each state.
use std::error::Error;
use std::fmt::{self, Debug};
use std::net::SocketAddr;
Expand All @@ -14,6 +18,7 @@ use bao_tree::io::DecodeResponseItem;
use bao_tree::outboard::PreOrderMemOutboard;
use bao_tree::{ByteNum, ChunkNum};
use bytes::BytesMut;
use iroh_net::tls::Keypair;
use iroh_net::{hp::derp::DerpMap, tls::PeerId};
use postcard::experimental::max_size::MaxSize;
use quinn::RecvStream;
Expand All @@ -25,21 +30,22 @@ pub use crate::util::Hash;

use crate::blobs::Collection;
use crate::protocol::{write_lp, AnyGetRequest, Handshake, RangeSpecSeq};
use crate::provider::Ticket;
use crate::tokio_util::{TrackingReader, TrackingWriter};
use crate::util::pathbuf_from_name;
use crate::IROH_BLOCK_SIZE;

/// Options for the client
#[derive(Clone, Debug)]
pub struct Options {
/// The addresses to connect to.
/// The keypair of the node
pub keypair: Keypair,
/// The addresses to connect to
pub addrs: Vec<SocketAddr>,
/// The peer id to expect
/// The peer id to dial
pub peer_id: PeerId,
/// Whether to log the SSL keys when `SSLKEYLOGFILE` environment variable is set.
/// Whether to log the SSL keys when `SSLKEYLOGFILE` environment variable is set
pub keylog: bool,
/// The configuration of the derp services.
/// The configuration of the derp services
pub derp_map: Option<DerpMap>,
}

Expand All @@ -62,40 +68,6 @@ impl Stats {
}
}

/// Gets a collection and all its blobs using a [`Ticket`].
pub async fn run_ticket(
ticket: &Ticket,
request: AnyGetRequest,
keylog: bool,
derp_map: Option<DerpMap>,
) -> Result<get_response_machine::AtInitial> {
let connection = iroh_net::MagicEndpoint::dial_peer(
ticket.peer(),
&crate::P2P_ALPN,
ticket.addrs(),
derp_map,
keylog,
)
.await?;

let request = if ticket.token().is_some() && request.token().is_none() {
// we have a ticket, but no token, so we need to add the token to the request
match request {
AnyGetRequest::Get(get_request) => {
AnyGetRequest::Get(get_request.with_token(ticket.token().cloned()))
}
AnyGetRequest::CustomGet(mut custom_get_request) => {
custom_get_request.token = ticket.token().cloned();
AnyGetRequest::CustomGet(custom_get_request)
}
}
} else {
request
};

Ok(run_connection(connection, request))
}

/// Finite state machine for get responses
///
#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
Expand Down Expand Up @@ -632,28 +604,22 @@ pub mod get_response_machine {
}
}

/// Dial a peer and run a get request
pub async fn run(
request: AnyGetRequest,
opts: Options,
) -> anyhow::Result<get_response_machine::AtInitial> {
let connection = iroh_net::MagicEndpoint::dial_peer(
opts.peer_id,
&crate::P2P_ALPN,
&opts.addrs,
opts.derp_map,
opts.keylog,
)
.await?;
Ok(run_connection(connection, request))
}

/// Do a get request and return a stream of responses
pub fn run_connection(
connection: quinn::Connection,
request: AnyGetRequest,
) -> get_response_machine::AtInitial {
get_response_machine::AtInitial::new(connection, request)
/// Create a new endpoint and dial a peer, returning the connection
///
/// Note that this will create an entirely new endpoint, so it should be only
/// used for short lived connections. If you want to connect to multiple peers,
/// it is preferable to create an endpoint and use `connect` on the endpoint.
pub async fn dial(opts: Options) -> anyhow::Result<quinn::Connection> {
let endpoint = iroh_net::MagicEndpoint::builder()
.keypair(opts.keypair)
.derp_map(opts.derp_map)
.keylog(opts.keylog)
.bind(0)
.await?;
endpoint
.connect(opts.peer_id, &crate::P2P_ALPN, &opts.addrs)
.await
.context("failed to connect to provider")
}

/// Error when processing a response
Expand Down
10 changes: 10 additions & 0 deletions iroh-bytes/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,22 @@ pub enum Request {
}

impl Request {
/// Gets the request token.
pub fn token(&self) -> Option<&RequestToken> {
match self {
Request::Get(get) => get.token(),
Request::CustomGet(get) => get.token.as_ref(),
}
}

/// Sets the request token and returns a new request.
pub fn with_token(mut self, value: Option<RequestToken>) -> Self {
match &mut self {
Request::Get(get) => get.token = value,
Request::CustomGet(get) => get.token = value,
}
self
}
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
Expand Down
14 changes: 12 additions & 2 deletions iroh-bytes/src/provider/ticket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{ensure, Result};
use iroh_net::tls::PeerId;
use iroh_net::tls::{Keypair, PeerId};
use serde::{Deserialize, Serialize};

use crate::protocol::RequestToken;
use crate::Hash;
use crate::{get, Hash};

/// A token containing everything to get a file from the provider.
///
Expand Down Expand Up @@ -92,6 +92,16 @@ impl Ticket {
} = self;
(hash, peer, addrs, token)
}

pub fn as_get_options(&self, keypair: Keypair) -> get::Options {
get::Options {
peer_id: self.peer,
addrs: self.addrs.clone(),
keypair,
keylog: true,
derp_map: None,
}
}
}

/// Serializes to base32.
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/src/hp/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ impl Client {
/// The *stun_conn4* and *stun_conn6* endpoints are bound UDP sockets to use to send out
/// STUN packets. This function **will not read from the sockets**, as they may be
/// receiving other traffic as well, normally they are the sockets carrying the real
/// traffic. Thus all stun packets received on those sockets should be passed to
/// [`Client::get_msg_sender`] in order for this function to receive the stun
/// traffic. Thus all stun packets received on those sockets should be passed to
/// [`Client::receive_stun_packet`] in order for this function to receive the stun
/// responses and function correctly.
///
/// If these are not passed in this will bind sockets for STUN itself, though results
Expand Down
29 changes: 0 additions & 29 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,35 +168,6 @@ impl MagicEndpoint {
MagicEndpointBuilder::default()
}

/// Connect to a remote endpoint, creating an endpoint on the fly.
///
/// The PeerId and the ALPN protocol are required. If you happen to know dialable addresses of
/// the remote endpoint, they can be specified and will be used to try and establish a direct
/// connection without involving a DERP server. If no addresses are specified, the endpoint
/// will try to dial the peer through the configured DERP servers.
///
/// If *derp_map* is set, these DERP servers are used to discover the dialed peer by its
/// [`PeerId`], help establish the connection being an initial relay for traffic and assist in
/// holepunching.
///
/// If *keylog* is `true` and the KEYLOGFILE environment variable is present it will be
/// considered a filename to which the TLS pre-master keys are logged. This can be useful
/// to be able to decrypt captured traffic for debugging purposes.
pub async fn dial_peer(
peer_id: PeerId,
alpn_protocol: &[u8],
known_addrs: &[SocketAddr],
derp_map: Option<DerpMap>,
keylog: bool,
) -> anyhow::Result<quinn::Connection> {
let endpoint =
MagicEndpoint::bind(Keypair::generate(), 0, None, derp_map, None, keylog).await?;
endpoint
.connect(peer_id, alpn_protocol, known_addrs)
.await
.context("failed to connect to provider")
}

/// Create a quinn endpoint backed by a magicsock.
///
/// This is for internal use, the public interface is the [MagicEndpointBuilder] obtained from
Expand Down
3 changes: 2 additions & 1 deletion iroh/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{net::SocketAddr, path::PathBuf};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use iroh_bytes::{cid::Blake3Cid, protocol::RequestToken, provider::Ticket, runtime};
use iroh_net::tls::PeerId;
use iroh_net::tls::{Keypair, PeerId};
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::RpcClient;

Expand Down Expand Up @@ -73,6 +73,7 @@ impl Cli {
peer_id: peer,
keylog: self.keylog,
derp_map: config.derp_map(),
keypair: Keypair::generate(),
},
token,
single,
Expand Down
55 changes: 26 additions & 29 deletions iroh/src/commands/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use iroh_bytes::{
cid::Blake3Cid,
get::{
self,
get_response_machine::{ConnectedNext, EndBlobNext},
get_response_machine::{self, ConnectedNext, EndBlobNext},
},
protocol::{GetRequest, RangeSpecSeq, Request, RequestToken},
provider::Ticket,
Expand All @@ -19,7 +19,7 @@ use iroh_bytes::{
Hash,
};
use iroh_io::{AsyncSliceWriter, FileAdapter};
use iroh_net::hp::derp::DerpMap;
use iroh_net::{hp::derp::DerpMap, tls::Keypair};
use range_collections::RangeSet2;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -66,6 +66,21 @@ impl GetInteractive {
}
}

fn get_options(&self) -> get::Options {
match self {
GetInteractive::Ticket {
ticket,
keylog,
derp_map,
} => get::Options {
keylog: *keylog,
derp_map: derp_map.clone(),
..ticket.as_get_options(Keypair::generate())
},
GetInteractive::Hash { opts, .. } => opts.clone(),
}
}

fn new_request(&self, query: RangeSpecSeq) -> Request {
GetRequest::new(self.hash(), query)
.with_token(self.token().cloned())
Expand Down Expand Up @@ -95,15 +110,9 @@ impl GetInteractive {
// collection info, in case we won't get a callback with is_root
let collection_info = Some((1, 0));

let request = self.new_request(query);
let response = match self {
GetInteractive::Ticket {
ticket,
keylog,
derp_map,
} => get::run_ticket(&ticket, request, keylog, derp_map).await?,
GetInteractive::Hash { opts, .. } => get::run(request, opts).await?,
};
let request = self.new_request(query).with_token(self.token().cloned());
let connection = get::dial(self.get_options()).await?;
let response = get_response_machine::AtInitial::new(connection, request);
let connected = response.next().await?;
write(format!("{} Requesting ...", style("[2/3]").bold().dim()));
if let Some((count, missing_bytes)) = collection_info {
Expand Down Expand Up @@ -189,15 +198,9 @@ impl GetInteractive {
Some((collection.len() as u64, 0))
};

let request = self.new_request(query);
let response = match self {
GetInteractive::Ticket {
ticket,
keylog,
derp_map,
} => get::run_ticket(&ticket, request, keylog, derp_map).await?,
GetInteractive::Hash { opts, .. } => get::run(request, opts).await?,
};
let request = self.new_request(query).with_token(self.token().cloned());
let connection = get::dial(self.get_options()).await?;
let response = get_response_machine::AtInitial::new(connection, request);
let connected = response.next().await?;
write(format!("{} Requesting ...", style("[2/3]").bold().dim()));
if let Some((count, missing_bytes)) = collection_info {
Expand Down Expand Up @@ -365,15 +368,9 @@ impl GetInteractive {
};

let pb = make_download_pb();
let request = self.new_request(query);
let response = match self {
GetInteractive::Ticket {
ticket,
keylog,
derp_map,
} => get::run_ticket(&ticket, request, keylog, derp_map).await?,
GetInteractive::Hash { opts, .. } => get::run(request, opts).await?,
};
let request = self.new_request(query).with_token(self.token().cloned());
let connection = get::dial(self.get_options()).await?;
let response = get_response_machine::AtInitial::new(connection, request);
let connected = response.next().await?;
write(format!("{} Requesting ...", style("[2/3]").bold().dim()));
let ConnectedNext::StartRoot(curr) = connected.next().await? else {
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const MAX_STREAMS: u64 = 10;
const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak https://simple.wikipedia.org/wiki/Leet
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
pub const DEFAULT_BIND_ADDR: (Ipv4Addr, u16) = (Ipv4Addr::LOCALHOST, 11204);

/// How long we wait at most for some endpoints to be discovered.
Expand Down
Loading

0 comments on commit 86fdbb9

Please sign in to comment.