forked from kube-rs/kube
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrd_apply.rs
107 lines (92 loc) · 3.71 KB
/
crd_apply.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use apiexts::CustomResourceDefinition;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1 as apiexts;
use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt, WatchEvent},
Client, CustomResource,
};
// NB: This example uses server-side apply and v1 custom resource definitions (`apiextensions::v1`).
// Please test against Kubernetes 1.16.X!
// Own custom resource
// Spec of the example `Foo` custom resource (group `clux.dev`, version `v1`, namespaced).
// The `kube` attributes below name `FooStatus` as the status type and configure a scale
// subresource reading `.spec.replicas` and `.status.replicas`.
// `main` uses the corresponding `Foo` type (presumably generated by the `CustomResource`
// derive) through `Foo::new` and `Foo::crd`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(group = "clux.dev", version = "v1", kind = "Foo", namespaced)]
#[kube(status = "FooStatus")]
#[kube(scale = r#"{"specReplicasPath":".spec.replicas", "statusReplicasPath":".status.replicas"}"#)]
pub struct FooSpec {
// Name value carried in the spec; `main` sets it separately from the object's own name.
name: String,
// Optional free-form text.
info: Option<String>,
// Replica count; this is the field `specReplicasPath` in the scale attribute points at.
replicas: isize,
}
// Status type for `Foo`, referenced by `#[kube(status = "FooStatus")]` on `FooSpec`.
// Nothing in this file writes a status; the type only has to exist for the derive.
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
pub struct FooStatus {
// Boolean flag; its meaning is not established anywhere in this example.
is_bad: bool,
// Replica count; this is the field `statusReplicasPath` in the scale attribute points at.
replicas: isize,
}
/// Example entry point: applies the `Foo` CRD with server-side apply, waits for it to be
/// accepted, then applies a `Foo` object named `baz` twice — first from a full struct and
/// then from a partial JSON document.
///
/// The namespace is taken from the `NAMESPACE` environment variable, falling back to
/// `default` when the variable is unset or unreadable.
///
/// # Errors
/// Returns the first error produced by client construction, YAML serialization, any of the
/// patch calls, or `wait_for_crd_ready`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Pin the log filter for this example; this overwrites any RUST_LOG already set.
    std::env::set_var("RUST_LOG", "info,kube=info");
    env_logger::init();
    let client = Client::try_default().await?;
    // Build the fallback lazily so no `String` is allocated when NAMESPACE is set.
    let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());
    // Server-side apply parameters shared by every patch call below.
    let ssapply = PatchParams::apply("crd_apply_example").force();

    // 0. Apply the CRD
    let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
    info!("Creating crd: {}", serde_yaml::to_string(&Foo::crd())?);
    crds.patch("foos.clux.dev", &ssapply, &Patch::Apply(Foo::crd()))
        .await?;
    wait_for_crd_ready(&crds).await?; // wait for k8s to deal with it

    // Start applying foos
    let foos: Api<Foo> = Api::namespaced(client.clone(), &namespace);

    // 1. Apply from a full struct (e.g. equivalent to replace w/o resource_version)
    let foo = Foo::new("baz", FooSpec {
        name: "baz".into(),
        info: Some("old baz".into()),
        replicas: 3,
    });
    info!("Applying 1: \n{}", serde_yaml::to_string(&foo)?);
    let o = foos.patch("baz", &ssapply, &Patch::Apply(&foo)).await?;
    // NB: kubernetes < 1.20 will fail to admit scale subresources - see #387
    info!("Applied 1 {}: {:?}", o.name(), o.spec);

    // 2. Apply from partial json!
    // Only `apiVersion`, `kind` and the listed spec fields are sent; `info` is left out.
    let patch = serde_json::json!({
        "apiVersion": "clux.dev/v1",
        "kind": "Foo",
        "spec": {
            "name": "foo",
            "replicas": 2
        }
    });
    info!("Applying 2: \n{}", serde_yaml::to_string(&patch)?);
    let o2 = foos.patch("baz", &ssapply, &Patch::Apply(patch)).await?;
    info!("Applied 2 {}: {:?}", o2.name(), o2.spec);
    Ok(())
}
async fn wait_for_crd_ready(crds: &Api<CustomResourceDefinition>) -> anyhow::Result<()> {
if crds.get("foos.clux.dev").await.is_ok() {
return Ok(());
}
// Wait for the apply to take place (takes a sec or two during first install)
let lp = ListParams::default()
.fields(&format!("metadata.name={}", "foos.clux.dev")) // our crd only
.timeout(5); // should not take long
let mut stream = crds.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(s) = status {
info!("Modify event for {}", s.name());
if let Some(s) = s.status {
if let Some(conds) = s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "NamesAccepted") {
if pcond.status == "True" {
info!("crd was accepted: {:?}", pcond);
return Ok(());
}
}
}
}
}
}
Err(anyhow::anyhow!("Timed out waiting for crd to become accepted"))
}