From 701e9b7c6ff57037cd3bb88a9f7e037f5ddf6b87 Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Tue, 4 Jul 2023 11:16:42 -0500 Subject: [PATCH] feat(iroh-net): Upnp port mapping (#1117) * add igd * add log, use types for readability * request mapping * fix code placement + comments * filling in the gaps * some cleanup and refactor * debugging * debug cleanning * merge cleanup * easy doctor appointment * debug cleanning * use non zero to ensure port to map is positive * spelling * follow iroh code style * log when probe fails * update comment * more logs, use localhost * add portmap metrics to iroh-metrics * use portmap metrics * add client * wip: closing for the day * wip * more wip * connecting stuff * coding with the heart lol * fixes * customize timeout of portmapping doctor command * on_change for portmap * improve docs and logs * add mapping struct that handles maintenance * add waker * improve debug logs * connect with mapping thing * partially connect current mapping stream * simplify mapping handling and add a sanity test * remove hardcoded mapping half life * remove unused field * ensure external port is non zero * simplify test types, reduce test timeout * add external addr fn to mapping * rework getting a new upnp mapping to reuse a known gateway and or known external port * connect expiry and renew events * split client functions * update doctor command * add service channel capacity * remove unnecesary change * leverage derive_more and the utils mod * remove yet one more unnecesary clone * Revert "remove yet one more unnecesary clone" might be controversial since technically the probe starts before the report This reverts commit f5582460af23f00915057a92e3068ea81e84db86. * misc spelling * misc docs * cleanup * more cleanup * simplify logic updating a probe * appease clippy * expand comment * expand metrics * self review * improve doctor docs * unflatten portmap probe * get interfaces * get interfaces using dep already here --- Cargo.lock | 98 +++++ iroh-metrics/src/core.rs | 12 +- iroh-metrics/src/lib.rs | 1 + iroh-metrics/src/portmap.rs | 21 + iroh-net/Cargo.toml | 5 +- iroh-net/src/hp/cfg.rs | 16 +- iroh-net/src/hp/magicsock/conn.rs | 59 +-- iroh-net/src/hp/netcheck.rs | 102 ++--- iroh-net/src/hp/portmapper.rs | 546 ++++++++++++++++++++++++-- iroh-net/src/hp/portmapper/mapping.rs | 233 +++++++++++ iroh-net/src/hp/portmapper/upnp.rs | 140 +++++++ iroh-net/src/util.rs | 49 ++- iroh/src/commands/doctor.rs | 38 +- 13 files changed, 1172 insertions(+), 148 deletions(-) create mode 100644 iroh-metrics/src/portmap.rs create mode 100644 iroh-net/src/hp/portmapper/mapping.rs create mode 100644 iroh-net/src/hp/portmapper/upnp.rs diff --git a/Cargo.lock b/Cargo.lock index 4a1226aca1..ba23b759e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,6 +188,18 @@ version = "1.1.1" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" +[[package]] +name = "attohttpc" +version = "0.16.3" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "fdb8867f378f33f78a811a8eb9bf108ad99430d7aad43315dd9319c827ef6247" +dependencies = [ + "http", + "log", + "url", + "wildmatch", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -1496,6 +1508,24 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "igd" +version = "0.12.1" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "556b5a75cd4adb7c4ea21c64af1c48cefb2ce7d43dc4352c720a1fe47c21f355" +dependencies = [ + "attohttpc", + "bytes", + "futures", + "http", + "hyper", + "log", + "rand", + "tokio", + "url", + "xmltree", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -1736,9 +1766,11 @@ dependencies = [ "hostname", "http", "hyper", + "igd", "iroh-metrics", "libc", "netlink-packet-route", + "ntest", "once_cell", "os_info", "postcard", @@ -1764,6 +1796,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-rustls-acme", + "tokio-stream", "tokio-util", "toml 0.7.5", "tracing", @@ -2095,6 +2128,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "ntest" +version = "0.9.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "da8ec6d2b73d45307e926f5af46809768581044384637af6b3f3fe7c3c88f512" +dependencies = [ + "ntest_test_cases", + "ntest_timeout", +] + +[[package]] +name = "ntest_test_cases" +version = "0.9.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "be7d33be719c6f4d09e64e27c1ef4e73485dc4cc1f4d22201f89860a7fe22e22" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ntest_timeout" +version = "0.9.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "066b468120587a402f0b47d8f80035c921f6a46f8209efd0632a89a16f5188a4" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2561,6 +2627,16 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3801,6 +3877,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -4316,6 +4393,12 @@ version = "1.0.2" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8" +[[package]] +name = "wildmatch" +version = "1.1.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "7f44b95f62d34113cf558c93511ac93027e03e9c29a60dd0fd70e6e025c7270a" + [[package]] name = "winapi" version = "0.3.9" @@ -4586,6 +4669,21 @@ dependencies = [ "time", ] +[[package]] +name = "xml-rs" +version = "0.8.14" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "52839dc911083a8ef63efa4d039d1f58b5e409f923e44c80828f206f66e5541c" + +[[package]] +name = "xmltree" +version = "0.10.3" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "d7d8a75eaf6557bb84a65ace8609883db44a29951042ada9b393151532e41fcb" +dependencies = [ + "xml-rs", +] + [[package]] name = "yasna" version = "0.5.2" diff --git a/iroh-metrics/src/core.rs b/iroh-metrics/src/core.rs index fe7739dbab..46bcf297d4 100644 --- a/iroh-metrics/src/core.rs +++ b/iroh-metrics/src/core.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use once_cell::sync::Lazy; use prometheus_client::{encoding::text::encode, registry::Registry}; -use crate::{iroh, magicsock, netcheck}; +use crate::{iroh, magicsock, netcheck, portmap}; pub static CORE: Lazy = Lazy::new(Core::default); @@ -14,6 +14,7 @@ pub struct Core { iroh_metrics: iroh::Metrics, magicsock_metrics: magicsock::Metrics, netcheck_metrics: netcheck::Metrics, + portmap_metrics: portmap::Metrics, } impl Default for Core { @@ -24,6 +25,7 @@ impl Default for Core { iroh_metrics: iroh::Metrics::new(&mut reg), magicsock_metrics: magicsock::Metrics::new(&mut reg), netcheck_metrics: netcheck::Metrics::new(&mut reg), + portmap_metrics: portmap::Metrics::new(&mut reg), registry: reg, } } @@ -46,6 +48,10 @@ impl Core { &self.netcheck_metrics } + pub fn portmap_metrics(&self) -> &portmap::Metrics { + &self.portmap_metrics + } + pub(crate) fn encode(&self) -> Result { let mut buf = String::new(); encode(&mut buf, self.registry())?; @@ -122,6 +128,7 @@ where Collector::Iroh => CORE.iroh_metrics().record(m, v), Collector::Magicsock => CORE.magicsock_metrics().record(m, v), Collector::Netcheck => CORE.netcheck_metrics().record(m, v), + Collector::Portmap => CORE.portmap_metrics().record(m, v), }; } } @@ -140,6 +147,7 @@ where Collector::Iroh => CORE.iroh_metrics().observe(m, v), Collector::Magicsock => CORE.magicsock_metrics().observe(m, v), Collector::Netcheck => CORE.netcheck_metrics().observe(m, v), + Collector::Portmap => CORE.portmap_metrics().observe(m, v), }; } } @@ -154,4 +162,6 @@ pub enum Collector { Magicsock, /// Netcheck related metrics. Netcheck, + /// Portmap related metrics. + Portmap, } diff --git a/iroh-metrics/src/lib.rs b/iroh-metrics/src/lib.rs index e084b381b3..51c3d58415 100644 --- a/iroh-metrics/src/lib.rs +++ b/iroh-metrics/src/lib.rs @@ -11,3 +11,4 @@ mod service; pub mod iroh; pub mod magicsock; pub mod netcheck; +pub mod portmap; diff --git a/iroh-metrics/src/portmap.rs b/iroh-metrics/src/portmap.rs new file mode 100644 index 0000000000..eb0c4a61c7 --- /dev/null +++ b/iroh-metrics/src/portmap.rs @@ -0,0 +1,21 @@ +super::make_metric_recorders! { + Portmap, + + /* + * General port mapping metrics + */ + ProbesStarted: Counter: "Number of probing tasks started.", + LocalPortUpdates: Counter: "Number of updates to the local port.", + MappingAttempts: Counter: "Number of mapping tasks started.", + MappingFailures: Counter: "Number of failed mapping tasks", + ExternalAddressUpdated: Counter: "Number of times the external address obtained via port mapping was updated.", + + /* + * UPnP metrics + */ + UpnpProbes: Counter: "Number of UPnP probes executed.", + UpnpProbesFailed: Counter: "Number of failed Upnp probes", + UpnpAvailable: Counter: "Number of UPnP probes that found it available.", + UpnpGatewayUpdated: Counter: "Number of UPnP probes that resulted in a gateway different to the previous one.", + +} diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 66e06b9057..c22987e651 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -20,7 +20,7 @@ curve25519-dalek = "=4.0.0-rc.3" default-net = "0.15" data-encoding = "2.3.3" der = { version = "0.7", features = ["alloc", "derive"] } -derive_more = { version = "0.99.17", git = "/~https://github.com/JelteF/derive_more", features = ["debug", "display", "from", "try_into"] } +derive_more = { version = "0.99.17", git = "/~https://github.com/JelteF/derive_more", features = ["debug", "display", "from", "try_into", "deref"] } ed25519-dalek = { version = "=2.0.0-rc.3", features = ["serde", "rand_core"] } flume = "0.10.14" futures = "0.3.25" @@ -29,6 +29,7 @@ hostname = "0.3.1" http = "0.2.9" hyper = { version = "0.14.25", features = ["server", "client", "http1", "tcp"] } governor = "0.5.1" +igd = { version = "0.12.1", features = ["aio"] } libc = "0.2.139" once_cell = "1.17.0" os_info = "3.6.0" @@ -52,6 +53,7 @@ tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs"] } tokio-util = { version = "0.7", features = ["io-util", "io"] } tokio-rustls = { version = "0.24" } tokio-rustls-acme = { version = "0.1" } +tokio-stream = { version = "0.1", features = ["sync"]} url = { version = "2.4", features = ["serde"] } webpki = { version = "0.22", features = ["std"] } webpki-roots = "0.23.0" @@ -83,6 +85,7 @@ wmi = "0.13" clap = { version = "4", features = ["derive"] } tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs", "macros", "time", "test-util"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } +ntest = "*" [build-dependencies] duct = "0.13.6" diff --git a/iroh-net/src/hp/cfg.rs b/iroh-net/src/hp/cfg.rs index d564e0ec02..4323569976 100644 --- a/iroh-net/src/hp/cfg.rs +++ b/iroh-net/src/hp/cfg.rs @@ -6,7 +6,7 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, }; -use super::key; +use super::{key, portmapper}; /// Fake WireGuard endpoint IP address that means to /// use DERP. When used (in the Node.DERP field), the port number of @@ -68,14 +68,8 @@ pub struct NetInfo { /// Whether we have an existing portmap open (UPnP, PMP, or PCP). pub have_port_map: bool, - /// Whether UPnP appears present on the LAN. Empty means not checked. - pub upnp: Option, - - /// Whether NAT-PMP appears present on the LAN. Empty means not checked. - pub pmp: Option, - - /// Whether PCP appears present on the LAN. Empty means not checked. - pub pcp: Option, + /// Probe indicating the presence of port mapping protocols on the LAN. + pub portmap_probe: Option, /// This node's preferred DERP server for incoming traffic. The node might be be temporarily /// connected to multiple DERP servers (to send to other nodes) @@ -104,9 +98,7 @@ impl NetInfo { && self.working_udp == other.working_udp && self.working_icm_pv4 == other.working_icm_pv4 && self.have_port_map == other.have_port_map - && self.upnp == other.upnp - && self.pmp == other.pmp - && self.pcp == other.pcp + && self.portmap_probe == other.portmap_probe && self.preferred_derp == other.preferred_derp && self.link_type == other.link_type } diff --git a/iroh-net/src/hp/magicsock/conn.rs b/iroh-net/src/hp/magicsock/conn.rs index f2a9774599..def7af7195 100644 --- a/iroh-net/src/hp/magicsock/conn.rs +++ b/iroh-net/src/hp/magicsock/conn.rs @@ -261,7 +261,8 @@ impl Conn { "magic-{}", hex::encode(&opts.private_key.public_key().as_ref()[..8]) ); - let port_mapper = portmapper::Client::new(); // TODO: pass self.on_port_map_changed + + let port_mapper = portmapper::Client::new().await; let Options { port, @@ -278,8 +279,14 @@ impl Conn { let (pconn4, pconn6) = bind(port).await?; let port = pconn4.port(); - port_mapper.set_local_port(port).await; + // NOTE: we can end up with a zero port if `std::net::UdpSocket::socket_addr` fails + match port.try_into() { + Ok(non_zero_port) => { + port_mapper.update_local_port(non_zero_port); + } + Err(_zero_port) => debug!("Skipping port mapping with zero local port"), + } let ipv4_addr = pconn4.local_addr()?; let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok()); @@ -517,11 +524,6 @@ impl Conn { Ok(()) } - #[instrument(skip_all, fields(self.name = %self.name))] - async fn on_port_map_changed(&self) { - self.re_stun("portmap-changed").await; - } - /// Closes and re-binds the UDP sockets and resets the DERP connection. /// It should be followed by a call to ReSTUN. #[instrument(skip_all, fields(self.name = %self.name))] @@ -833,6 +835,7 @@ impl Actor { HEARTBEAT_INTERVAL, ); let mut endpoints_update_receiver = self.endpoints_update_state.running.subscribe(); + let mut portmap_watcher = self.port_mapper.watch_external_address(); loop { tokio::select! { @@ -871,6 +874,12 @@ impl Actor { trace!("tick: re_stun {:?}", tick); self.re_stun("periodic").await; } + Ok(()) = portmap_watcher.changed() => { + trace!("tick: portmap changed"); + let new_external_address = *portmap_watcher.borrow(); + debug!("external address updated: {new_external_address:?}"); + self.re_stun("portmap_updated").await; + }, _ = endpoint_heartbeat_timer.tick() => { trace!("tick: endpoint heartbeat {} endpoints", self.peer_map.node_count()); // TODO: this might trigger too many packets at once, pace this @@ -921,7 +930,7 @@ impl Actor { for (_, ep) in self.peer_map.endpoints_mut() { ep.stop_and_reset(); } - self.port_mapper.close(); + self.port_mapper.deactivate(); self.derp_actor_sender .send(DerpActorMessage::Shutdown) .await @@ -1283,10 +1292,8 @@ impl Actor { /// to determine its public address. #[instrument(skip_all, fields(self.name = %self.conn.name))] async fn determine_endpoints(&mut self) -> Result> { - let mut portmap_ext = self - .port_mapper - .get_cached_mapping_or_start_creating_one() - .await; + self.port_mapper.procure_mapping(); + let portmap_watcher = self.port_mapper.watch_external_address(); let nr = self.update_net_info().await.context("update_net_info")?; // endpoint -> how it was found @@ -1307,14 +1314,9 @@ impl Actor { }; } - // If we didn't have a portmap earlier, maybe it's done by now. - if portmap_ext.is_none() { - portmap_ext = self - .port_mapper - .get_cached_mapping_or_start_creating_one() - .await; - } - if let Some(portmap_ext) = portmap_ext { + let maybe_port_mapped = *portmap_watcher.borrow(); + + if let Some(portmap_ext) = maybe_port_mapped.map(SocketAddr::V4) { add_addr!(already, eps, portmap_ext, cfg::EndpointType::Portmapped); self.set_net_info_have_port_map().await; } @@ -1506,14 +1508,13 @@ impl Actor { ); self.no_v4_send = !r.ipv4_can_send; + let have_port_map = self.port_mapper.watch_external_address().borrow().is_some(); let mut ni = cfg::NetInfo { derp_latency: Default::default(), mapping_varies_by_dest_ip: r.mapping_varies_by_dest_ip, hair_pinning: r.hair_pinning, - upnp: r.upnp, - pmp: r.pmp, - pcp: r.pcp, - have_port_map: self.port_mapper.have_mapping(), + portmap_probe: r.portmap_probe.clone(), + have_port_map, working_ipv6: Some(r.ipv6), os_has_ipv6: Some(r.os_has_ipv6), working_udp: Some(r.udp), @@ -1756,8 +1757,14 @@ impl Actor { .context("rebind IPv4 failed")?; // reread, as it might have changed - let port = self.local_port_v4(); - self.port_mapper.set_local_port(port).await; + // we can end up with a zero port if std::net::UdpSocket::socket_addr fails + match self.local_port_v4().try_into() { + Ok(non_zero_port) => self.port_mapper.update_local_port(non_zero_port), + Err(_zero_port) => { + // since the local port might still be the same, don't deactivate port mapping + debug!("Skipping port mapping on rebind with zero local port"); + } + } let ipv4_addr = self.pconn4.local_addr()?; *self.conn.local_addrs.write().unwrap() = (ipv4_addr, ipv6_addr); diff --git a/iroh-net/src/hp/netcheck.rs b/iroh-net/src/hp/netcheck.rs index 7fc3f3c27c..81e7c1991e 100644 --- a/iroh-net/src/hp/netcheck.rs +++ b/iroh-net/src/hp/netcheck.rs @@ -7,14 +7,13 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, sync::Arc, - task::{Context, Poll}, }; use anyhow::{anyhow, bail, ensure, Context as _, Result}; use bytes::Bytes; use futures::{ stream::{FuturesUnordered, StreamExt}, - Future, FutureExt, + FutureExt, }; use iroh_metrics::{inc, netcheck::NetcheckMetrics}; use rand::seq::IteratorRandom; @@ -27,7 +26,10 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument}; -use crate::net::{interfaces, ip::to_canonical}; +use crate::{ + net::{interfaces, ip::to_canonical}, + util::MaybeFuture, +}; use self::probe::{Probe, ProbePlan, ProbeProto}; @@ -89,15 +91,8 @@ pub struct Report { /// Whether the router supports communicating between two local devices through the NATted /// public IP address (on IPv4). pub hair_pinning: Option, - /// Whether UPnP appears present on the LAN. - /// None means not checked. - pub upnp: Option, - /// Whether NAT-PMP appears present on the LAN. - /// None means not checked. - pub pmp: Option, - /// Whether PCP appears present on the LAN. - /// None means not checked. - pub pcp: Option, + /// Probe indicating the presence of port mapping protocols on the LAN. + pub portmap_probe: Option, /// or 0 for unknown pub preferred_derp: u16, /// keyed by DERP Region ID @@ -115,13 +110,6 @@ pub struct Report { pub captive_portal: Option, } -impl Report { - /// Reports whether any of UPnP, PMP, or PCP are non-empty. - pub fn any_port_mapping_checked(&self) -> bool { - self.upnp.is_some() || self.pmp.is_some() || self.pcp.is_some() - } -} - impl fmt::Display for Report { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self, f) @@ -497,7 +485,6 @@ struct ReportState { /// Doing a lite, follow-up netcheck incremental: bool, stop_probe: Arc, - wait_port_map: wg::AsyncWaitGroup, /// The report which will be returned. report: Arc>, got_ep4: Option, @@ -525,15 +512,18 @@ impl ReportState { debug!(port_mapper = %port_mapper.is_some(), %skip_external_network, "running report"); self.report.write().await.os_has_ipv6 = os_has_ipv6().await; - let mut port_mapping = MaybeFuture::default(); + let mut portmap_probe = MaybeFuture::default(); if !skip_external_network { - if let Some(ref port_mapper) = port_mapper { - let port_mapper = port_mapper.clone(); - port_mapping.inner = Some(Box::pin(async move { + if let Some(port_mapper) = port_mapper { + portmap_probe.inner = Some(Box::pin(async move { match port_mapper.probe().await { - Ok(res) => Some((res.upnp, res.pmp, res.pcp)), - Err(err) => { - warn!("skipping port mapping: {:?}", err); + Ok(Ok(res)) => Some(res), + Ok(Err(err)) => { + warn!("skipping port mapping: {err:?}"); + None + } + Err(recv_err) => { + warn!("skipping port mapping: {recv_err}"); None } } @@ -639,21 +629,10 @@ impl ReportState { debug!("STUN timer expired"); break; }, - pm = &mut port_mapping => { + pm = &mut portmap_probe => { let mut report = self.report.write().await; - match pm { - Some((upnp, pmp, pcp)) => { - report.upnp = Some(upnp); - report.pmp = Some(pmp); - report.pcp = Some(pcp); - } - None => { - report.upnp = None; - report.pmp = None; - report.pcp = None; - } - } - port_mapping.inner = None; + report.portmap_probe = pm; + portmap_probe.inner = None; } probe_report = probes.next() => { match probe_report { @@ -718,8 +697,10 @@ impl ReportState { self.report.write().await.hair_pinning = Some(hair_pin); } - if !skip_external_network && port_mapper.is_some() { - self.wait_port_map.wait().await; + if portmap_probe.inner.is_some() { + let probe_result = portmap_probe.await; + let mut report = self.report.write().await; + report.portmap_probe = probe_result; debug!("port_map done"); } @@ -1420,7 +1401,6 @@ impl Actor { pc4_hair: Arc::new(pc4_hair), hair_timeout: None, stop_probe: Arc::new(sync::Notify::new()), - wait_port_map: wg::AsyncWaitGroup::new(), report: Default::default(), got_ep4: None, timers: Default::default(), @@ -1596,11 +1576,8 @@ impl Actor { } log += &format!(" mapvarydest={:?}", r.mapping_varies_by_dest_ip); log += &format!(" hair={:?}", r.hair_pinning); - if r.any_port_mapping_checked() { - log += &format!( - " portmap={{ UPnP: {:?}, PMP: {:?}, PCP: {:?} }}", - r.upnp, r.pmp, r.pcp - ); + if let Some(probe) = &r.portmap_probe { + log += &format!(" {}", probe); } else { log += " portmap=?"; } @@ -1712,29 +1689,6 @@ pub(crate) async fn os_has_ipv6() -> bool { udp.is_ok() } -/// Resolves to pending if the inner is `None`. -#[derive(Debug)] -struct MaybeFuture { - inner: Option, -} - -impl Default for MaybeFuture { - fn default() -> Self { - MaybeFuture { inner: None } - } -} - -impl Future for MaybeFuture { - type Output = T::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.inner { - Some(ref mut t) => Pin::new(t).poll(cx), - None => Poll::Pending, - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -1890,9 +1844,7 @@ mod tests { let r = client.get_report(dm, None, None).await?; let mut r: Report = (*r).clone(); - r.upnp = None; - r.pmp = None; - r.pcp = None; + r.portmap_probe = None; let want = Report { // The ip_v4_can_send flag gets set differently across platforms. diff --git a/iroh-net/src/hp/portmapper.rs b/iroh-net/src/hp/portmapper.rs index 3f46f32855..a635339322 100644 --- a/iroh-net/src/hp/portmapper.rs +++ b/iroh-net/src/hp/portmapper.rs @@ -1,54 +1,538 @@ -use std::net::SocketAddr; +//! Port mapping client and service. -use anyhow::{bail, Error}; +use std::{ + net::SocketAddrV4, + num::NonZeroU16, + time::{Duration, Instant}, +}; -#[derive(Debug, Clone)] -pub struct PortMapper {} +use anyhow::{anyhow, Result}; +use futures::StreamExt; +use tokio::sync::{mpsc, oneshot, watch}; +use tracing::{debug, trace}; -#[derive(Debug, Clone)] -pub struct ProbeResult { +use iroh_metrics::{inc, portmap::PortmapMetrics as Metrics}; + +use crate::util; + +use mapping::CurrentMapping; + +mod mapping; +mod upnp; + +/// If a port mapping service has been seen within the last [`AVAILABILITY_TRUST_DURATION`] it will +/// not be probed again. +const AVAILABILITY_TRUST_DURATION: Duration = Duration::from_secs(60 * 10); // 10 minutes + +/// Capacity of the channel to communicate with the long-running service. +const SERVICE_CHANNEL_CAPACITY: usize = 32; // should be plenty + +#[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] +#[display("portmap={{ UPnP: {upnp}, PMP: {pmp}, PCP: {pcp} }}")] +pub struct ProbeOutput { + /// If UPnP can be considered available. + pub upnp: bool, + /// If PCP can be considered available. pub pcp: bool, + /// If PMP can be considered available. pub pmp: bool, - pub upnp: bool, } -/// A port mapping client. -#[derive(Default, Debug, Clone)] -pub struct Client {} +impl ProbeOutput { + pub fn all_available(&self) -> bool { + self.upnp && self.pcp && self.pmp + } +} + +#[derive(derive_more::Debug)] +enum Message { + /// Attempt to get a mapping if the local port is set but there is no mapping. + ProcureMapping, + /// Request to update the local port. + /// + /// The resulting external address can be obtained subscribing using + /// [`Client::watch_external_address`]. + /// A value of `None` will deactivate port mapping. + UpdateLocalPort { local_port: Option }, + /// Request to probe the port mapping protocols. + /// + /// The requester should wait for the result at the [`oneshot::Receiver`] counterpart of the + /// [`oneshot::Sender`]. + Probe { + /// Sender side to communicate the result of the probe. + #[debug("_")] + result_tx: oneshot::Sender>, + }, +} + +/// Port mapping client. +#[derive(Debug, Clone)] +pub struct Client { + /// A watcher over the most recent external address obtained from port mapping. + /// + /// See [`watch::Receiver`]. + port_mapping: watch::Receiver>, + /// Channel used to communicate with the port mapping service. + service_tx: mpsc::Sender, + /// A handle to the service that will cancel the spawned task once the client is dropped. + _service_handle: std::sync::Arc, +} impl Client { - pub fn new() -> Self { - Self::default() + /// Create a new port mapping client. + pub async fn new() -> Self { + let (service_tx, service_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); + + let (service, watcher) = Service::new(service_rx); + + let handle = util::CancelOnDrop::new( + "portmap_service", + tokio::spawn(async move { service.run().await }).abort_handle(), + ); + + Client { + port_mapping: watcher, + service_tx, + _service_handle: std::sync::Arc::new(handle), + } + } + + /// Request a probe to the port mapping protocols. + /// + /// Returns the [`oneshot::Receiver`] used to obtain the result of the probe. + pub fn probe(&self) -> oneshot::Receiver> { + let (result_tx, result_rx) = oneshot::channel(); + + if let Err(e) = self.service_tx.try_send(Message::Probe { result_tx }) { + use mpsc::error::TrySendError::*; + + // recover the sender and return the error there + let (result_tx, e) = match e { + Full(Message::Probe { result_tx }) => (result_tx, "Port mapping channel full"), + Closed(Message::Probe { result_tx }) => (result_tx, "Port mapping channel closed"), + Full(_) | Closed(_) => unreachable!("Sent value is a probe."), + }; + + // sender was just created. If it's dropped we have two send error and are likely + // shutting down + // NOTE: second Err is infallible match due to being the sent value + if let Err(Err(e)) = result_tx.send(Err(e.into())) { + trace!("Failed to request probe: {e}") + } + } + result_rx } - pub async fn probe(&self) -> Result { - bail!("not implemented yet") + /// Try to get a mapping for the last local port if there isn't one already. + pub fn procure_mapping(&self) { + // requester can't really do anything with this error if returned, so we log it + if let Err(e) = self.service_tx.try_send(Message::ProcureMapping) { + trace!("Failed to request mapping {e}") + } } - /// Updates the local port number to which we want to port map UDP traffic. - pub async fn set_local_port(&self, _local_port: u16) { - // TODO: + /// Update the local port. + /// + /// If the port changes, this will trigger a port mapping attempt. + pub fn update_local_port(&self, local_port: NonZeroU16) { + let local_port = Some(local_port); + // requester can't really do anything with this error if returned, so we log it + if let Err(e) = self + .service_tx + .try_send(Message::UpdateLocalPort { local_port }) + { + trace!("Failed to update local port {e}") + } } - /// Quickly returns with our current cached portmapping, if any. - /// If there's not one, it starts up a background goroutine to create one. - /// If the background goroutine ends up creating one, the `on_change` hook registered with the - /// `Client::new` constructor (if any) will fire. - pub async fn get_cached_mapping_or_start_creating_one(&self) -> Option { - // TODO: - None + /// Deactivate port mapping. + pub fn deactivate(&self) { + // requester can't really do anything with this error if returned, so we log it + if let Err(e) = self + .service_tx + .try_send(Message::UpdateLocalPort { local_port: None }) + { + trace!("Failed to deactivate port mapping {e}") + } } - pub fn have_mapping(&self) -> bool { - // TODO: - false + /// Watch the external address for changes in the mappings. + pub fn watch_external_address(&self) -> watch::Receiver> { + self.port_mapping.clone() + } +} + +/// Port mapping protocol information obtained during a probe. +#[derive(Debug, Default)] +struct Probe { + /// The last [`igd::aio::Gateway`] and when was it last seen. + last_upnp_gateway_addr: Option<(upnp::Gateway, Instant)>, + // TODO(@divma): PCP placeholder. + last_pcp: Option, + // TODO(@divma): PMP placeholder. + last_pmp: Option, +} + +impl Probe { + /// Create a new probe based on a previous output. + async fn new(output: ProbeOutput) -> Probe { + let ProbeOutput { + upnp, + pcp: _, + pmp: _, + } = output; + let mut upnp_probing_task = util::MaybeFuture { + inner: (!upnp).then(|| { + Box::pin(async { + upnp::probe_available() + .await + .map(|addr| (addr, Instant::now())) + }) + }), + }; + + // placeholder tasks + let pcp_probing_task = async { None }; + let pmp_probing_task = async { None }; + + if upnp_probing_task.inner.is_some() { + inc!(Metrics::UpnpProbes); + } + + let mut upnp_done = upnp_probing_task.inner.is_none(); + let mut pcp_done = true; + let mut pmp_done = true; + + tokio::pin!(pmp_probing_task); + tokio::pin!(pcp_probing_task); + + let mut probe = Probe::default(); + + while !upnp_done || !pcp_done || !pmp_done { + tokio::select! { + last_upnp_gateway_addr = &mut upnp_probing_task, if !upnp_done => { + trace!("tick: upnp probe ready"); + probe.last_upnp_gateway_addr = last_upnp_gateway_addr; + upnp_done = true; + }, + last_pmp = &mut pmp_probing_task, if !pmp_done => { + trace!("tick: pmp probe ready"); + probe.last_pmp = last_pmp; + pmp_done = true; + }, + last_pcp = &mut pcp_probing_task, if !pcp_done => { + trace!("tick: pcp probe ready"); + probe.last_pcp = last_pcp; + pcp_done = true; + }, + } + } + + probe } - pub fn note_network_down(&self) { - // TODO: + /// Returns a [`ProbeOutput`] indicating which services can be considered available. + fn output(&self) -> ProbeOutput { + let now = Instant::now(); + + // check if the last UPnP gateway is valid + let upnp = self + .last_upnp_gateway_addr + .as_ref() + .map(|(_gateway_addr, last_probed)| *last_probed + AVAILABILITY_TRUST_DURATION > now) + .unwrap_or_default(); + + // not probing for now + let pcp = false; + + // not probing for now + let pmp = false; + + ProbeOutput { upnp, pcp, pmp } + } + + /// Updates a probe with the `Some` values of another probe. + fn update(&mut self, probe: Probe) { + let Probe { + last_upnp_gateway_addr, + last_pcp, + last_pmp, + } = probe; + if last_upnp_gateway_addr.is_some() { + inc!(Metrics::UpnpAvailable); + let new_gateway = last_upnp_gateway_addr + .as_ref() + .map(|(addr, _last_seen)| addr); + let old_gateway = self + .last_upnp_gateway_addr + .as_ref() + .map(|(addr, _last_seen)| addr); + if new_gateway != old_gateway { + inc!(Metrics::UpnpGatewayUpdated); + debug!( + "upnp gateway changed {:?} -> {:?}", + old_gateway + .map(|gw| gw.to_string()) + .unwrap_or("None".into()), + new_gateway + .map(|gw| gw.to_string()) + .unwrap_or("None".into()) + ) + }; + self.last_upnp_gateway_addr = last_upnp_gateway_addr; + } + if last_pcp.is_some() { + self.last_pcp = last_pcp; + } + if last_pmp.is_some() { + self.last_pmp = last_pmp; + } + } +} + +// mainly to make clippy happy +type ProbeResult = Result; + +/// A port mapping client. +#[derive(Debug)] +pub struct Service { + /// Local port to map. + local_port: Option, + /// Channel over which the service is informed of messages. + /// + /// The service will stop when all senders are gone. + rx: mpsc::Receiver, + /// Currently active mapping. + current_mapping: CurrentMapping, + /// Last updated probe. + full_probe: Probe, + /// Task attempting to get a port mapping. + /// + /// This task will be cancelled if a request to set the local port arrives before it's + /// finished. + mapping_task: Option>>, + /// Task probing the necessary protocols. + /// + /// Requests for a probe that arrive while this task is still in progress will receive the same + /// result. + probing_task: Option<( + util::AbortingJoinHandle, + Vec>, + )>, +} + +impl Service { + fn new(rx: mpsc::Receiver) -> (Self, watch::Receiver>) { + let (current_mapping, watcher) = CurrentMapping::new(); + let service = Service { + local_port: None, + rx, + current_mapping, + full_probe: Default::default(), + mapping_task: None, + probing_task: None, + }; + + (service, watcher) + } + + /// Clears the current mapping and releases it. + async fn invalidate_mapping(&mut self) { + if let Some(old_mapping) = self.current_mapping.update(None) { + if let Err(e) = old_mapping.release().await { + debug!("failed to release mapping {e}"); + } + } + } + + async fn run(mut self) -> Result<()> { + debug!("portmap starting"); + loop { + tokio::select! { + msg = self.rx.recv() => { + trace!("tick: msg {msg:?}"); + match msg { + Some(msg) => { + self.handle_msg(msg).await; + }, + None => { + debug!("portmap service channel dropped. Likely shutting down."); + break; + } + } + } + mapping_result = util::MaybeFuture{ inner: self.mapping_task.as_mut() } => { + trace!("tick: mapping ready"); + // regardless of outcome, the task is finished, clear it + self.mapping_task = None; + // there isn't really a way to react to a join error here. Flatten it to make + // it easier to work with + let result = match mapping_result { + Ok(result) => result, + Err(join_err) => Err(anyhow!("Failed to obtain a result {join_err}")) + }; + self.on_mapping_result(result).await; + } + probe_result = util::MaybeFuture{ inner: self.probing_task.as_mut().map(|(fut, _rec)| fut) } => { + trace!("tick: probe ready"); + // retrieve the receivers and clear the task + let receivers = self.probing_task.take().expect("is some").1; + let probe_result = probe_result.map_err(|join_err| anyhow!("Failed to obtain a result {join_err}")); + self.on_probe_result(probe_result, receivers).await; + } + Some(event) = self.current_mapping.next() => { + trace!("tick: mapping event {event:?}"); + match event { + mapping::Event::Renew { external_port } | mapping::Event::Expired { external_port } => { + self.get_mapping(Some(external_port)); + }, + } + + } + } + } + Ok(()) + } + + async fn on_probe_result( + &mut self, + result: Result, + receivers: Vec>, + ) { + let result = match result { + Err(e) => Err(e.to_string()), + Ok(probe) => { + self.full_probe.update(probe); + // TODO(@divma): the gateway of the current mapping could have changed. Tailscale + // still assumes the current mapping is valid/active and will return it even after + // this + let output = self.full_probe.output(); + debug!("probe output {output}"); + Ok(output) + } + }; + for tx in receivers { + // ignore the error. If the receiver is no longer there we don't really care + let _ = tx.send(result.clone()); + } + } + + async fn on_mapping_result(&mut self, result: Result) { + match result { + Ok(mapping) => { + self.current_mapping.update(Some(mapping)); + } + Err(e) => { + debug!("failed to get a port mapping {e}"); + inc!(Metrics::MappingFailures) + } + } + } + + async fn handle_msg(&mut self, msg: Message) { + match msg { + Message::ProcureMapping => self.update_local_port(self.local_port).await, + Message::UpdateLocalPort { local_port } => self.update_local_port(local_port).await, + Message::Probe { result_tx } => self.probe_request(result_tx), + } + } + + /// Updates the local port of the port mapping service. + /// + /// If the port changed, any port mapping task is cancelled. If the new port is some, it will + /// start a new port mapping task. + async fn update_local_port(&mut self, local_port: Option) { + // ignore requests to update the local port in a way that does not produce a change + if local_port != self.local_port { + inc!(Metrics::LocalPortUpdates); + let old_port = std::mem::replace(&mut self.local_port, local_port); + + // clear the current mapping task if any + + let dropped_task = self.mapping_task.take(); + // check if the dropped task had finished to reduce log noise + let did_cancel = dropped_task + .map(|task| !task.is_finished()) + .unwrap_or_default(); + + if did_cancel { + debug!( + "canceled mapping task due to local port update. Old: {:?} New: {:?}", + old_port, self.local_port + ) + } + + // get the current external port if any to try to get it again + let port = self.current_mapping.external().map(|(_addr, port)| port); + + // since the port has changed, the current mapping is no longer valid and should be + // released + + if port.is_some() { + self.invalidate_mapping().await; + } + + // start a new mapping task to account for the new port if necessary + self.get_mapping(port) + } else if self.current_mapping.external().is_none() { + // if the local port has not changed, but there is no active mapping try to get one + self.get_mapping(None) + } + } + + fn get_mapping(&mut self, external_port: Option) { + if let Some(local_port) = self.local_port { + inc!(Metrics::MappingAttempts); + let local_ip = match default_net::interface::get_local_ipaddr() { + Some(std::net::IpAddr::V4(ip)) + if !ip.is_unspecified() && !ip.is_loopback() && !ip.is_multicast() => + { + ip + } + _ => { + debug!("no address suitable for portmapping found, attempting localhost"); + std::net::Ipv4Addr::LOCALHOST + } + }; + + debug!("getting a port mapping for {local_ip}:{local_port} -> {external_port:?}"); + let gateway = self + .full_probe + .last_upnp_gateway_addr + .as_ref() + .map(|(gateway, _last_seen)| gateway.clone()); + self.mapping_task = Some( + tokio::spawn(upnp::Mapping::new( + local_ip, + local_port, + gateway, + external_port, + )) + .into(), + ); + } } - pub fn close(&self) { - // TODO: + /// Handles a probe request. + /// + /// If there is a task getting a probe, the receiver will be added with any other waiting for a + /// result. If no probe is underway, a result can be returned immediately if it's still + /// considered valid. Otherwise, a new probe task will be started. + fn probe_request(&mut self, result_tx: oneshot::Sender>) { + match self.probing_task.as_mut() { + Some((_task_handle, receivers)) => receivers.push(result_tx), + None => { + let probe_output = self.full_probe.output(); + if probe_output.all_available() { + // we don't care if the requester is no longer there + let _ = result_tx.send(Ok(probe_output)); + } else { + inc!(Metrics::ProbesStarted); + let handle = tokio::spawn(async move { Probe::new(probe_output).await }); + let receivers = vec![result_tx]; + self.probing_task = Some((handle.into(), receivers)); + } + } + } } } diff --git a/iroh-net/src/hp/portmapper/mapping.rs b/iroh-net/src/hp/portmapper/mapping.rs new file mode 100644 index 0000000000..5b5ed63b8c --- /dev/null +++ b/iroh-net/src/hp/portmapper/mapping.rs @@ -0,0 +1,233 @@ +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + num::NonZeroU16, + pin::Pin, + task::Poll, +}; + +use futures::Future; +use iroh_metrics::{inc, portmap::PortmapMetrics::ExternalAddressUpdated}; +use std::time::Duration; +use tokio::{sync::watch, time}; +use tracing::trace; + +/// This is an implementation detail to facilitate testing. +pub(super) trait Mapping: std::fmt::Debug + Unpin { + fn external(&self) -> (Ipv4Addr, NonZeroU16); + /// Half the lifetime of a mapping. This is used to calculate when a mapping should be renewed. + fn half_lifetime(&self) -> Duration; +} + +impl Mapping for super::upnp::Mapping { + fn external(&self) -> (Ipv4Addr, NonZeroU16) { + self.external() + } + fn half_lifetime(&self) -> Duration { + self.half_lifetime() + } +} + +/// Models the lifetime of an active mapping. +#[derive(Debug)] +struct ActiveMapping { + mapping: M, + deadline: Pin>, + expire_after: bool, +} + +impl ActiveMapping { + fn new(mapping: M) -> Self { + let deadline = Box::pin(time::sleep(mapping.half_lifetime())); + ActiveMapping { + mapping, + deadline, + expire_after: false, + } + } +} + +/// Events in the lifetime of the mapping. +#[derive(Debug, PartialEq, Eq)] +pub(super) enum Event { + /// On this event, the mapping is halway through it's lifetime and should be renewed. + Renew { external_port: NonZeroU16 }, + /// Mapping has expired. + Expired { external_port: NonZeroU16 }, +} + +/// Holds the current mapping value and ensures that any change is reported accordingly. +#[derive(derive_more::Debug)] +pub(super) struct CurrentMapping { + /// Active port mapping. + mapping: Option>, + /// A [`watch::Sender`] that keeps the latest external address for subscribers to changes. + address_tx: watch::Sender>, + /// Waker to ensure this is polled when needed. + #[debug(skip)] + waker: Option, +} + +impl CurrentMapping { + /// Creates a new [`CurrentMapping`] and returns the watcher over it's external address. + pub(super) fn new() -> (Self, watch::Receiver>) { + let (address_tx, address_rx) = watch::channel(None); + let wrapper = CurrentMapping { + mapping: None, + address_tx, + waker: None, + }; + (wrapper, address_rx) + } + + /// Updates the mapping, informing of any changes to the external address. The old mapping is + /// returned. + pub(super) fn update(&mut self, mapping: Option) -> Option { + trace!("new port mapping {mapping:?}"); + let maybe_external_addr = mapping.as_ref().map(|mapping| { + let (ip, port) = mapping.external(); + SocketAddrV4::new(ip, port.into()) + }); + let old_mapping = std::mem::replace(&mut self.mapping, mapping.map(ActiveMapping::new)) + .map(|mapping| mapping.mapping); + // mapping changed + // TODO(@divma): maybe only wake if mapping is some + if let Some(waker) = &self.waker { + waker.wake_by_ref() + } + self.address_tx.send_if_modified(|old_addr| { + // replace the value always, as it could have different internal values + let old_addr = std::mem::replace(old_addr, maybe_external_addr); + // inform only if this produces a different external address + let update = old_addr != maybe_external_addr; + if update { + inc!(ExternalAddressUpdated); + }; + update + }); + old_mapping + } + + fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll { + // grab the waker if needed + if let Some(waker) = &self.waker { + if waker.will_wake(cx.waker()) { + self.waker = Some(cx.waker().clone()); + } + } else { + self.waker = Some(cx.waker().clone()); + } + + // poll the mapping deadlines to keep the state up to date + if let Some(ActiveMapping { + mapping, + deadline, + expire_after, + }) = &mut self.mapping + { + if deadline.as_mut().poll(cx).is_ready() { + let external_port = mapping.external().1; + // check if the deadline means the mapping is expired or due for renewal + return if *expire_after { + trace!("mapping expired {mapping:?}"); + self.update(None); + Poll::Ready(Event::Expired { external_port }) + } else { + // mapping is due for renewal + *deadline = Box::pin(time::sleep(mapping.half_lifetime())); + *expire_after = true; + trace!("due for renewal {mapping:?}"); + Poll::Ready(Event::Renew { external_port }) + }; + } + } + Poll::Pending + } + + pub(crate) fn external(&self) -> Option<(Ipv4Addr, NonZeroU16)> { + self.mapping + .as_ref() + .map(|mapping| mapping.mapping.external()) + } +} + +impl futures::Stream for CurrentMapping { + type Item = Event; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.as_mut().poll(cx).map(Some) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + // for testing a mapping is simply an ip, port pair + type M = (Ipv4Addr, NonZeroU16); + + const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero + unsafe { NonZeroU16::new_unchecked(9586) }; + const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST; + const HALF_LIFETIME_SECS: u64 = 1; + + impl Mapping for M { + fn external(&self) -> M { + *self + } + fn half_lifetime(&self) -> Duration { + Duration::from_secs(HALF_LIFETIME_SECS) + } + } + + #[tokio::test] + #[ntest::timeout(2500)] + async fn report_renew_expire_report() { + let (mut c, mut watcher) = CurrentMapping::::new(); + let now = std::time::Instant::now(); + c.update(Some((TEST_IP, TEST_PORT))); + + // 1) check that changes are reported as soon as needed + time::timeout(Duration::from_millis(10), watcher.changed()) + .await + .expect("change is as immediate as it can be.") + .expect("sender is alive"); + let addr = watcher.borrow_and_update().unwrap(); + assert_eq!(addr.ip(), &TEST_IP); + assert_eq!(addr.port(), Into::::into(TEST_PORT)); + + // 2) test that the mapping being due for renewal is reported in a timely matter + let event = c.next().await.expect("Renewal is reported"); + // check that the event is the correct type + assert_eq!( + event, + Event::Renew { + external_port: TEST_PORT + } + ); + // check it's reported not before not after it should + assert_eq!(now.elapsed().as_secs(), HALF_LIFETIME_SECS); + // check renewal does not produce a change + assert!(!watcher.has_changed().unwrap()); + + // 3) test that the mapping being expired is reported in a timely matter + let event = c.next().await.expect("Expiry is reported"); + // check that the event is the correct type + assert_eq!( + event, + Event::Expired { + external_port: TEST_PORT + } + ); + assert_eq!(now.elapsed().as_secs(), 2 * HALF_LIFETIME_SECS); + // check that the change is reported + time::timeout(Duration::from_millis(10), watcher.changed()) + .await + .expect("change is as immediate as it can be.") + .expect("sender is alive"); + assert!(watcher.borrow_and_update().is_none()); + } +} diff --git a/iroh-net/src/hp/portmapper/upnp.rs b/iroh-net/src/hp/portmapper/upnp.rs new file mode 100644 index 0000000000..eabf013eb5 --- /dev/null +++ b/iroh-net/src/hp/portmapper/upnp.rs @@ -0,0 +1,140 @@ +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + num::NonZeroU16, + time::Duration, +}; + +use anyhow::Result; +use igd::aio as aigd; + +use iroh_metrics::{inc, portmap::PortmapMetrics as Metrics}; +use tracing::debug; + +pub use aigd::Gateway; + +/// Seconds we ask the router to maintain the port mapping. 0 means infinite. +const PORT_MAPPING_LEASE_DURATION_SECONDS: u32 = 0; + +/// Maximum duration a UPnP search can take before timing out. +const SEARCH_TIMEOUT: Duration = Duration::from_secs(1); + +/// Tailscale uses the recommended port mapping lifetime for PMP, which is 2 hours. So we assume a +/// half lifetime of 1h. See +const HALF_LIFETIME: Duration = Duration::from_secs(60 * 60); + +/// Name with which we register the mapping in the router. +const PORT_MAPPING_DESCRIPTION: &str = "iroh-portmap"; + +#[derive(derive_more::Debug, Clone)] +pub struct Mapping { + /// The internet Gateway device (router) used to create this mapping. + #[debug("{}", gateway)] + gateway: aigd::Gateway, + /// The external address obtained by this mapping. + external_ip: Ipv4Addr, + /// External port obtained by this mapping. + external_port: NonZeroU16, +} + +impl Mapping { + pub(crate) async fn new( + local_addr: Ipv4Addr, + port: NonZeroU16, + gateway: Option, + preferred_port: Option, + ) -> Result { + let local_addr = SocketAddrV4::new(local_addr, port.into()); + + // search for a gateway if there is not one already + let gateway = if let Some(known_gateway) = gateway { + known_gateway + } else { + aigd::search_gateway(igd::SearchOptions { + timeout: Some(SEARCH_TIMEOUT), + ..Default::default() + }) + .await? + }; + + let external_ip = gateway.get_external_ip().await?; + + // if we are trying to get a specific external port, try this first. If this fails, default + // to try to get any port + if let Some(external_port) = preferred_port { + if gateway + .add_port( + igd::PortMappingProtocol::UDP, + external_port.into(), + local_addr, + PORT_MAPPING_LEASE_DURATION_SECONDS, + PORT_MAPPING_DESCRIPTION, + ) + .await + .is_ok() + { + return Ok(Mapping { + gateway, + external_ip, + external_port, + }); + } + } + + let external_port = gateway + .add_any_port( + igd::PortMappingProtocol::UDP, + local_addr, + PORT_MAPPING_LEASE_DURATION_SECONDS, + PORT_MAPPING_DESCRIPTION, + ) + .await? + .try_into() + .map_err(|_| anyhow::anyhow!("upnp mapping got zero external port"))?; + + Ok(Mapping { + gateway, + external_ip, + external_port, + }) + } + + pub fn half_lifetime(&self) -> Duration { + HALF_LIFETIME + } + + /// Releases the mapping. + pub(crate) async fn release(self) -> Result<()> { + let Mapping { + gateway, + external_port, + .. + } = self; + gateway + .remove_port(igd::PortMappingProtocol::UDP, external_port.into()) + .await?; + Ok(()) + } + + /// Returns the external gateway ip and port that can be used to contact this node. + pub fn external(&self) -> (Ipv4Addr, NonZeroU16) { + (self.external_ip, self.external_port) + } +} + +/// Searches for UPnP gateways. +pub async fn probe_available() -> Option { + inc!(Metrics::UpnpProbes); + match aigd::search_gateway(igd::SearchOptions { + timeout: Some(SEARCH_TIMEOUT), + ..Default::default() + }) + .await + { + Ok(gateway) => Some(gateway), + Err(e) => { + inc!(Metrics::UpnpProbesFailed); + debug!("upnp probe failed: {e}"); + None + } + } +} diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index def662b9f1..81785cca27 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -7,7 +7,7 @@ use std::{ use futures::FutureExt; /// A join handle that owns the task it is running, and aborts it when dropped. -#[derive(Debug)] +#[derive(Debug, derive_more::Deref)] pub(crate) struct AbortingJoinHandle(tokio::task::JoinHandle); impl From> for AbortingJoinHandle { @@ -29,3 +29,50 @@ impl Drop for AbortingJoinHandle { self.0.abort(); } } + +/// Holds a handle to a task and aborts it on drop. +/// +/// See [`tokio::task::AbortHandle`]. +#[derive(derive_more::Debug)] +pub struct CancelOnDrop { + task_name: &'static str, + #[debug(skip)] + handle: tokio::task::AbortHandle, +} + +impl CancelOnDrop { + pub fn new(task_name: &'static str, handle: tokio::task::AbortHandle) -> Self { + CancelOnDrop { task_name, handle } + } +} + +impl Drop for CancelOnDrop { + fn drop(&mut self) { + self.handle.abort(); + tracing::debug!("{} completed", self.task_name); + } +} + +/// Resolves to pending if the inner is `None`. +#[derive(Debug)] +pub struct MaybeFuture { + pub inner: Option, +} + +// NOTE: explicit implementation to bypass derive unnecessary bounds +impl Default for MaybeFuture { + fn default() -> Self { + MaybeFuture { inner: None } + } +} + +impl Future for MaybeFuture { + type Output = T::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.inner { + Some(ref mut t) => Pin::new(t).poll(cx), + None => Poll::Pending, + } + } +} diff --git a/iroh/src/commands/doctor.rs b/iroh/src/commands/doctor.rs index c6433b264d..e7f26ade5d 100644 --- a/iroh/src/commands/doctor.rs +++ b/iroh/src/commands/doctor.rs @@ -2,6 +2,7 @@ //! and to test connectivity to specific other nodes. use std::{ net::SocketAddr, + num::NonZeroU16, time::{Duration, Instant}, }; @@ -16,6 +17,7 @@ use iroh_net::{ self, derp::{DerpMap, UseIpv4, UseIpv6}, key::node::SecretKey, + portmapper, }, tls::{Keypair, PeerId, PublicKey}, MagicEndpoint, @@ -96,6 +98,14 @@ pub enum Commands { #[clap(long)] local_derper: bool, }, + /// Attempt to get a port mapping to the given local port. + PortMap { + /// Local port to get a mapping. + local_port: NonZeroU16, + /// How long to wait for an external port to be ready in seconds. + #[clap(long, default_value_t = 10)] + timeout_secs: u64, + }, } #[derive(Debug, Serialize, Deserialize, MaxSize)] @@ -191,7 +201,8 @@ async fn send_blocks( } async fn report(stun_host: Option, stun_port: u16, config: &Config) -> anyhow::Result<()> { - let mut client = hp::netcheck::Client::new(None).await?; + let port_mapper = hp::portmapper::Client::new().await; + let mut client = hp::netcheck::Client::new(Some(port_mapper)).await?; let dm = match stun_host { Some(host_name) => { @@ -565,6 +576,27 @@ async fn accept( Ok(()) } +async fn port_map(local_port: NonZeroU16, timeout: Duration) -> anyhow::Result<()> { + let port_mapper = portmapper::Client::new().await; + let mut watcher = port_mapper.watch_external_address(); + port_mapper.update_local_port(local_port); + + // wait for the mapping to be ready, or timeout waiting for a change. + match tokio::time::timeout(timeout, watcher.changed()).await { + Ok(Ok(_)) => match *watcher.borrow() { + Some(address) => { + println!("Port mapping ready: {address}"); + // Ensure the port mapper remains alive until the end. + drop(port_mapper); + Ok(()) + } + None => anyhow::bail!("No port mapping found"), + }, + Ok(Err(_recv_err)) => anyhow::bail!("Service dropped. This is a bug"), + Err(_) => anyhow::bail!("Timed out waiting for a port mapping"), + } +} + fn create_secret_key(private_key: PrivateKey) -> anyhow::Result { Ok(match private_key { PrivateKey::Random => SecretKey::generate(), @@ -626,5 +658,9 @@ pub async fn run(command: Commands, config: &Config) -> anyhow::Result<()> { let config = TestConfig { size, iterations }; accept(private_key, config, derp_map).await } + Commands::PortMap { + local_port, + timeout_secs, + } => port_map(local_port, Duration::from_secs(timeout_secs)).await, } }