Skip to content

Commit

Permalink
Add better logging for watcher errors (#1134)
Browse files Browse the repository at this point in the history
* tracing output from kube-runtime

trying to hunt a potential bug

Signed-off-by: clux <sszynrae@gmail.com>

better tracing

Signed-off-by: clux <sszynrae@gmail.com>

ugh

Signed-off-by: clux <sszynrae@gmail.com>

lesssss output

Signed-off-by: clux <sszynrae@gmail.com>

remove backoff reset

Signed-off-by: clux <sszynrae@gmail.com>

better errors for watcher

Signed-off-by: clux <sszynrae@gmail.com>

re-add line

Signed-off-by: clux <sszynrae@gmail.com>

* clippy + fmt

Signed-off-by: clux <sszynrae@gmail.com>

---------

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux authored Feb 6, 2023
1 parent 8664de5 commit 4c26615
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 19 deletions.
2 changes: 0 additions & 2 deletions examples/crd_derive_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ async fn main() -> anyhow::Result<()> {
let newvarv2_2 = v2api.patch("new", &ssapply, &Patch::Apply(&v2m)).await?;
info!("new on v2 correct on reapply to v2: {:?}", newvarv2_2.spec);


// note we can apply old versions without them being truncated to the v2 schema
// in our case this means we cannot fetch them with our v1 schema (breaking change to not have oldprop)
let v1m2 = v1::ManyDerive::new("old", v1::ManyDeriveSpec {
Expand All @@ -101,7 +100,6 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}


async fn apply_crd(client: Client, crd: CustomResourceDefinition) -> anyhow::Result<()> {
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
info!("Creating crd: {}", serde_yaml::to_string(&crd)?);
Expand Down
1 change: 0 additions & 1 deletion examples/pod_shell_crossterm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ async fn handle_terminal_size(mut channel: Sender<TerminalSize>) -> Result<(), a
Ok(())
}


#[tokio::main]
async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
Expand Down
6 changes: 3 additions & 3 deletions kube-client/src/config/file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ impl ConfigLoader {
.ok_or_else(|| KubeconfigError::FindUser(user_name.clone()))?;

Ok(ConfigLoader {
current_context: current_context,
cluster: cluster,
user: user,
current_context,
cluster,
user,
})
}

Expand Down
1 change: 0 additions & 1 deletion kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
/// Re-exports from kube_core
pub use kube_core as core;


// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube-client --lib features=rustls-tls,ws -- --ignored`
#[cfg(all(feature = "client", feature = "config"))]
Expand Down
1 change: 0 additions & 1 deletion kube-core/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ pub mod v1 {
served: true
storage: false"#;


let c1: Crd = serde_yaml::from_str(crd1).unwrap();
let c2: Crd = serde_yaml::from_str(crd2).unwrap();
let ce: Crd = serde_yaml::from_str(expected).unwrap();
Expand Down
50 changes: 39 additions & 11 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
Api,
error::ErrorResponse,
Api, Error as ClientErr,
};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
use std::{clone::Clone, fmt::Debug, time::Duration};
use thiserror::Error;
use tracing::{debug, error, warn};

#[derive(Debug, Error)]
pub enum Error {
Expand All @@ -22,7 +24,7 @@ pub enum Error {
#[error("failed to start watching object: {0}")]
WatchStartFailed(#[source] kube_client::Error),
#[error("error returned by apiserver during watch: {0}")]
WatchError(#[source] kube_client::error::ErrorResponse),
WatchError(#[source] ErrorResponse),
#[error("watch stream failed: {0}")]
WatchFailed(#[source] kube_client::Error),
#[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
Expand Down Expand Up @@ -150,17 +152,31 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
(Some(Err(Error::NoResourceVersion)), State::Empty)
}
}
Err(err) => (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
}
(Some(Err(err).map_err(Error::InitialListFailed)), State::Empty)
}
},
State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream: stream.boxed(),
}),
Err(err) => (
Some(Err(err).map_err(Error::WatchStartFailed)),
State::InitListed { resource_version },
),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
} else {
debug!("watch initlist error: {err:?}");
}
(
Some(Err(err).map_err(Error::WatchStartFailed)),
State::InitListed { resource_version },
)
}
},
State::Watching {
resource_version,
Expand Down Expand Up @@ -194,12 +210,24 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
stream,
}
};
if err.code == 403 {
warn!("watcher watchevent error 403: {err:?}");
} else {
debug!("error watchevent error: {err:?}");
}
(Some(Err(err).map_err(Error::WatchError)), new_state)
}
Some(Err(err)) => (Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
resource_version,
stream,
}),
Some(Err(err)) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watcher error 403: {err:?}");
} else {
debug!("watcher error: {err:?}");
}
(Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
resource_version,
stream,
})
}
None => (None, State::InitListed { resource_version }),
},
}
Expand Down

0 comments on commit 4c26615

Please sign in to comment.