Skip to content

Commit

Permalink
Split ListParams and WatchParams (#1162)
Browse files Browse the repository at this point in the history
* Add resourceVersion to ListParams

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Apply rustfmt

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* More validations for resource version options

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Add hack to allow the watcher to work

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Split ListParams and WatchParams

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Fix doc tests and rustfmt

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Remove continue_tooken and limit from watcher config

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Fix clippy errors, add more tests

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Migrate to 4 varients enum

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

* Add more tests and derive ListParams default

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>

---------

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>
  • Loading branch information
nabokihms authored Mar 27, 2023
1 parent 2e5e4de commit 30a0c39
Show file tree
Hide file tree
Showing 26 changed files with 593 additions and 247 deletions.
11 changes: 7 additions & 4 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use anyhow::Result;
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
runtime::controller::{Action, Controller},
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Controller},
watcher,
},
Client, CustomResource,
};
use schemars::JsonSchema;
Expand Down Expand Up @@ -99,8 +102,8 @@ async fn main() -> Result<()> {
}
});

Controller::new(cmgs, ListParams::default())
.owns(cms, ListParams::default())
Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
Expand Down
8 changes: 4 additions & 4 deletions examples/crd_derive_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::{
api::{
Api, ApiResource, DeleteParams, DynamicObject, GroupVersionKind, ListParams, Patch, PatchParams,
PostParams, WatchEvent,
Api, ApiResource, DeleteParams, DynamicObject, GroupVersionKind, Patch, PatchParams, PostParams,
WatchEvent, WatchParams,
},
runtime::wait::{await_condition, conditions},
Client, CustomResource, CustomResourceExt,
Expand Down Expand Up @@ -241,10 +241,10 @@ async fn delete_crd(client: Client) -> Result<()> {

// Wait until deleted
let timeout_secs = 15;
let lp = ListParams::default()
let wp = WatchParams::default()
.fields("metadata.name=foos.clux.dev")
.timeout(timeout_secs);
let mut stream = api.watch(&lp, "0").await?.boxed_local();
let mut stream = api.watch(&wp, "0").await?.boxed_local();
while let Some(status) = stream.try_next().await? {
if let WatchEvent::Deleted(_) = status {
return Ok(());
Expand Down
6 changes: 3 additions & 3 deletions examples/crd_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomRe
use tracing::*;

use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
api::{Api, Patch, PatchParams, ResourceExt},
runtime::{reflector, watcher, WatchStreamExt},
Client, CustomResource, CustomResourceExt,
};
Expand Down Expand Up @@ -36,8 +36,8 @@ async fn main() -> anyhow::Result<()> {
let (reader, writer) = reflector::store::<Foo>();

let foos: Api<Foo> = Api::default_namespaced(client);
let lp = ListParams::default().timeout(20); // low timeout in this example
let rf = reflector(writer, watcher(foos, lp));
let wc = watcher::Config::default().timeout(20); // low timeout in this example
let rf = reflector(writer, watcher(foos, wc));

tokio::spawn(async move {
loop {
Expand Down
8 changes: 4 additions & 4 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::{Stream, StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, Resource, ResourceExt},
api::{Api, DynamicObject, GroupVersionKind, Resource, ResourceExt},
runtime::{metadata_watcher, watcher, watcher::Event, WatchStreamExt},
};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -28,13 +28,13 @@ async fn main() -> anyhow::Result<()> {

// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);
let lp = ListParams::default();
let wc = watcher::Config::default();

// Start a metadata or a full resource watch
if watch_metadata {
handle_events(metadata_watcher(api, lp)).await
handle_events(metadata_watcher(api, wc)).await
} else {
handle_events(watcher(api, lp)).await
handle_events(watcher(api, wc)).await
}
}

Expand Down
6 changes: 3 additions & 3 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
api::{Api, ListParams},
api::Api,
runtime::{watcher, WatchStreamExt},
Client,
};
Expand All @@ -13,9 +13,9 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;

let events: Api<Event> = Api::all(client);
let lp = ListParams::default();
let wc = watcher::Config::default();

let ew = watcher(events, lp).applied_objects();
let ew = watcher(events, wc).applied_objects();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand Down
14 changes: 10 additions & 4 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ impl App {
Ok(())
}

async fn watch(&self, api: Api<DynamicObject>, mut lp: ListParams) -> Result<()> {
async fn watch(&self, api: Api<DynamicObject>, mut wc: watcher::Config) -> Result<()> {
if let Some(n) = &self.name {
lp = lp.fields(&format!("metadata.name={n}"));
wc = wc.fields(&format!("metadata.name={n}"));
}
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).applied_objects().boxed();
let mut stream = watcher(api, wc).applied_objects().boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
Expand Down Expand Up @@ -201,14 +201,20 @@ async fn main() -> Result<()> {
if let Some(label) = &app.selector {
lp = lp.labels(label);
}

let mut wc = watcher::Config::default();
if let Some(label) = &app.selector {
wc = wc.labels(label);
}

let api = dynamic_api(ar, caps, client, &app.namespace, app.all);

tracing::info!(?app.verb, ?resource, name = ?app.name.clone().unwrap_or_default(), "requested objects");
match app.verb {
Verb::Edit => app.edit(api).await?,
Verb::Get => app.get(api, lp).await?,
Verb::Delete => app.delete(api, lp).await?,
Verb::Watch => app.watch(api, lp).await?,
Verb::Watch => app.watch(api, wc).await?,
Verb::Apply => bail!("verb {:?} cannot act on an explicit resource", app.verb),
}
} else if app.verb == Verb::Apply {
Expand Down
8 changes: 4 additions & 4 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use k8s_openapi::api::{
core::v1::{ConfigMap, Secret},
};
use kube::{
api::{Api, ListParams, ResourceExt},
api::{Api, ResourceExt},
runtime::{watcher, WatchStreamExt},
Client,
};
Expand All @@ -18,9 +18,9 @@ async fn main() -> anyhow::Result<()> {
let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
let secret: Api<Secret> = Api::default_namespaced(client.clone());
let dep_watcher = watcher(deploys, ListParams::default());
let cm_watcher = watcher(cms, ListParams::default());
let sec_watcher = watcher(secret, ListParams::default());
let dep_watcher = watcher(deploys, watcher::Config::default());
let cm_watcher = watcher(cms, watcher::Config::default());
let sec_watcher = watcher(secret, watcher::Config::default());

// select on applied events from all watchers
let mut combo_stream = stream::select_all(vec![
Expand Down
6 changes: 3 additions & 3 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ListParams, ResourceExt},
api::{Api, ResourceExt},
runtime::{reflector, watcher, WatchStreamExt},
Client,
};
Expand All @@ -13,12 +13,12 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;

let nodes: Api<Node> = Api::all(client.clone());
let lp = ListParams::default()
let wc = watcher::Config::default()
.labels("kubernetes.io/arch=amd64") // filter instances by label
.timeout(10); // short watch timeout in this example

let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(nodes, lp));
let rf = reflector(writer, watcher(nodes, wc));

// Periodically read our state in the background
tokio::spawn(async move {
Expand Down
4 changes: 2 additions & 2 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, lp)
let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, wc)
.backoff(ExponentialBackoff::default())
.applied_objects();

Expand Down
6 changes: 3 additions & 3 deletions examples/pod_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{
Api, AttachParams, AttachedProcess, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent,
Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams,
},
Client,
};
Expand Down Expand Up @@ -35,8 +35,8 @@ async fn main() -> anyhow::Result<()> {
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let wp = WatchParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&wp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
Expand Down
6 changes: 3 additions & 3 deletions examples/pod_cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use k8s_openapi::api::core::v1::Pod;
use tracing::*;

use kube::{
api::{Api, AttachParams, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
api::{Api, AttachParams, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams},
Client,
};
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -34,8 +34,8 @@ async fn main() -> anyhow::Result<()> {
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let wp = WatchParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&wp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
Expand Down
6 changes: 3 additions & 3 deletions examples/pod_evict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde_json::json;
use tracing::*;

use kube::{
api::{Api, EvictParams, ListParams, PostParams, ResourceExt, WatchEvent},
api::{Api, EvictParams, PostParams, ResourceExt, WatchEvent, WatchParams},
Client,
};

Expand Down Expand Up @@ -35,10 +35,10 @@ async fn main() -> anyhow::Result<()> {
pods.create(&pp, &empty_pod).await?;

// Wait until the pod is running, although it's not necessary
let lp = ListParams::default()
let wp = WatchParams::default()
.fields("metadata.name=empty-pod")
.timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let mut stream = pods.watch(&wp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
Expand Down
6 changes: 3 additions & 3 deletions examples/pod_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::*;

use kube::{
api::{
Api, AttachParams, AttachedProcess, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent,
Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams,
},
Client,
};
Expand Down Expand Up @@ -34,8 +34,8 @@ async fn main() -> anyhow::Result<()> {
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let wp = WatchParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&wp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/pod_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
api::Api,
runtime::{reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
Expand All @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> {
}
});

let stream = watcher(api, ListParams::default()).map_ok(|ev| {
let stream = watcher(api, watcher::Config::default()).map_ok(|ev| {
ev.modify(|pod| {
// memory optimization for our store - we don't care about fields/annotations/status
pod.managed_fields_mut().clear();
Expand Down
6 changes: 3 additions & 3 deletions examples/pod_shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use k8s_openapi::api::core::v1::Pod;
use tracing::*;

use kube::{
api::{Api, AttachParams, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
api::{Api, AttachParams, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams},
Client,
};

Expand Down Expand Up @@ -31,8 +31,8 @@ async fn main() -> anyhow::Result<()> {
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let wp = WatchParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&wp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/pod_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::prelude::*;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams, ResourceExt},
api::{Api, ResourceExt},
runtime::{watcher, WatchStreamExt},
Client,
};
Expand All @@ -13,7 +13,7 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let api = Api::<Pod>::default_namespaced(client);

watcher(api, ListParams::default())
watcher(api, watcher::Config::default())
.applied_objects()
.try_for_each(|p| async move {
info!("saw {}", p.name_any());
Expand Down
6 changes: 3 additions & 3 deletions examples/secret_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Secret;
use kube::{
api::{Api, ListParams, ResourceExt},
api::{Api, ResourceExt},
runtime::{reflector, reflector::Store, watcher, WatchStreamExt},
Client,
};
Expand Down Expand Up @@ -53,10 +53,10 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;

let secrets: Api<Secret> = Api::default_namespaced(client);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let wc = watcher::Config::default().timeout(10); // short watch timeout in this example

let (reader, writer) = reflector::store::<Secret>();
let rf = reflector(writer, watcher(secrets, lp));
let rf = reflector(writer, watcher(secrets, wc));

spawn_periodic_reader(reader); // read from a reader in the background

Expand Down
5 changes: 3 additions & 2 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
use futures::StreamExt;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use kube::{
api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
runtime::{
controller::{Action, Controller},
finalizer::{finalizer, Event},
watcher,
},
};
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -78,7 +79,7 @@ async fn main() -> anyhow::Result<()> {
let client = kube::Client::try_default().await?;
Controller::new(
Api::<ConfigMap>::all(client.clone()),
ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
watcher::Config::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
)
.run(
|cm, _| {
Expand Down
Loading

0 comments on commit 30a0c39

Please sign in to comment.