From fe94f59b9efedc6115a5ddf007036647751e7707 Mon Sep 17 00:00:00 2001 From: clux Date: Mon, 18 Oct 2021 19:56:15 +0100 Subject: [PATCH] add conditions::is_accepted for crds - closes #655 Signed-off-by: clux --- examples/crd_apply.rs | 36 ++++------------------------------- examples/crd_derive_schema.rs | 28 ++++----------------------- kube-runtime/src/wait.rs | 23 ++++++++++++++++++++++ 3 files changed, 31 insertions(+), 56 deletions(-) diff --git a/examples/crd_apply.rs b/examples/crd_apply.rs index d13c64d4d..b142db4c5 100644 --- a/examples/crd_apply.rs +++ b/examples/crd_apply.rs @@ -1,5 +1,4 @@ #[macro_use] extern crate log; -use futures::{StreamExt, TryStreamExt}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -7,8 +6,9 @@ use apiexts::CustomResourceDefinition; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1 as apiexts; use kube::{ - api::{Api, ListParams, Patch, PatchParams, ResourceExt, WatchEvent}, + api::{Api, Patch, PatchParams, ResourceExt}, Client, CustomResource, CustomResourceExt, + runtime::wait::{await_condition, conditions}, }; // NB: This example uses server side apply and beta1 customresources @@ -46,7 +46,8 @@ async fn main() -> anyhow::Result<()> { info!("Creating crd: {}", serde_yaml::to_string(&Foo::crd())?); crds.patch("foos.clux.dev", &ssapply, &Patch::Apply(Foo::crd())) .await?; - wait_for_crd_ready(&crds).await?; // wait for k8s to deal with it + info!("Waiting for the api-server to accept the CRD"); + await_condition(crds, "foos.clux.dev", conditions::is_accepted()).await?; // Start applying foos let foos: Api = Api::namespaced(client.clone(), &namespace); @@ -78,32 +79,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } - -// manual way to check that a CRD has been installed -async fn wait_for_crd_ready(crds: &Api) -> anyhow::Result<()> { - if crds.get("foos.clux.dev").await.is_ok() { - return Ok(()); - } - // Wait for the apply to take place (takes a sec or two during first install) - let lp = ListParams::default() - .fields(&format!("metadata.name={}", "foos.clux.dev")) // our crd only - .timeout(5); // should not take long - let mut stream = crds.watch(&lp, "0").await?.boxed(); - - while let Some(status) = stream.try_next().await? { - if let WatchEvent::Modified(s) = status { - info!("Modify event for {}", s.name()); - if let Some(s) = s.status { - if let Some(conds) = s.conditions { - if let Some(pcond) = conds.iter().find(|c| c.type_ == "NamesAccepted") { - if pcond.status == "True" { - info!("crd was accepted: {:?}", pcond); - return Ok(()); - } - } - } - } - } - } - Err(anyhow::anyhow!("Timed out waiting for crd to become accepted")) -} diff --git a/examples/crd_derive_schema.rs b/examples/crd_derive_schema.rs index 22e5316f7..72bff8f9d 100644 --- a/examples/crd_derive_schema.rs +++ b/examples/crd_derive_schema.rs @@ -7,6 +7,7 @@ use kube::{ PostParams, WatchEvent, }, Client, CustomResource, CustomResourceExt, + runtime::wait::{await_condition, conditions}, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -224,30 +225,9 @@ async fn create_crd(client: Client) -> Result { let api = Api::::all(client); api.create(&PostParams::default(), &Foo::crd()).await?; - // Wait until ready - let timeout_secs = 15; - let lp = ListParams::default() - .fields("metadata.name=foos.clux.dev") - .timeout(timeout_secs); - let mut stream = api.watch(&lp, "0").await?.boxed_local(); - while let Some(status) = stream.try_next().await? { - if let WatchEvent::Modified(crd) = status { - let accepted = crd - .status - .as_ref() - .and_then(|s| s.conditions.as_ref()) - .map(|sc| { - sc.iter() - .any(|c| c.type_ == "NamesAccepted" && c.status == "True") - }) - .unwrap_or(false); - if accepted { - return Ok(crd); - } - } - } - - Err(anyhow!(format!("CRD not ready after {} seconds", timeout_secs))) + // Wait until it's accepted + await_condition(crds, "foos.clux.dev", conditions::is_accepted()).await?; + let crd = api.get("foos.clux.dev").await?; } // Delete the CRD if it exists and wait until it's deleted. diff --git a/kube-runtime/src/wait.rs b/kube-runtime/src/wait.rs index 56bafff50..b490ed760 100644 --- a/kube-runtime/src/wait.rs +++ b/kube-runtime/src/wait.rs @@ -51,6 +51,7 @@ where /// Common conditions to wait for pub mod conditions { use kube_client::Resource; + use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; /// An await condition that returns `true` once the object has been deleted. /// @@ -67,6 +68,28 @@ pub mod conditions { ) } } + + /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted + pub fn is_accepted() -> impl Fn(Option<&CustomResourceDefinition>) -> bool { + move |obj: Option<&CustomResourceDefinition>| { + obj.map_or( + // Object missing, failure! + false, + |o| { + if let Some(s) = &o.status { + if let Some(conds) = &s.conditions { + if let Some(pcond) = conds.iter().find(|c| c.type_ == "NamesAccepted") { + if pcond.status == "True" { + return true + } + } + } + } + false + }, + ) + } + } } /// Utilities for deleting objects