Skip to content

Commit

Permalink
fix(netcheck): Do not read from main Conn sockets (#1017)
Browse files Browse the repository at this point in the history
The get_report function is passed the primary Conn sockets so they can
be used to send STUN messages from.  However these sockets should not
be read from directly, Conn already passes through the STUN payloads.
Reading from them directly will lose packets in Conn.

When the sockets are not passed in however we should create them
ourselves, and read from them.  The payloads are now received by the
actor in the same way as otherwise, removing a lot of Option<>s.
  • Loading branch information
flub authored May 16, 2023
1 parent 4fc70f5 commit 5e997a4
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 116 deletions.
216 changes: 101 additions & 115 deletions src/hp/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::{
};
use rand::seq::IteratorRandom;
use tokio::{
net,
net::UdpSocket,
sync::{self, broadcast, mpsc, RwLock},
task::JoinSet,
time::{self, Duration, Instant},
Expand Down Expand Up @@ -203,13 +203,76 @@ impl Client {
pub async fn get_report(
&mut self,
dm: &DerpMap,
stun_conn4: Option<Arc<net::UdpSocket>>,
stun_conn6: Option<Arc<net::UdpSocket>>,
stun_conn4: Option<Arc<UdpSocket>>,
stun_conn6: Option<Arc<UdpSocket>>,
) -> Result<Arc<Report>> {
// TODO: consider if DerpMap should be made to easily clone? It is expensive right
// now.
// If not given UdpSockets to send stun packets, create them.
// TODO: Is failure really fatal?
let stun_conn4 = match stun_conn4 {
Some(stun_conn4) => stun_conn4,
None => {
let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0));
let sock = UdpSocket::bind(addr)
.await
.context("netcheck: failed to bind udp 0.0.0.0:0")?;
let sock = Arc::new(sock);
self.spawn_udp_listener(sock.clone(), self.msg_sender.clone());
sock
}
};
let stun_conn6 = match stun_conn6 {
Some(stun_conn6) => stun_conn6,
None => {
let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0));
let sock = UdpSocket::bind(addr)
.await
.context("netcheck: failed to bind udp6 [::]:0")?;
let sock = Arc::new(sock);
self.spawn_udp_listener(sock.clone(), self.msg_sender.clone());
sock
}
};

// TODO: consider if DerpMap should be made to easily clone? It seems expensive
// right now.
self.actor.run(dm.clone(), stun_conn4, stun_conn6).await
}

/// Spawns a tokio task reading stun packets from the UDP socket.
fn spawn_udp_listener(&self, sock: Arc<UdpSocket>, sender: mpsc::Sender<ActorMessage>) {
tokio::spawn(async move {
debug!("udp stun socket listener started");
// TODO: Can we do better for buffers here? Probably doesn't matter
// much.
let mut buf = vec![0u8; 64 << 10];
loop {
if let Err(err) = Self::recv_stun_socket(&sock, &mut buf, &sender).await {
// TODO: handle socket closed nicely
warn!(%err, "stun recv failed");
break;
}
}
debug!("udp stun socket listener stopped");
});
}

/// Receive STUN response from a UDP socket, pass it to the actor.
async fn recv_stun_socket(
sock: &UdpSocket,
buf: &mut [u8],
sender: &mpsc::Sender<ActorMessage>,
) -> Result<()> {
let (count, mut from_addr) = sock
.recv_from(buf)
.await
.context("Error reading from stun socket")?;
let payload = &buf[..count];
from_addr.set_ip(to_canonical(from_addr.ip()));
sender
.send(ActorMessage::StunPacket(payload.to_vec(), from_addr))
.await
.context("actor stopped")
}
}

async fn measure_https_latency(_reg: &DerpRegion) -> Result<(Duration, IpAddr)> {
Expand Down Expand Up @@ -435,9 +498,9 @@ struct ReportState {
got_hair_stun: broadcast::Receiver<SocketAddr>,
// notified on hair pin timeout
hair_timeout: Arc<sync::Notify>,
pc4: Option<Arc<net::UdpSocket>>,
pc6: Option<Arc<net::UdpSocket>>,
pc4_hair: Arc<net::UdpSocket>,
pc4: Arc<UdpSocket>,
pc6: Arc<UdpSocket>,
pc4_hair: Arc<UdpSocket>,
incremental: bool, // doing a lite, follow-up netcheck
stop_probe: Arc<sync::Notify>,
wait_port_map: wg::AsyncWaitGroup,
Expand Down Expand Up @@ -809,8 +872,8 @@ enum ProbeError {
async fn run_probe(
report: Arc<RwLock<Report>>,
resolver: &TokioAsyncResolver,
pc4: Option<Arc<net::UdpSocket>>,
pc6: Option<Arc<net::UdpSocket>>,
pc4: Arc<UdpSocket>,
pc6: Arc<UdpSocket>,
node: DerpNode,
probe: Probe,
in_flight: sync::mpsc::Sender<Inflight>,
Expand Down Expand Up @@ -850,33 +913,29 @@ async fn run_probe(
Probe::Ipv4 { .. } => {
// TODO:
// metricSTUNSend4.Add(1)
if let Some(ref pc4) = pc4 {
let n = pc4.send_to(&req, addr).await;
debug!("sending probe IPV4: {:?} to {}", n, addr);
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv4_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
let n = pc4.send_to(&req, addr).await;
debug!("sending probe IPV4: {:?} to {}", n, addr);
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv4_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
}
Probe::Ipv6 { .. } => {
if let Some(ref pc6) = pc6 {
// TODO:
// metricSTUNSend6.Add(1)
let n = pc6.send_to(&req, addr).await;
debug!("sending probe IPV6: {:?} to {}", n, addr);
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv6_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
// TODO:
// metricSTUNSend6.Add(1)
let n = pc6.send_to(&req, addr).await;
debug!("sending probe IPV6: {:?} to {}", n, addr);
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv6_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
}
Probe::Https { reg, .. } => {
Expand Down Expand Up @@ -1026,8 +1085,8 @@ impl Actor {
async fn run(
&mut self,
dm: DerpMap,
stun_sock_v4: Option<Arc<net::UdpSocket>>,
stun_sock_v6: Option<Arc<net::UdpSocket>>,
stun_sock_v4: Arc<UdpSocket>,
stun_sock_v6: Arc<UdpSocket>,
) -> Result<Arc<Report>> {
let report_state = self
.create_report_state(&dm, stun_sock_v4.clone(), stun_sock_v6.clone())
Expand All @@ -1043,8 +1102,6 @@ impl Actor {
.await
}))
};
let mut buf4 = vec![0u8; 64 << 10];
let mut buf6 = vec![0u8; 64 << 10];
let mut in_flight = HashMap::new();

loop {
Expand All @@ -1059,20 +1116,6 @@ impl Actor {
self.receive_stun_packet(&mut in_flight, &pkt, source).await,
}
}
res = maybe_pending(stun_sock_v4.as_ref().map(|c| c.recv_from(&mut buf4))) => {
match res {
Err(err) => warn!("failed to read ipv4: {:?}", err),
Ok((n, addr)) =>
self.process_packet(&mut in_flight, &buf4[..n], addr).await,
}
}
res = maybe_pending(stun_sock_v6.as_ref().map(|c| c.recv_from(&mut buf6))) => {
match res {
Err(err) => warn!("failed to read ipv6: {:?}", err),
Ok((n, addr)) =>
self.process_packet(&mut in_flight, &buf6[..n], addr).await,
}
}
res = &mut running => {
match res {
Ok(Ok((report, dm))) => {
Expand All @@ -1096,13 +1139,13 @@ impl Actor {
async fn create_report_state(
&mut self,
dm: &DerpMap,
pc4: Option<Arc<net::UdpSocket>>,
pc6: Option<Arc<net::UdpSocket>>,
pc4: Arc<UdpSocket>,
pc6: Arc<UdpSocket>,
) -> Result<ReportState> {
let now = Instant::now();

// Create a UDP4 socket used for sending to our discovered IPv4 address.
let pc4_hair = net::UdpSocket::bind("0.0.0.0:0")
let pc4_hair = UdpSocket::bind("0.0.0.0:0")
.await
.context("udp4: failed to bind")?;

Expand All @@ -1113,12 +1156,6 @@ impl Actor {

let got_hair_stun_r = self.got_hair_stun.subscribe();
let if_state = interfaces::State::new().await;
let pc4 = Some(self.init_stun_conn4(pc4).await?);
let pc6 = if if_state.have_v6 {
Some(self.init_stun_conn6(pc6).await?)
} else {
None
};
let mut do_full = self.reports.next_full
|| now.duration_since(self.reports.last_full) > FULL_REPORT_INTERVAL;

Expand Down Expand Up @@ -1161,34 +1198,6 @@ impl Actor {
})
}

async fn init_stun_conn4(
&self,
pc4: Option<Arc<net::UdpSocket>>,
) -> Result<Arc<net::UdpSocket>> {
if let Some(pc4) = pc4 {
return Ok(pc4);
}
let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0));
let u4 = net::UdpSocket::bind(addr)
.await
.with_context(|| format!("udp4: failed to bind to: {}", addr))?;
Ok(Arc::new(u4))
}

async fn init_stun_conn6(
&self,
pc6: Option<Arc<net::UdpSocket>>,
) -> Result<Arc<net::UdpSocket>> {
if let Some(pc6) = pc6 {
return Ok(pc6);
}
let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0));
let u6 = net::UdpSocket::bind(addr)
.await
.with_context(|| format!("udp6: failed to bind to: {}", addr))?;
Ok(Arc::new(u6))
}

async fn receive_stun_packet(
&self,
in_flight: &mut HashMap<stun::TransactionId, Inflight>,
Expand Down Expand Up @@ -1230,21 +1239,6 @@ impl Actor {
}
}

/// Reads STUN packets from pc until there's an error. In either case, it closes `pc`.
async fn process_packet(
&self,
in_flight: &mut HashMap<stun::TransactionId, Inflight>,
pkt: &[u8],
mut addr: SocketAddr,
) {
if !stun::is(pkt) {
// ignore non stun packets
return;
}
addr.set_ip(to_canonical(addr.ip()));
self.receive_stun_packet(in_flight, pkt, addr).await;
}

async fn finish_and_store_report(&mut self, report: Report, dm: &DerpMap) -> Arc<Report> {
let report = self.add_report_history_and_set_preferred_derp(report).await;
self.log_concise_report(&report, dm).await;
Expand Down Expand Up @@ -1401,7 +1395,7 @@ impl Actor {
/// Test if IPv6 works at all, or if it's been hard disabled at the OS level.
async fn os_has_ipv6() -> bool {
// TODO: use socket2 to specify binding to ipv6
let udp = net::UdpSocket::bind("[::1]:0").await;
let udp = UdpSocket::bind("[::1]:0").await;
udp.is_ok()
}

Expand All @@ -1418,14 +1412,6 @@ async fn os_has_ipv6() -> bool {
// metricHTTPSend = clientmetric.NewCounter("netcheck_https_measure")
// )

/// Resolves to pending if the future is `None`.
async fn maybe_pending<T>(maybe_fut: Option<impl Future<Output = T>>) -> T {
match maybe_fut {
Some(t) => t.await,
None => futures::future::pending().await,
}
}

/// Resolves to pending if the inner is `None`.
#[derive(Debug)]
struct MaybeFuture<T> {
Expand Down Expand Up @@ -1554,7 +1540,7 @@ mod tests {
let local_addr = "127.0.0.1";
let bind_addr = "0.0.0.0";

let server = net::UdpSocket::bind(format!("{bind_addr}:0")).await?;
let server = UdpSocket::bind(format!("{bind_addr}:0")).await?;
let addr = server.local_addr()?;

let server_task = tokio::task::spawn(async move {
Expand All @@ -1565,7 +1551,7 @@ mod tests {
server.send_to(&buf[..n], addr).await.unwrap();
});

let client = net::UdpSocket::bind(format!("{bind_addr}:0")).await?;
let client = UdpSocket::bind(format!("{bind_addr}:0")).await?;
let data = b"foobar";
println!("client: send");
let server_addr = format!("{local_addr}:{}", addr.port());
Expand Down
5 changes: 4 additions & 1 deletion src/net/ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ fn is_link_local(ip: IpAddr) -> bool {
}
}

/// Converts this address to an IpAddr::V4 if it is an IPv4-mapped IPv6 addresses, otherwise it return self as-is.
/// Converts IPv4-mappend IPv6 addresses to IPv4.
///
/// Converts this address to an [`IpAddr::V4`] if it is an IPv4-mapped IPv6 addresses,
/// otherwise it return self as-is.
// TODO: replace with IpAddr::to_canoncial once stabilized.
pub fn to_canonical(ip: IpAddr) -> IpAddr {
match ip {
Expand Down

0 comments on commit 5e997a4

Please sign in to comment.