From dd0a3e10f50971f9235d65ed775d51646a5555ae Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 20 Apr 2021 16:26:57 -0700 Subject: [PATCH 1/3] remove unused handle time module Signed-off-by: Eliza Weisman --- linkerd/app/core/src/handle_time.rs | 302 ---------------------------- linkerd/app/core/src/lib.rs | 1 - 2 files changed, 303 deletions(-) delete mode 100644 linkerd/app/core/src/handle_time.rs diff --git a/linkerd/app/core/src/handle_time.rs b/linkerd/app/core/src/handle_time.rs deleted file mode 100644 index 0c1c2e0d0e..0000000000 --- a/linkerd/app/core/src/handle_time.rs +++ /dev/null @@ -1,302 +0,0 @@ -#![allow(clippy::new_without_default)] - -use super::metrics::Direction; -use linkerd_metrics::{latency, FmtLabels, FmtMetric, FmtMetrics, Histogram, Metric}; -use linkerd_proxy_http::insert; -use std::fmt; -use std::sync::atomic::{self, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; -use std::time::Instant; - -#[derive(Clone, Debug)] -pub struct Metrics { - inbound: Scope, - outbound: Scope, -} - -impl Metrics { - pub const NAME: &'static str = "request_handle_us"; - - pub const HELP: &'static str = - "A histogram of the time in microseconds between when a request is \ - received and when it is sent upstream."; - - pub fn new() -> Self { - Self { - inbound: Scope::new(), - outbound: Scope::new(), - } - } - - pub fn outbound(&self) -> Scope { - self.outbound.clone() - } - - pub fn inbound(&self) -> Scope { - self.inbound.clone() - } - - fn metric(&self) -> Metric<'_, &str, Scope> { - Metric::new(Self::NAME, Self::HELP) - } - - fn scopes(&self) -> impl Iterator { - std::iter::once((Direction::In, &self.inbound)) - .chain(std::iter::once((Direction::Out, &self.outbound))) - } -} - -impl FmtMetrics for Metrics { - fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let metric = self.metric(); - metric.fmt_help(f)?; - metric.fmt_scopes(f, self.scopes(), |s| s) - } -} - -/// A single handle time histogram. -/// -/// Higher-level code will use this to represent a single set of labels for -/// handle-time metrics. -#[derive(Debug, Clone)] -pub struct Scope(Arc); - -/// A layer that inserts a `Tracker` into each request passing through it. -#[derive(Debug, Clone)] -pub struct InsertTracker(Arc); - -/// A request extension that, when dropped, records the time elapsed since it -/// was created. -#[derive(Debug)] -pub struct Tracker { - shared: Arc, - idx: usize, - t0: Instant, -} - -#[derive(Debug)] -struct Shared { - // NOTE: this is inside a `Mutex` since recording a latency requires a mutable - // reference to the histogram. In the future, we could consider making the - // histogram counters `AtomicU64`, so that the histogram could be updated - // with an immutable reference. Then, the mutex could be removed. - histogram: Mutex>, - /// Stores the state of currently active `Tracker`s. - counts: RwLock>, - /// The index of the most recently finished counter. - /// - /// When a new counter is needed, the counter at this index will be used, - /// and that counter's next index will be set as the head. of the free - /// list. - /// - /// When an active counter completes, this will be set to its index, and - /// the previous value will become the freed counter's next pointer. - idle_head: AtomicUsize, -} - -/// Counts the number of times a request has been cloned or dropped. -/// -/// Since requests may be cloned for retries, we must count the number of clones -/// currently alive to ensure that the handle time for that request has -/// completed fully. -#[derive(Debug)] -struct Count { - /// The number of currently active `Tracker`s for this request.. - /// - /// When a request is initially received, there will be one `Tracker` in its - /// `Extensions` map. If the request is cloned for retries, the `Tracker` - /// will be cloned, incrementing this count. Dropping a `Tracker` decrements - /// this count, and when it reaches 0, the handle time is recorded. - clones: AtomicUsize, - /// Index of the next free counter. - next_idle: AtomicUsize, -} - -impl insert::Lazy for InsertTracker { - fn value(&self) -> Tracker { - self.0.clone().tracker() - } -} - -// ===== impl Scope ===== - -impl Scope { - pub fn new() -> Self { - Scope(Arc::new(Shared::new())) - } - - pub fn layer(&self) -> insert::Layer { - insert::Layer::new(InsertTracker(self.0.clone())) - } -} - -impl FmtMetric for Scope { - const KIND: &'static str = as FmtMetric>::KIND; - - fn fmt_metric(&self, f: &mut fmt::Formatter<'_>, name: N) -> fmt::Result { - if let Ok(hist) = self.0.histogram.lock() { - hist.fmt_metric(f, name)?; - } - Ok(()) - } - - fn fmt_metric_labeled( - &self, - f: &mut fmt::Formatter<'_>, - name: N, - labels: L, - ) -> fmt::Result - where - N: fmt::Display, - L: FmtLabels, - { - if let Ok(hist) = self.0.histogram.lock() { - hist.fmt_metric_labeled(f, name, labels)?; - } - Ok(()) - } -} - -// ===== impl InsertTracker ===== - -impl Clone for Tracker { - fn clone(&self) -> Self { - self.shared.clone_tracker(self.idx); - Self { - shared: self.shared.clone(), - idx: self.idx, - t0: self.t0, - } - } -} - -impl Drop for Tracker { - fn drop(&mut self) { - self.shared.drop_tracker(&*self); - } -} - -impl Shared { - const INITIAL_RECORDERS: usize = 32; - - fn new() -> Self { - let mut counts = Vec::with_capacity(Self::INITIAL_RECORDERS); - Self::add_counts(&mut counts, Self::INITIAL_RECORDERS); - Self { - histogram: Mutex::new(Histogram::default()), // TODO(eliza): should we change the bounds here? - counts: RwLock::new(counts), - idle_head: AtomicUsize::new(0), - } - } - - fn tracker(self: Arc) -> Tracker { - let t0 = Instant::now(); - loop { - let idx = self.idle_head.load(Ordering::Relaxed); - // This is determined in a scope so that we can move `Self` into the - // new tracker without doing a second (unecessary) arc bump. - let acquired = { - // Do we have any free counts remaining, or must we grow the - // slab? - let counts = self - .counts - .read() - .ok() - .filter(|counts| idx < counts.len()) - .unwrap_or_else(|| { - // Slow path: if there are no free counts in the - // slab, extend it (acquiring a write lock temporarily). - self.grow(); - self.counts.read().unwrap() - }); - - let next = counts[idx].next_idle.load(Ordering::Acquire); - // If the counter is still idle, update its ref count & set the - // free index to point at the next free counter. - counts[idx].clones.compare_and_swap(0, 1, Ordering::AcqRel) == 0 - && self.idle_head.compare_and_swap(idx, next, Ordering::AcqRel) == idx - }; - - if acquired { - return Tracker { - shared: self, - idx, - t0, - }; - } - - // The counter at `idx` was not actually free! Try again with a - // fresh index. - atomic::spin_loop_hint() - } - } - - #[cold] - fn grow(&self) { - let mut counts = self.counts.write().unwrap(); - let amount = counts.len() * 2; - counts.reserve(amount); - Self::add_counts(&mut counts, amount); - } - - /// Called when a tracker is dropped. This updates the counter of clones for - /// that request, and records its handle time when the final clone is dropped. - fn drop_tracker(&self, Tracker { idx, t0, .. }: &Tracker) { - let panicking = std::thread::panicking(); - let counts = match self.counts.read() { - Ok(lock) => lock, - // Avoid double panicking in drop. - Err(_) if panicking => return, - Err(e) => panic!("lock poisoned: {:?}", e), - }; - let idx = *idx; - let counter = match counts.get(idx) { - Some(counter) => counter, - None if panicking => return, - None => panic!("counts[{:?}] did not exist", idx), - }; - - // If the prior count was 1, it's now 0 and all clones of the request - // have been fully dropped, so we can now record its handle time. - if counter.clones.fetch_sub(1, Ordering::Release) == 1 { - let elapsed = t0.elapsed(); - - let hist = match self.histogram.lock() { - Ok(lock) => lock, - // Avoid double panicking in drop. - Err(_) if panicking => return, - Err(e) => panic!("lock poisoned: {:?}", e), - }; - - // Record the handle time for this counter. - hist.add(elapsed); - - // Link the counter onto the free list by setting the free-list - // head to its index, and setting the counter's next pointer to the - // previous head index. - let next_idx = self.idle_head.swap(idx, Ordering::AcqRel); - counter.next_idle.store(next_idx, Ordering::Release); - } - } - - /// Called when cloning a tracker into a copy of a request. - /// - /// This updates the count of clones for that request. - fn clone_tracker(&self, idx: usize) { - let counts = self.counts.read().unwrap(); - let _prev = counts[idx].clones.fetch_add(1, Ordering::Release); - debug_assert!(_prev > 0, "cannot clone an idle tracker"); - } - - fn add_counts(counts: &mut Vec, amount: usize) { - let len = counts.len(); - let new_len = len + amount; - for i in len..new_len { - let next_idle = AtomicUsize::new(i + 1); - counts.push(Count { - clones: AtomicUsize::new(0), - next_idle, - }) - } - } -} diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index 13270a4d11..81e7d6fa4a 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -37,7 +37,6 @@ pub mod control; pub mod dns; pub mod dst; pub mod errors; -pub mod handle_time; pub mod http_tracing; pub mod metrics; pub mod proxy; From 3d82e9a5d355c5802f4b94c4adb5cffea54fb13c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 20 Apr 2021 16:33:28 -0700 Subject: [PATCH 2/3] remove deprecated compare_and_swap Signed-off-by: Eliza Weisman --- linkerd/app/test/src/resolver.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/linkerd/app/test/src/resolver.rs b/linkerd/app/test/src/resolver.rs index d7180d422e..db6b4e96ec 100644 --- a/linkerd/app/test/src/resolver.rs +++ b/linkerd/app/test/src/resolver.rs @@ -139,9 +139,7 @@ impl, E> tower::Service for Dst { .unwrap_or_else(|| { tracing::debug!(?addr, "no endpoint configured for"); // An unknown endpoint was resolved! - self.state - .only - .compare_and_swap(true, false, Ordering::Release); + self.state.only.store(false, Ordering::Release); let (tx, rx) = mpsc::unbounded_channel(); let _ = tx.send(Ok(Update::DoesNotExist)); UnboundedReceiverStream::new(rx) @@ -212,9 +210,7 @@ impl> tower::Service for Profiles { .unwrap_or_else(|| { tracing::debug!(?addr, "no endpoint configured for"); // An unknown endpoint was resolved! - self.state - .only - .compare_and_swap(true, false, Ordering::Release); + self.state.only.store(false, Ordering::Release); None }); From 75989a51d949b7dd4d70efc013fc476f61a3658e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 20 Apr 2021 16:34:34 -0700 Subject: [PATCH 3/3] update tower; remove deprecated ready-ands Signed-off-by: Eliza Weisman --- Cargo.lock | 4 ++-- linkerd/app/outbound/src/http/tests.rs | 2 +- linkerd/app/test/src/http_util.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b98d6df54..779dc0e673 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2262,9 +2262,9 @@ dependencies = [ [[package]] name = "tower" -version = "0.4.5" +version = "0.4.6" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "713c629c07a3a97f741c140e474e7304294fabec66a43a33f0832e98315ab07f" +checksum = "f715efe02c0862926eb463e49368d38ddb119383475686178e32e26d15d06a66" dependencies = [ "futures-core", "futures-util", diff --git a/linkerd/app/outbound/src/http/tests.rs b/linkerd/app/outbound/src/http/tests.rs index e221818222..9078f8c85c 100644 --- a/linkerd/app/outbound/src/http/tests.rs +++ b/linkerd/app/outbound/src/http/tests.rs @@ -186,7 +186,7 @@ async fn profile_endpoint_propagates_conn_errors() { }); let rsp = client - .ready_and() + .ready() .await .expect("Client must not fail") .call( diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 03f396c7db..c4a50a1e6f 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -45,7 +45,7 @@ where { let (client_io, server_io) = io::duplex(4096); let f = server - .ready_and() + .ready() .await .map_err(Into::into) .expect("proxy server failed to become ready") @@ -108,7 +108,7 @@ pub async fn http_request( request: Request, ) -> Response { let rsp = client - .ready_and() + .ready() .await .expect("Client must not fail") .call(request)