Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conditions::is_accepted for crds with await_condition #659

Merged
merged 7 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 6 additions & 32 deletions examples/crd_apply.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use apiexts::CustomResourceDefinition;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1 as apiexts;

use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt, WatchEvent},
api::{Api, Patch, PatchParams, ResourceExt},
runtime::wait::{await_condition, conditions},
Client, CustomResource, CustomResourceExt,
};

Expand Down Expand Up @@ -46,7 +46,10 @@ async fn main() -> anyhow::Result<()> {
info!("Creating crd: {}", serde_yaml::to_string(&Foo::crd())?);
crds.patch("foos.clux.dev", &ssapply, &Patch::Apply(Foo::crd()))
.await?;
wait_for_crd_ready(&crds).await?; // wait for k8s to deal with it

info!("Waiting for the api-server to accept the CRD");
let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;

// Start applying foos
let foos: Api<Foo> = Api::namespaced(client.clone(), &namespace);
Expand Down Expand Up @@ -78,32 +81,3 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

// manual way to check that a CRD has been installed
async fn wait_for_crd_ready(crds: &Api<CustomResourceDefinition>) -> anyhow::Result<()> {
if crds.get("foos.clux.dev").await.is_ok() {
return Ok(());
}
// Wait for the apply to take place (takes a sec or two during first install)
let lp = ListParams::default()
.fields(&format!("metadata.name={}", "foos.clux.dev")) // our crd only
.timeout(5); // should not take long
let mut stream = crds.watch(&lp, "0").await?.boxed();

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(s) = status {
info!("Modify event for {}", s.name());
if let Some(s) = s.status {
if let Some(conds) = s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "NamesAccepted") {
if pcond.status == "True" {
info!("crd was accepted: {:?}", pcond);
return Ok(());
}
}
}
}
}
}
Err(anyhow::anyhow!("Timed out waiting for crd to become accepted"))
}
31 changes: 8 additions & 23 deletions examples/crd_derive_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use kube::{
Api, ApiResource, DeleteParams, DynamicObject, GroupVersionKind, ListParams, Patch, PatchParams,
PostParams, WatchEvent,
},
runtime::wait::{await_condition, conditions},
Client, CustomResource, CustomResourceExt,
};
use schemars::JsonSchema;
Expand Down Expand Up @@ -224,30 +225,14 @@ async fn create_crd(client: Client) -> Result<CustomResourceDefinition> {
let api = Api::<CustomResourceDefinition>::all(client);
api.create(&PostParams::default(), &Foo::crd()).await?;

// Wait until ready
let timeout_secs = 15;
let lp = ListParams::default()
.fields("metadata.name=foos.clux.dev")
.timeout(timeout_secs);
let mut stream = api.watch(&lp, "0").await?.boxed_local();
while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(crd) = status {
let accepted = crd
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|sc| {
sc.iter()
.any(|c| c.type_ == "NamesAccepted" && c.status == "True")
})
.unwrap_or(false);
if accepted {
return Ok(crd);
}
}
}
// Wait until it's accepted and established by the api-server
println!("Waiting for the api-server to accept the CRD");
let establish = await_condition(api.clone(), "foos.clux.dev", conditions::is_crd_established());
let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;

Err(anyhow!(format!("CRD not ready after {} seconds", timeout_secs)))
// It's served by the api - get it and return it
let crd = api.get("foos.clux.dev").await?;
Ok(crd)
}

// Delete the CRD if it exists and wait until it's deleted.
Expand Down
1 change: 0 additions & 1 deletion examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use kube::{
Client,
};


#[tokio::main]
async fn main() -> Result<()> {
let client = Client::try_default().await?;
Expand Down
80 changes: 73 additions & 7 deletions kube-runtime/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,83 @@ pub enum Error {
/// permission to `watch` and `list` it.
///
/// Does *not* fail if the object is not found.
pub async fn await_condition<K>(
api: Api<K>,
name: &str,
mut cond: impl FnMut(Option<&K>) -> bool,
) -> Result<(), Error>
///
/// # Usage
///
/// ```
/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
/// use kube::{Api, runtime::wait::{await_condition, conditions}};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
///
/// let crds: Api<CustomResourceDefinition> = Api::all(client);
/// // .. create or apply a crd here ..
/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
/// # Ok(())
/// # }
/// ```
pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<(), Error>
where
K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
{
watch_object(api, name)
.context(ProbeFailed)
.try_take_while(|obj| {
let result = !cond(obj.as_ref());
let result = !cond.matches_object(obj.as_ref());
async move { Ok(result) }
})
.try_for_each(|_| async { Ok(()) })
.await
}

/// A trait for condition functions to be used by [`await_condition`]
///
/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
///
/// # Usage
///
/// ```
/// use kube::runtime::wait::Condition;
/// use k8s_openapi::api::core::v1::Pod;
/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
/// move |obj: Option<&Pod>| {
/// if let Some(pod) = &obj {
/// if let Some(status) = &pod.status {
/// if let Some(conds) = &status.conditions {
/// if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
/// return pcond.status == "True";
/// }
/// }
/// }
/// }
/// false
/// }
/// }
/// ```
pub trait Condition<K> {
fn matches_object(&self, obj: Option<&K>) -> bool;
}

impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
fn matches_object(&self, obj: Option<&K>) -> bool {
(self)(obj)
}
}

/// Common conditions to wait for
pub mod conditions {
pub use super::Condition;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube_client::Resource;

/// An await condition that returns `true` once the object has been deleted.
///
/// An object is considered to be deleted if the object can no longer be found, or if its
/// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
/// the deletion event and the object is recreated in the meantime.
pub fn is_deleted<K: Resource>(uid: &str) -> impl Fn(Option<&K>) -> bool + '_ {
#[must_use]
pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
move |obj: Option<&K>| {
obj.map_or(
// Object is not found, success!
Expand All @@ -67,6 +116,23 @@ pub mod conditions {
)
}
}

/// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
#[must_use]
pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
|obj: Option<&CustomResourceDefinition>| {
if let Some(o) = obj {
if let Some(s) = &o.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
return pcond.status == "True";
}
}
}
}
false
}
}
}

/// Utilities for deleting objects
Expand Down