From c38e800d4624660fd50c22b7bf4af7b1d6439dc9 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 16:09:16 +0100 Subject: [PATCH 1/6] Allow configuring the watcher's `default_backoff` from `WatchStreamExt` To allow the same chaining style without having to import all the backoff dependencies directly. This allows me to write more concise recommendation documentation in kube-rs/website. Signed-off-by: clux --- examples/pod_watcher.rs | 1 + kube-runtime/src/controller/mod.rs | 4 ++-- kube-runtime/src/utils/watch_ext.rs | 17 +++++++++++++++-- kube-runtime/src/watcher.rs | 11 ++++++++--- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/examples/pod_watcher.rs b/examples/pod_watcher.rs index 946c86ec1..e4e79fc4d 100644 --- a/examples/pod_watcher.rs +++ b/examples/pod_watcher.rs @@ -15,6 +15,7 @@ async fn main() -> anyhow::Result<()> { watcher(api, watcher::Config::default()) .applied_objects() + .default_backoff() .try_for_each(|p| async move { info!("saw {}", p.name_any()); if let Some(unready_reason) = pod_unready(&p) { diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 526aacf95..afc08f5e1 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -710,14 +710,14 @@ where /// # use k8s_openapi::api::core::v1::ConfigMap; /// # use k8s_openapi::api::apps::v1::StatefulSet; /// # use kube::runtime::controller::Action; - /// # use kube::runtime::{predicates, watcher, Controller, WatchStreamExt}; + /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt}; /// # use kube::{Api, Client, Error, ResourceExt}; /// # use std::sync::Arc; /// # type CustomResource = ConfigMap; /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } /// # async fn doc(client: kube::Client) { - /// let sts_stream = watcher(Api::::all(client.clone()), watcher::Config::default()) + /// let sts_stream = metadata_watcher(Api::::all(client.clone()), watcher::Config::default()) /// .touched_objects() /// .predicate_filter(predicates::generation); /// diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index ea2aa639a..9e0d9fa8a 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -8,12 +8,25 @@ use crate::{ }; #[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; -use backoff::backoff::Backoff; +use crate::{utils::ResetTimerBackoff, watcher::default_exponential_backoff}; +use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::{Stream, TryStream}; +use std::time::Duration; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { - /// Apply a [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] + /// Apply the [`default_backoff`](crate::watcher::default_backoff) watcher [`Backoff`] policy + /// + /// This is recommended for controllers that want to play nicely with the apiserver. + fn default_backoff(self) -> StreamBackoff> + where + Self: TryStream + Sized, + { + let bo = default_exponential_backoff(); + StreamBackoff::new(self, ResetTimerBackoff::new(bo, Duration::from_secs(120))) + } + + /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] fn backoff(self, b: B) -> StreamBackoff where B: Backoff, diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 940749755..f7603a906 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -646,13 +646,18 @@ pub fn watch_object impl Backoff + Send + Sync { - let expo = backoff::ExponentialBackoff { + let bo = default_exponential_backoff(); + ResetTimerBackoff::new(bo, Duration::from_secs(120)) +} + +// ideally should be a const var but Default for ExponentialBackoff is not const +pub(crate) fn default_exponential_backoff() -> backoff::ExponentialBackoff { + backoff::ExponentialBackoff { initial_interval: Duration::from_millis(800), max_interval: Duration::from_secs(30), randomization_factor: 1.0, multiplier: 2.0, max_elapsed_time: None, ..ExponentialBackoff::default() - }; - ResetTimerBackoff::new(expo, Duration::from_secs(120)) + } } From b59c1b22fea74cb71492dc9d4c0803200ecc5511 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 16:41:02 +0100 Subject: [PATCH 2/6] avoid duplicating the duration value to keep params in one place Signed-off-by: clux --- examples/node_watcher.rs | 5 +---- kube-runtime/src/utils/watch_ext.rs | 8 +++++--- kube-runtime/src/watcher.rs | 3 ++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index 3b11b7e42..e5a079f34 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -1,4 +1,3 @@ -use backoff::ExponentialBackoff; use futures::{pin_mut, TryStreamExt}; use k8s_openapi::api::core::v1::{Event, Node}; use kube::{ @@ -16,9 +15,7 @@ async fn main() -> anyhow::Result<()> { let nodes: Api = Api::all(client.clone()); let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64"); - let obs = watcher(nodes, wc) - .backoff(ExponentialBackoff::default()) - .applied_objects(); + let obs = watcher(nodes, wc).default_backoff().applied_objects(); pin_mut!(obs); while let Some(n) = obs.try_next().await? { diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 9e0d9fa8a..80066923f 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -8,10 +8,12 @@ use crate::{ }; #[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; -use crate::{utils::ResetTimerBackoff, watcher::default_exponential_backoff}; +use crate::{ + utils::ResetTimerBackoff, + watcher::{default_exponential_backoff, DEFAULT_RESET_DURATION}, +}; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::{Stream, TryStream}; -use std::time::Duration; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { @@ -23,7 +25,7 @@ pub trait WatchStreamExt: Stream { Self: TryStream + Sized, { let bo = default_exponential_backoff(); - StreamBackoff::new(self, ResetTimerBackoff::new(bo, Duration::from_secs(120))) + StreamBackoff::new(self, ResetTimerBackoff::new(bo, DEFAULT_RESET_DURATION)) } /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index f7603a906..ad9a4c9bb 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -647,7 +647,7 @@ pub fn watch_object impl Backoff + Send + Sync { let bo = default_exponential_backoff(); - ResetTimerBackoff::new(bo, Duration::from_secs(120)) + ResetTimerBackoff::new(bo, DEFAULT_RESET_DURATION) } // ideally should be a const var but Default for ExponentialBackoff is not const @@ -661,3 +661,4 @@ pub(crate) fn default_exponential_backoff() -> backoff::ExponentialBackoff { ..ExponentialBackoff::default() } } +pub(crate) const DEFAULT_RESET_DURATION: Duration = Duration::from_secs(120); From 4b6f1c7b132ad22828e9436ef7f2b04bf78a6528 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 17:27:53 +0100 Subject: [PATCH 3/6] partially hide the implementation detail inside `DefaultBackoff` Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 4 ++-- kube-runtime/src/utils/watch_ext.rs | 14 ++++------- kube-runtime/src/watcher.rs | 37 ++++++++++++++++++++++++++--- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index afc08f5e1..d3232e59f 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -541,7 +541,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::new(watcher::default_backoff()), + trigger_backoff: Box::new(watcher::DefaultBackoff::default()), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -624,7 +624,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::new(watcher::default_backoff()), + trigger_backoff: Box::new(watcher::DefaultBackoff::default()), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 80066923f..a9391f923 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -8,24 +8,20 @@ use crate::{ }; #[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; -use crate::{ - utils::ResetTimerBackoff, - watcher::{default_exponential_backoff, DEFAULT_RESET_DURATION}, -}; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use crate::watcher::DefaultBackoff; +use backoff::backoff::Backoff; use futures::{Stream, TryStream}; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { - /// Apply the [`default_backoff`](crate::watcher::default_backoff) watcher [`Backoff`] policy + /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy /// /// This is recommended for controllers that want to play nicely with the apiserver. - fn default_backoff(self) -> StreamBackoff> + fn default_backoff(self) -> StreamBackoff where Self: TryStream + Sized, { - let bo = default_exponential_backoff(); - StreamBackoff::new(self, ResetTimerBackoff::new(bo, DEFAULT_RESET_DURATION)) + StreamBackoff::new(self, DefaultBackoff::default()) } /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index ad9a4c9bb..2731db280 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -645,13 +645,25 @@ pub fn watch_object impl Backoff + Send + Sync { let bo = default_exponential_backoff(); ResetTimerBackoff::new(bo, DEFAULT_RESET_DURATION) } -// ideally should be a const var but Default for ExponentialBackoff is not const -pub(crate) fn default_exponential_backoff() -> backoff::ExponentialBackoff { +/// Default watcher backoff inspired by Kubernetes' client-go. +/// +/// Note that the exact parameters used herein should not be considered stable. +/// The parameters currently optimize for being kind to struggling apiservers. +/// See [client-go's reflector source](/~https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181) +/// for more details. +pub struct DefaultBackoff(ResetTimerBackoff); + + +fn default_exponential_backoff() -> backoff::ExponentialBackoff { backoff::ExponentialBackoff { initial_interval: Duration::from_millis(800), max_interval: Duration::from_secs(30), @@ -661,4 +673,23 @@ pub(crate) fn default_exponential_backoff() -> backoff::ExponentialBackoff { ..ExponentialBackoff::default() } } -pub(crate) const DEFAULT_RESET_DURATION: Duration = Duration::from_secs(120); +const DEFAULT_RESET_DURATION: Duration = Duration::from_secs(120); + +impl Default for DefaultBackoff { + fn default() -> Self { + Self(ResetTimerBackoff::new( + default_exponential_backoff(), + DEFAULT_RESET_DURATION, + )) + } +} + +impl Backoff for DefaultBackoff { + fn next_backoff(&mut self) -> Option { + self.0.next_backoff() + } + + fn reset(&mut self) { + self.0.reset() + } +} From 147308e566a42618f40461781c4278f990880666 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 17:30:55 +0100 Subject: [PATCH 4/6] clippy Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index d3232e59f..4d9c3a4b3 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -9,7 +9,7 @@ use crate::{ }, scheduler::{scheduler, ScheduleRequest}, utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, - watcher::{self, watcher, Config}, + watcher::{self, watcher, Config, DefaultBackoff}, }; use backoff::backoff::Backoff; use derivative::Derivative; @@ -541,7 +541,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::new(watcher::DefaultBackoff::default()), + trigger_backoff: Box::::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -624,7 +624,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::new(watcher::DefaultBackoff::default()), + trigger_backoff: Box::::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), From 8d321a1803e22bac771051a6f97d4d6cf70e552c Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 17:48:21 +0100 Subject: [PATCH 5/6] fully encapsulate strategy with a private proxy Signed-off-by: clux --- kube-runtime/src/watcher.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 2731db280..3632f4894 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -640,10 +640,7 @@ pub fn watch_object impl Backoff + Send + Sync { /// Default watcher backoff inspired by Kubernetes' client-go. /// -/// Note that the exact parameters used herein should not be considered stable. /// The parameters currently optimize for being kind to struggling apiservers. -/// See [client-go's reflector source](/~https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181) -/// for more details. -pub struct DefaultBackoff(ResetTimerBackoff); +/// The exact parameters are taken from +/// [client-go's reflector source](/~https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181) +/// and should not be considered stable. +/// +/// This struct implements [`Backoff`] and is the default strategy used +/// when calling `WatchStreamExt::default_backoff`. If you need to create +/// this manually then [`DefaultBackoff::default`] can be used. +pub struct DefaultBackoff(Strategy); +struct Strategy(ResetTimerBackoff); // encapsulate strategy fn default_exponential_backoff() -> backoff::ExponentialBackoff { backoff::ExponentialBackoff { @@ -677,19 +679,19 @@ const DEFAULT_RESET_DURATION: Duration = Duration::from_secs(120); impl Default for DefaultBackoff { fn default() -> Self { - Self(ResetTimerBackoff::new( + Self(Strategy(ResetTimerBackoff::new( default_exponential_backoff(), DEFAULT_RESET_DURATION, - )) + ))) } } impl Backoff for DefaultBackoff { fn next_backoff(&mut self) -> Option { - self.0.next_backoff() + self.0 .0.next_backoff() } fn reset(&mut self) { - self.0.reset() + self.0 .0.reset() } } From b57e57dce5eababfec1de08c9027481d9514daee Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 28 Jun 2023 18:37:11 +0100 Subject: [PATCH 6/6] avoid wrapper struct and avoid default impl fns by changing signature Signed-off-by: clux --- kube-runtime/src/watcher.rs | 39 +++++++++++++++---------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 3632f4894..045463e55 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -646,9 +646,8 @@ pub fn watch_object impl Backoff + Send + Sync { - let bo = default_exponential_backoff(); - ResetTimerBackoff::new(bo, DEFAULT_RESET_DURATION) +pub fn default_backoff() -> DefaultBackoff { + DefaultBackoff::default() } /// Default watcher backoff inspired by Kubernetes' client-go. @@ -662,36 +661,30 @@ pub fn default_backoff() -> impl Backoff + Send + Sync { /// when calling `WatchStreamExt::default_backoff`. If you need to create /// this manually then [`DefaultBackoff::default`] can be used. pub struct DefaultBackoff(Strategy); - -struct Strategy(ResetTimerBackoff); // encapsulate strategy - -fn default_exponential_backoff() -> backoff::ExponentialBackoff { - backoff::ExponentialBackoff { - initial_interval: Duration::from_millis(800), - max_interval: Duration::from_secs(30), - randomization_factor: 1.0, - multiplier: 2.0, - max_elapsed_time: None, - ..ExponentialBackoff::default() - } -} -const DEFAULT_RESET_DURATION: Duration = Duration::from_secs(120); +type Strategy = ResetTimerBackoff; impl Default for DefaultBackoff { fn default() -> Self { - Self(Strategy(ResetTimerBackoff::new( - default_exponential_backoff(), - DEFAULT_RESET_DURATION, - ))) + Self(ResetTimerBackoff::new( + backoff::ExponentialBackoff { + initial_interval: Duration::from_millis(800), + max_interval: Duration::from_secs(30), + randomization_factor: 1.0, + multiplier: 2.0, + max_elapsed_time: None, + ..ExponentialBackoff::default() + }, + Duration::from_secs(120), + )) } } impl Backoff for DefaultBackoff { fn next_backoff(&mut self) -> Option { - self.0 .0.next_backoff() + self.0.next_backoff() } fn reset(&mut self) { - self.0 .0.reset() + self.0.reset() } }