From 4c26615ccdb70cf8dde98b3953d36f59c5e13693 Mon Sep 17 00:00:00 2001 From: Eirik A Date: Mon, 6 Feb 2023 09:46:57 +0000 Subject: [PATCH] Add better logging for watcher errors (#1134) * tracing output from kube-runtime trying to hunt a potential bug Signed-off-by: clux better tracing Signed-off-by: clux ugh Signed-off-by: clux lesssss output Signed-off-by: clux remove backoff reset Signed-off-by: clux better errors for watcher Signed-off-by: clux re-add line Signed-off-by: clux * clippy + fmt Signed-off-by: clux --------- Signed-off-by: clux --- examples/crd_derive_multi.rs | 2 -- examples/pod_shell_crossterm.rs | 1 - kube-client/src/config/file_loader.rs | 6 ++-- kube-client/src/lib.rs | 1 - kube-core/src/crd.rs | 1 - kube-runtime/src/watcher.rs | 50 +++++++++++++++++++++------ 6 files changed, 42 insertions(+), 19 deletions(-) diff --git a/examples/crd_derive_multi.rs b/examples/crd_derive_multi.rs index 258eabb1a..f96342cc0 100644 --- a/examples/crd_derive_multi.rs +++ b/examples/crd_derive_multi.rs @@ -81,7 +81,6 @@ async fn main() -> anyhow::Result<()> { let newvarv2_2 = v2api.patch("new", &ssapply, &Patch::Apply(&v2m)).await?; info!("new on v2 correct on reapply to v2: {:?}", newvarv2_2.spec); - // note we can apply old versions without them being truncated to the v2 schema // in our case this means we cannot fetch them with our v1 schema (breaking change to not have oldprop) let v1m2 = v1::ManyDerive::new("old", v1::ManyDeriveSpec { @@ -101,7 +100,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } - async fn apply_crd(client: Client, crd: CustomResourceDefinition) -> anyhow::Result<()> { let crds: Api = Api::all(client.clone()); info!("Creating crd: {}", serde_yaml::to_string(&crd)?); diff --git a/examples/pod_shell_crossterm.rs b/examples/pod_shell_crossterm.rs index 02d474e78..56455a62d 100644 --- a/examples/pod_shell_crossterm.rs +++ b/examples/pod_shell_crossterm.rs @@ -37,7 +37,6 @@ async fn handle_terminal_size(mut channel: Sender) -> Result<(), a Ok(()) } - #[tokio::main] async fn main() -> anyhow::Result<()> { let client = Client::try_default().await?; diff --git a/kube-client/src/config/file_loader.rs b/kube-client/src/config/file_loader.rs index fb962ed2e..2eeb1b01f 100644 --- a/kube-client/src/config/file_loader.rs +++ b/kube-client/src/config/file_loader.rs @@ -91,9 +91,9 @@ impl ConfigLoader { .ok_or_else(|| KubeconfigError::FindUser(user_name.clone()))?; Ok(ConfigLoader { - current_context: current_context, - cluster: cluster, - user: user, + current_context, + cluster, + user, }) } diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index b2fd32f57..957c78a1b 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -125,7 +125,6 @@ pub use crate::core::{CustomResourceExt, Resource, ResourceExt}; /// Re-exports from kube_core pub use kube_core as core; - // Tests that require a cluster and the complete feature set // Can be run with `cargo test -p kube-client --lib features=rustls-tls,ws -- --ignored` #[cfg(all(feature = "client", feature = "config"))] diff --git a/kube-core/src/crd.rs b/kube-core/src/crd.rs index c8bff69ad..376e3d632 100644 --- a/kube-core/src/crd.rs +++ b/kube-core/src/crd.rs @@ -222,7 +222,6 @@ pub mod v1 { served: true storage: false"#; - let c1: Crd = serde_yaml::from_str(crd1).unwrap(); let c2: Crd = serde_yaml::from_str(crd2).unwrap(); let ce: Crd = serde_yaml::from_str(expected).unwrap(); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index df2b4a32b..7b202f0ea 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -8,12 +8,14 @@ use derivative::Derivative; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ api::{ListParams, Resource, ResourceExt, WatchEvent}, - Api, + error::ErrorResponse, + Api, Error as ClientErr, }; use serde::de::DeserializeOwned; use smallvec::SmallVec; use std::{clone::Clone, fmt::Debug, time::Duration}; use thiserror::Error; +use tracing::{debug, error, warn}; #[derive(Debug, Error)] pub enum Error { @@ -22,7 +24,7 @@ pub enum Error { #[error("failed to start watching object: {0}")] WatchStartFailed(#[source] kube_client::Error), #[error("error returned by apiserver during watch: {0}")] - WatchError(#[source] kube_client::error::ErrorResponse), + WatchError(#[source] ErrorResponse), #[error("watch stream failed: {0}")] WatchFailed(#[source] kube_client::Error), #[error("no metadata.resourceVersion in watch result (does resource support watch?)")] @@ -150,17 +152,31 @@ async fn step_trampolined (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty) + } }, State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await { Ok(stream) => (None, State::Watching { resource_version, stream: stream.boxed(), }), - Err(err) => ( - Some(Err(err).map_err(Error::WatchStartFailed)), - State::InitListed { resource_version }, - ), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch initlist error with 403: {err:?}"); + } else { + debug!("watch initlist error: {err:?}"); + } + ( + Some(Err(err).map_err(Error::WatchStartFailed)), + State::InitListed { resource_version }, + ) + } }, State::Watching { resource_version, @@ -194,12 +210,24 @@ async fn step_trampolined (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { - resource_version, - stream, - }), + Some(Err(err)) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watcher error 403: {err:?}"); + } else { + debug!("watcher error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { + resource_version, + stream, + }) + } None => (None, State::InitListed { resource_version }), }, }