Skip to content

Commit

Permalink
fix: allow dialing by peer id only
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed May 8, 2023
1 parent fd1be6f commit 6fb17d1
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 32 deletions.
29 changes: 20 additions & 9 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub const DEFAULT_PROVIDER_ADDR: (Ipv4Addr, u16) = crate::provider::DEFAULT_BIND
#[derive(Clone, Debug)]
pub struct Options {
/// The address to connect to
pub addr: SocketAddr,
pub addr: Option<SocketAddr>,
/// The peer id to expect
pub peer_id: Option<PeerId>,
/// Whether to log the SSL keys when `SSLKEYLOGFILE` environment variable is set.
Expand All @@ -60,7 +60,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Options {
addr: SocketAddr::from(DEFAULT_PROVIDER_ADDR),
addr: None,
peer_id: None,
keylog: false,
derp_map: None,
Expand Down Expand Up @@ -106,10 +106,11 @@ pub async fn make_client_endpoint(

/// Establishes a QUIC connection to the provided peer.
pub async fn dial_peer(opts: Options) -> Result<quinn::Connection> {
let bind_addr = match opts.addr.is_ipv6() {
true => SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0).into(),
false => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(),
let bind_addr = match opts.addr.map(|a| a.is_ipv6()) {
Some(true) => SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0).into(),
Some(false) | None => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(),
};

let (endpoint, magicsock) = make_client_endpoint(
bind_addr,
opts.peer_id,
Expand All @@ -124,13 +125,20 @@ pub async fn dial_peer(opts: Options) -> Result<quinn::Connection> {
let node_key: crate::hp::key::node::PublicKey = peer_id.into();
const DEFAULT_DERP_REGION: u16 = 1;

let mut addresses = Vec::new();
let mut endpoints = Vec::new();
// Add the provided address as a starting point.
if let Some(addr) = opts.addr {
addresses.push(addr.ip());
endpoints.push(addr);
}
magicsock
.set_network_map(netmap::NetworkMap {
peers: vec![cfg::Node {
name: None,
addresses: vec![opts.addr.ip()],
addresses,
key: node_key.clone(),
endpoints: vec![opts.addr],
endpoints,
derp: Some(SocketAddr::new(DERP_MAGIC_IP, DEFAULT_DERP_REGION)),
created: Instant::now(),
hostinfo: crate::hp::hostinfo::Hostinfo::new(),
Expand All @@ -146,7 +154,10 @@ pub async fn dial_peer(opts: Options) -> Result<quinn::Connection> {
.get_mapping_addr(&node_key)
.await
.expect("just inserted");
debug!("connecting to {}: (via {} - {})", peer_id, addr, opts.addr);
debug!(
"connecting to {}: (via {} - {:?})",
peer_id, addr, opts.addr
);
let connect = endpoint.connect(addr, "localhost")?;
let connection = connect.await.context("failed connecting to provider")?;

Expand Down Expand Up @@ -201,7 +212,7 @@ async fn dial_ticket(
let mut conn_stream = futures::stream::iter(addrs)
.map(|addr| {
let opts = Options {
addr,
addr: Some(addr),
peer_id: Some(ticket.peer()),
keylog,
derp_map: derp_map.clone(),
Expand Down
39 changes: 27 additions & 12 deletions src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,7 @@ impl Actor {
conn_sender: self.conn.actor_sender.clone(),
conn_public_key: self.conn.public_key.clone(),
public_key: None,
derp_addr: None,
});
self.peer_map.set_endpoint_for_ip_port(&meta.addr, id);

Expand Down Expand Up @@ -1114,17 +1115,32 @@ impl Actor {
.await
{
// Message was internal, do not bubble up.
debug!("processed internal disco message from {:?}", dm.src);
return None;
}

let ep_fake_wg_addr = {
self.peer_map
.endpoint_for_node_key(&dm.src)
.map(|ep| ep.fake_wg_addr)
let ep_fake_wg_addr = match self.peer_map.endpoint_for_node_key(&dm.src) {
Some(ep) => ep.fake_wg_addr,
None => {
info!(
"no peer_map state found for {:?} in: {:#?}",
dm.src, self.peer_map
);
let id = self
.peer_map
.upsert_endpoint(EndpointOptions {
conn_sender: self.conn.actor_sender.clone(),
conn_public_key: self.conn.public_key.clone(),
public_key: Some(dm.src),
derp_addr: Some(ipp),
})
.expect("just checked");
self.peer_map.set_endpoint_for_ip_port(&ipp, id);
let ep = self.peer_map.by_id_mut(&id).expect("inserted");
ep.fake_wg_addr
}
};

let ep_fake_wg_addr = ep_fake_wg_addr?;

let meta = quinn_udp::RecvMeta {
len: dm.buf.len(),
stride: dm.buf.len(),
Expand Down Expand Up @@ -2187,15 +2203,14 @@ impl Actor {
derp_node_src: Option<key::node::PublicKey>,
) -> bool {
debug!("handle_disco_message start {} - {:?}", src, derp_node_src);
let conn = self.conn.clone();
let source = disco::source_and_box(msg);
if source.is_none() {
return false;
}

let (source, sealed_box) = source.unwrap();

if conn.is_closed() {
if self.conn.is_closed() {
return true;
}

Expand Down Expand Up @@ -2282,7 +2297,9 @@ impl Actor {
// Ask each to handle it, stopping once one reports that
// the Pong's TxID was theirs.
if let Some(ep) = self.peer_map.endpoint_for_node_key_mut(&sender) {
let (_, insert) = ep.handle_pong_conn(&conn.public_key, &pong, di, src).await;
let (_, insert) = ep
.handle_pong_conn(&self.conn.public_key, &pong, di, src)
.await;
if let Some((src, key)) = insert {
self.peer_map.set_node_key_for_ip_port(&src, &key);
}
Expand Down Expand Up @@ -2525,6 +2542,7 @@ impl Actor {
conn_sender: self.conn.actor_sender.clone(),
conn_public_key: self.conn.public_key.clone(),
public_key: Some(n.key.clone()),
derp_addr: n.derp,
});
}

Expand Down Expand Up @@ -2652,9 +2670,6 @@ impl IpStream {
pconn4: RebindingUdpConn,
pconn6: Option<RebindingUdpConn>,
) -> Self {
// Init UDP receving state
let udp_state = quinn_udp::UdpState::new();

// 1480 MTU size based on default from quinn
let target_recv_buf_len = 1480 * udp_state.gro_segments() * quinn_udp::BATCH_SIZE;
let recv_buf = vec![0u8; target_recv_buf_len];
Expand Down
4 changes: 3 additions & 1 deletion src/hp/magicsock/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub(super) struct Options {
pub(super) conn_sender: flume::Sender<ActorMessage>,
pub(super) conn_public_key: key::node::PublicKey,
pub(super) public_key: Option<key::node::PublicKey>,
pub(super) derp_addr: Option<SocketAddr>,
}

impl Endpoint {
Expand All @@ -117,7 +118,7 @@ impl Endpoint {
conn_public_key: options.conn_public_key,
public_key: options.public_key,
last_full_ping: None,
derp_addr: None,
derp_addr: options.derp_addr,
best_addr: None,
best_addr_at: None,
trust_best_addr_until: None,
Expand Down Expand Up @@ -345,6 +346,7 @@ impl Endpoint {
Some(ep.clone())
})
.collect();
debug!("sending pings to {:?}", pings);

let sent_any = !pings.is_empty();
for (i, ep) in pings.into_iter().enumerate() {
Expand Down
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ mod tests {
content: Vec<u8>,
) -> Result<()> {
let opts = get::Options {
addr,
addr: Some(addr),
peer_id: Some(peer_id),
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -298,7 +298,7 @@ mod tests {
let addrs = provider.listen_addresses().await?;
let addr = *addrs.first().unwrap();
let opts = get::Options {
addr,
addr: Some(addr),
peer_id: Some(provider.peer_id()),
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
GetRequest::all(hash).into(),
auth_token,
get::Options {
addr: provider_addr[0],
addr: Some(provider_addr[0]),
peer_id: None,
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -459,7 +459,7 @@ mod tests {
GetRequest::all(hash).into(),
auth_token,
get::Options {
addr: provider_addr[0],
addr: Some(provider_addr[0]),
peer_id: None,
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -506,7 +506,7 @@ mod tests {
GetRequest::all(hash).into(),
auth_token,
get::Options {
addr: addr[0],
addr: Some(addr[0]),
peer_id,
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -622,7 +622,7 @@ mod tests {
let peer_id = Some(provider.peer_id());
tokio::time::timeout(Duration::from_secs(10), async move {
let connection = dial_peer(get::Options {
addr: addr[0],
addr: Some(addr[0]),
peer_id,
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -708,7 +708,7 @@ mod tests {
request,
auth_token,
get::Options {
addr: addr[0],
addr: Some(addr[0]),
peer_id,
keylog: true,
derp_map: None,
Expand Down Expand Up @@ -747,7 +747,7 @@ mod tests {
request,
auth_token,
get::Options {
addr: addr[0],
addr: Some(addr[0]),
peer_id,
keylog: true,
derp_map: None,
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ enum Commands {
#[clap(long)]
auth_token: String,
/// Address of the provider
#[clap(long, short, default_value_t = SocketAddr::from(get::DEFAULT_PROVIDER_ADDR))]
addr: SocketAddr,
#[clap(long, short)]
addr: Option<SocketAddr>,
/// Directory in which to save the file(s), defaults to writing to STDOUT
#[clap(long, short)]
out: Option<PathBuf>,
Expand Down

0 comments on commit 6fb17d1

Please sign in to comment.