From 47c43a10d0524a566d0efca102e4cbfba7d15a96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 Aug 2024 12:17:03 +0200 Subject: [PATCH 1/4] feat(bench): Add `--with-relay` option to allow testing relay throughput (Using `DEV_RELAY_ONLY` in addition to `--with-relay`) --- Cargo.lock | 1 + iroh-net/bench/Cargo.toml | 3 ++- iroh-net/bench/src/bin/bulk.rs | 14 ++++++++-- iroh-net/bench/src/iroh.rs | 49 ++++++++++++++++++++++++++++------ iroh-net/bench/src/lib.rs | 7 +++++ 5 files changed, 63 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5facd291f..d303efee8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2937,6 +2937,7 @@ dependencies = [ "anyhow", "bytes", "clap", + "futures-lite 2.3.0", "hdrhistogram", "iroh-net", "quinn 0.10.2", diff --git a/iroh-net/bench/Cargo.toml b/iroh-net/bench/Cargo.toml index f5789db3eb..ae53ed830d 100644 --- a/iroh-net/bench/Cargo.toml +++ b/iroh-net/bench/Cargo.toml @@ -9,7 +9,7 @@ publish = false anyhow = "1.0.22" bytes = "1" hdrhistogram = { version = "7.2", default-features = false } -iroh-net = { path = ".." } +iroh-net = { path = "..", features = ["test-utils"] } rcgen = "0.11.1" rustls = { version = "0.21.0", default-features = false, features = ["quic"] } clap = { version = "4", features = ["derive"] } @@ -17,6 +17,7 @@ tokio = { version = "1.0.1", features = ["rt", "sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] } socket2 = "0.5" +futures-lite = "2.3.0" [target.'cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))'.dependencies] quinn = "0.10" diff --git a/iroh-net/bench/src/bin/bulk.rs b/iroh-net/bench/src/bin/bulk.rs index a0c6933876..6d09fb8c53 100644 --- a/iroh-net/bench/src/bin/bulk.rs +++ b/iroh-net/bench/src/bin/bulk.rs @@ -32,9 +32,18 @@ fn main() { pub fn run_iroh(opt: Opt) -> Result<()> { let server_span = tracing::error_span!("server"); let runtime = rt(); + + let (relay_url, _guard) = if opt.with_relay { + let (_, relay_url, _guard) = runtime.block_on(iroh_net::test_utils::run_relay_server())?; + + (Some(relay_url), Some(_guard)) + } else { + (None, None) + }; + let (server_addr, endpoint) = { let _guard = server_span.enter(); - iroh::server_endpoint(&runtime, &opt) + iroh::server_endpoint(&runtime, relay_url.clone(), &opt) }; let server_thread = std::thread::spawn(move || { @@ -47,10 +56,11 @@ pub fn run_iroh(opt: Opt) -> Result<()> { let mut handles = Vec::new(); for id in 0..opt.clients { let server_addr = server_addr.clone(); + let relay_url = relay_url.clone(); handles.push(std::thread::spawn(move || { let _guard = tracing::error_span!("client", id).entered(); let runtime = rt(); - match runtime.block_on(iroh::client(server_addr, opt)) { + match runtime.block_on(iroh::client(server_addr, relay_url.clone(), opt)) { Ok(stats) => Ok(stats), Err(e) => { eprintln!("client failed: {e:#}"); diff --git a/iroh-net/bench/src/iroh.rs b/iroh-net/bench/src/iroh.rs index a20d363384..84f58b47a4 100644 --- a/iroh-net/bench/src/iroh.rs +++ b/iroh-net/bench/src/iroh.rs @@ -5,9 +5,10 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; +use futures_lite::StreamExt as _; use iroh_net::{ endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig}, - relay::RelayMode, + relay::{RelayMap, RelayMode, RelayUrl}, Endpoint, NodeAddr, }; use tracing::trace; @@ -19,27 +20,47 @@ use crate::{ pub const ALPN: &[u8] = b"n0/iroh-net-bench/0"; /// Creates a server endpoint which runs on the given runtime -pub fn server_endpoint(rt: &tokio::runtime::Runtime, opt: &Opt) -> (NodeAddr, Endpoint) { +pub fn server_endpoint( + rt: &tokio::runtime::Runtime, + relay_url: Option, + opt: &Opt, +) -> (NodeAddr, Endpoint) { let _guard = rt.enter(); rt.block_on(async move { + let relay_mode = relay_url.clone().map_or(RelayMode::Disabled, |url| { + RelayMode::Custom(RelayMap::from_url(url)) + }); let ep = Endpoint::builder() .alpns(vec![ALPN.to_vec()]) - .relay_mode(RelayMode::Disabled) + .insecure_skip_relay_cert_verify(relay_url.is_some()) + .relay_mode(relay_mode) .transport_config(transport_config(opt.max_streams, opt.initial_mtu)) .bind(0) .await .unwrap(); + + if relay_url.is_some() { + ep.watch_home_relay().next().await; + } + let addr = ep.bound_sockets(); let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), addr.0.port()); - let addr = NodeAddr::new(ep.node_id()).with_direct_addresses([addr]); + let mut addr = NodeAddr::new(ep.node_id()).with_direct_addresses([addr]); + if let Some(relay_url) = relay_url { + addr = addr.with_relay_url(relay_url); + } (addr, ep) }) } /// Create and run a client -pub async fn client(server_addr: NodeAddr, opt: Opt) -> Result { +pub async fn client( + server_addr: NodeAddr, + relay_url: Option, + opt: Opt, +) -> Result { let client_start = std::time::Instant::now(); - let (endpoint, connection) = connect_client(server_addr, opt).await?; + let (endpoint, connection) = connect_client(server_addr, relay_url, opt).await?; let client_connect_time = client_start.elapsed(); let mut res = client_handler( EndpointSelector::Iroh(endpoint), @@ -52,15 +73,27 @@ pub async fn client(server_addr: NodeAddr, opt: Opt) -> Result { } /// Create a client endpoint and client connection -pub async fn connect_client(server_addr: NodeAddr, opt: Opt) -> Result<(Endpoint, Connection)> { +pub async fn connect_client( + server_addr: NodeAddr, + relay_url: Option, + opt: Opt, +) -> Result<(Endpoint, Connection)> { + let relay_mode = relay_url.clone().map_or(RelayMode::Disabled, |url| { + RelayMode::Custom(RelayMap::from_url(url)) + }); let endpoint = Endpoint::builder() .alpns(vec![ALPN.to_vec()]) - .relay_mode(RelayMode::Disabled) + .insecure_skip_relay_cert_verify(relay_url.is_some()) + .relay_mode(relay_mode) .transport_config(transport_config(opt.max_streams, opt.initial_mtu)) .bind(0) .await .unwrap(); + if relay_url.is_some() { + endpoint.watch_home_relay().next().await; + } + // TODO: We don't support passing client transport config currently // let mut client_config = quinn::ClientConfig::new(Arc::new(crypto)); // client_config.transport_config(Arc::new(transport_config(&opt))); diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs index cf1ecab6c0..b7ce6f10cb 100644 --- a/iroh-net/bench/src/lib.rs +++ b/iroh-net/bench/src/lib.rs @@ -60,6 +60,13 @@ pub struct Opt { /// Starting guess for maximum UDP payload size #[clap(long, default_value = "1200")] pub initial_mtu: u16, + /// Whether to run a local relay and have the server and clients connect to that. + /// + /// Can be combined with the `DEV_RELAY_ONLY` environment variable (at compile time) + /// to test throughput for relay-only traffic locally. + /// (e.g. `DEV_RELAY_ONLY=true cargo run --release -- iroh --with-relay`) + #[clap(long, default_value_t = false)] + pub with_relay: bool, } pub enum EndpointSelector { From 9e9cdaa1ce02e705c2d52d16eac9385a26abbf24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 Aug 2024 12:24:52 +0200 Subject: [PATCH 2/4] Borrow more --- iroh-net/bench/src/bin/bulk.rs | 2 +- iroh-net/bench/src/iroh.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh-net/bench/src/bin/bulk.rs b/iroh-net/bench/src/bin/bulk.rs index 6d09fb8c53..b81a7ee1c8 100644 --- a/iroh-net/bench/src/bin/bulk.rs +++ b/iroh-net/bench/src/bin/bulk.rs @@ -43,7 +43,7 @@ pub fn run_iroh(opt: Opt) -> Result<()> { let (server_addr, endpoint) = { let _guard = server_span.enter(); - iroh::server_endpoint(&runtime, relay_url.clone(), &opt) + iroh::server_endpoint(&runtime, &relay_url, &opt) }; let server_thread = std::thread::spawn(move || { diff --git a/iroh-net/bench/src/iroh.rs b/iroh-net/bench/src/iroh.rs index 84f58b47a4..f411640b26 100644 --- a/iroh-net/bench/src/iroh.rs +++ b/iroh-net/bench/src/iroh.rs @@ -22,7 +22,7 @@ pub const ALPN: &[u8] = b"n0/iroh-net-bench/0"; /// Creates a server endpoint which runs on the given runtime pub fn server_endpoint( rt: &tokio::runtime::Runtime, - relay_url: Option, + relay_url: &Option, opt: &Opt, ) -> (NodeAddr, Endpoint) { let _guard = rt.enter(); @@ -47,7 +47,7 @@ pub fn server_endpoint( let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), addr.0.port()); let mut addr = NodeAddr::new(ep.node_id()).with_direct_addresses([addr]); if let Some(relay_url) = relay_url { - addr = addr.with_relay_url(relay_url); + addr = addr.with_relay_url(relay_url.clone()); } (addr, ep) }) From e0cd2059187f2da8fd7b008901d086ae55798d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 26 Aug 2024 10:59:58 +0200 Subject: [PATCH 3/4] feat(bench): Add `--metrics` option printing iroh-net library metrics --- Cargo.lock | 1 + iroh-net/bench/Cargo.toml | 1 + iroh-net/bench/src/bin/bulk.rs | 55 ++++++++++++++++++++++++++++++++++ iroh-net/bench/src/lib.rs | 6 ++++ 4 files changed, 63 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d303efee8a..2f8e76d693 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2939,6 +2939,7 @@ dependencies = [ "clap", "futures-lite 2.3.0", "hdrhistogram", + "iroh-metrics", "iroh-net", "quinn 0.10.2", "rcgen 0.11.3", diff --git a/iroh-net/bench/Cargo.toml b/iroh-net/bench/Cargo.toml index ae53ed830d..892dde19dc 100644 --- a/iroh-net/bench/Cargo.toml +++ b/iroh-net/bench/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0.22" bytes = "1" hdrhistogram = { version = "7.2", default-features = false } iroh-net = { path = "..", features = ["test-utils"] } +iroh-metrics = { path = "../../iroh-metrics" } rcgen = "0.11.1" rustls = { version = "0.21.0", default-features = false, features = ["quic"] } clap = { version = "4", features = ["derive"] } diff --git a/iroh-net/bench/src/bin/bulk.rs b/iroh-net/bench/src/bin/bulk.rs index b81a7ee1c8..5ac3ba6a00 100644 --- a/iroh-net/bench/src/bin/bulk.rs +++ b/iroh-net/bench/src/bin/bulk.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use anyhow::Result; use clap::Parser; @@ -30,6 +32,19 @@ fn main() { } pub fn run_iroh(opt: Opt) -> Result<()> { + if opt.metrics { + // enable recording metrics + iroh_metrics::core::Core::try_init(|reg, metrics| { + use iroh_metrics::core::Metric; + metrics.insert(iroh_net::metrics::MagicsockMetrics::new(reg)); + metrics.insert(iroh_net::metrics::NetcheckMetrics::new(reg)); + metrics.insert(iroh_net::metrics::PortmapMetrics::new(reg)); + if opt.with_relay { + metrics.insert(iroh_net::metrics::RelayMetrics::new(reg)); + } + })?; + } + let server_span = tracing::error_span!("server"); let runtime = rt(); @@ -78,6 +93,30 @@ pub fn run_iroh(opt: Opt) -> Result<()> { } } + if opt.metrics { + // print metrics + let core = + iroh_metrics::core::Core::get().ok_or_else(|| anyhow::anyhow!("Missing metrics"))?; + println!("\nMetrics:"); + collect_and_print( + "MagicsockMetrics", + core.get_collector::(), + ); + collect_and_print( + "NetcheckMetrics", + core.get_collector::(), + ); + collect_and_print( + "PortmapMetrics", + core.get_collector::(), + ); + // if None, (this is the case if opt.with_relay is false), then this is skipped internally: + collect_and_print( + "RelayMetrics", + core.get_collector::(), + ); + } + server_thread.join().expect("server thread"); Ok(()) @@ -130,3 +169,19 @@ pub fn run_quinn(opt: Opt) -> Result<()> { pub fn run_s2n(_opt: s2n::Opt) -> Result<()> { unimplemented!() } + +fn collect_and_print( + category: &'static str, + metrics: Option<&impl iroh_metrics::struct_iterable::Iterable>, +) { + let Some(metrics) = metrics else { + return; + }; + let mut map = BTreeMap::new(); + for (name, counter) in metrics.iter() { + if let Some(counter) = counter.downcast_ref::() { + map.insert(name.to_string(), counter.get()); + } + } + println!("{category}: {map:#?}"); +} diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs index b7ce6f10cb..e9bfc4272c 100644 --- a/iroh-net/bench/src/lib.rs +++ b/iroh-net/bench/src/lib.rs @@ -54,6 +54,12 @@ pub struct Opt { /// Show connection stats the at the end of the benchmark #[clap(long = "stats")] pub stats: bool, + /// Show iroh-net library counter metrics at the end of the benchmark + /// + /// These metrics are process-wide, so contain metrics for + /// clients and the server all summed up. + #[clap(long)] + pub metrics: bool, /// Whether to use the unordered read API #[clap(long = "unordered")] pub read_unordered: bool, From ef780c020cbf26924dbbaa4e2ab272552242f7e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 26 Aug 2024 17:04:05 +0200 Subject: [PATCH 4/4] test(iroh-net): Mark `dht_discovery_smoke` as flaky #2669 --- iroh-net/src/discovery/pkarr/dht.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh-net/src/discovery/pkarr/dht.rs b/iroh-net/src/discovery/pkarr/dht.rs index cc79aafda2..ff06e8dbaa 100644 --- a/iroh-net/src/discovery/pkarr/dht.rs +++ b/iroh-net/src/discovery/pkarr/dht.rs @@ -396,6 +396,7 @@ mod tests { use testresult::TestResult; #[tokio::test] + #[ignore = "flaky"] async fn dht_discovery_smoke() -> TestResult { let _ = tracing_subscriber::fmt::try_init(); let ep = crate::Endpoint::builder().bind(0).await?;