From aed07be9b3aa87d2f36f2a7254a6fa3f86065f0b Mon Sep 17 00:00:00 2001 From: Eirik A Date: Mon, 27 Mar 2023 16:21:21 +0100 Subject: [PATCH] Minor docs improvements/fixes, deny fix + macros dep fix (#1166) * Docs for reflector + better docs for category Also fixes a misleading import in the event recorder that made it seem like we were using core::v1::Event. Signed-off-by: clux * better event recorder documentation + hermit abi deny Signed-off-by: clux * skip syn for now Signed-off-by: clux * fix build Signed-off-by: clux * sometimes need to wait an extra second for all logs to come through Signed-off-by: clux * add tokio/macros to ws depedencies - fixes #1168 Signed-off-by: clux * line buffering changed from kubernetes 1.26 Signed-off-by: clux * ok dont try to test any buffering Signed-off-by: clux * fix new docs with watcher::Config Signed-off-by: clux * fix doc issues from previous pr Signed-off-by: clux --------- Signed-off-by: clux --- deny.toml | 7 +++ kube-client/Cargo.toml | 2 +- kube-client/src/lib.rs | 15 +++--- kube-core/src/params.rs | 8 +-- kube-derive/src/lib.rs | 3 ++ kube-runtime/src/controller/mod.rs | 14 +++--- kube-runtime/src/events.rs | 30 +++++++---- kube-runtime/src/reflector/mod.rs | 81 ++++++++++++++++++++++++++++-- kube-runtime/src/watcher.rs | 2 +- 9 files changed, 128 insertions(+), 34 deletions(-) diff --git a/deny.toml b/deny.toml index 133c609e8..8a5e24922 100644 --- a/deny.toml +++ b/deny.toml @@ -88,6 +88,13 @@ version = "0.2" name = "windows-sys" version = "0.42" +[[bans.skip]] +name = "hermit-abi" + +[[bans.skip]] +# Needs a complicated upgrade +name = "syn" + [[bans.skip]] # waiting for pem to bump base64 # /~https://github.com/jcreekmore/pem-rs/blob/master/Cargo.toml#L16 diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index c2f31263d..9dde2f572 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -19,7 +19,7 @@ edition = "2021" default = ["client", "openssl-tls"] rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"] openssl-tls = ["openssl", "hyper-openssl"] -ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"] +ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"] oauth = ["client", "tame-oauth"] gzip = ["client", "tower-http/decompression-gzip"] client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"] diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index 2def5d00b..ed593bd04 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -445,20 +445,19 @@ mod test { ..LogParams::default() }; let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed(); - let log_line = logs_stream.try_next().await?.unwrap(); - assert_eq!(log_line, "kube 1\n"); // wait for container to finish - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; let all_logs = pods.logs("busybox-kube3", &Default::default()).await?; assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); - // remaining logs should have been buffered internally - assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 2\n"); - assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 3\n"); - assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 4\n"); - assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 5\n"); + // individual logs may or may not buffer + let mut output = String::new(); + while let Some(line) = logs_stream.try_next().await? { + output.push_str(&String::from_utf8_lossy(&line)); + } + assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); // evict the pod let ep = EvictParams::default(); diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 9f25ee4d4..3aaab39bb 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -4,8 +4,10 @@ use std::fmt; use crate::request::Error; use serde::Serialize; -/// Specifies how the resourceVersion parameter is applied. resourceVersionMatch may only be set if resourceVersion is also set. -/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details. +/// Controls how the resourceVersion parameter is applied +/// +/// This embeds the resource version when using the `NotOlderThan` or `Exact` variants. +/// See for details. #[derive(Clone, Debug, Default, PartialEq)] pub enum VersionMatch { /// Matches data with the latest version available in the kube-apiserver database (etcd) (quorum read required). @@ -61,7 +63,7 @@ pub struct ListParams { pub continue_token: Option, /// Determines how resourceVersion is applied to list calls. - /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for + /// See for /// details. pub version_match: VersionMatch, } diff --git a/kube-derive/src/lib.rs b/kube-derive/src/lib.rs index ea0c74ff7..2c124761b 100644 --- a/kube-derive/src/lib.rs +++ b/kube-derive/src/lib.rs @@ -139,6 +139,9 @@ mod custom_resource; /// ## `#[kube(shortname = "sn")]` /// Add a single shortname to the generated crd. /// +/// ## `#[kube(category = "apps")]` +/// Add a single category to `crd.spec.names.categories`. +/// /// ## Example with all properties /// /// ```rust diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index b85e7cab7..98affaece 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -500,10 +500,10 @@ where /// /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types. /// - /// [`Config`]: kube_runtime::watcher::Config + /// [`Config`]: crate::watcher::Config /// [`Api`]: kube_client::Api /// [`dynamic`]: kube_client::core::dynamic - /// [`Config::default`]: kube_runtime::watcher::Config::default + /// [`Config::default`]: crate::watcher::Config::default pub fn new_with(owned_api: Api, wc: Config, dyntype: K::DynamicType) -> Self { let writer = Writer::::new(dyntype.clone()); let reader = writer.as_reader(); @@ -552,9 +552,9 @@ where /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`. /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`. /// - /// The [`ListParams`] refer to the possible subset of `Child` objects that you want the [`Api`] - /// to watch - in the Api's configured scope - and receive reconcile events for. - /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`ListParams::default`]. + /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`] + /// to watch - in the Api's configured scope - and receive reconcile events for. + /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`]. /// /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference #[must_use] @@ -593,9 +593,9 @@ where /// /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`. /// - /// The [`ListParams`] refer to the possible subset of `Watched` objects that you want the [`Api`] + /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`] /// to watch - in the Api's configured scope - and run through the custom mapper. - /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`ListParams::default`]. + /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`]. /// /// # Example /// diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index ba46da230..cfc6603aa 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -1,6 +1,6 @@ //! Publishes events for objects for kubernetes >= 1.19 use k8s_openapi::{ - api::{core::v1::ObjectReference, events::v1::Event as CoreEvent}, + api::{core::v1::ObjectReference, events::v1::Event as K8sEvent}, apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta}, chrono::Utc, }; @@ -21,6 +21,7 @@ pub struct Event { /// The short reason explaining why the `action` was taken. /// /// This must be at most 128 characters, and is often PascalCased. Shows up in `kubectl describe` as `Reason`. + /// Usually denoted pub reason: String, /// A optional description of the status of the `action`. @@ -31,6 +32,8 @@ pub struct Event { /// The action that was taken (either successfully or unsuccessfully) against main object /// /// This must be at most 128 characters. It does not currently show up in `kubectl describe`. + /// A common convention is a short identifier of the action that caused the outcome described in `reason`. + /// Usually denoted in `PascalCase`. pub action: String, /// Optional secondary object related to the main object @@ -126,10 +129,7 @@ impl From<&str> for Reporter { /// specified when building the recorder using [`Recorder::new`]. /// /// ``` -/// use kube::{ -/// core::Resource, -/// runtime::events::{Reporter, Recorder, Event, EventType} -/// }; +/// use kube::runtime::events::{Reporter, Recorder, Event, EventType}; /// use k8s_openapi::api::core::v1::ObjectReference; /// /// # async fn wrapper() -> Result<(), Box> { @@ -161,9 +161,19 @@ impl From<&str> for Reporter { /// /// Events attached to an object will be shown in the `Events` section of the output of /// of `kubectl describe` for that object. +/// +/// ## RBAC +/// +/// Note that usage of the event recorder minimally requires the following RBAC rules: +/// +/// ```yaml +/// - apiGroups: ["events.k8s.io"] +/// resources: ["events"] +/// verbs: ["create"] +/// ``` #[derive(Clone)] pub struct Recorder { - events: Api, + events: Api, reporter: Reporter, reference: ObjectReference, } @@ -202,7 +212,7 @@ impl Recorder { // for more detail on the fields // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125 self.events - .create(&PostParams::default(), &CoreEvent { + .create(&PostParams::default(), &K8sEvent { action: Some(ev.action), reason: Some(ev.reason), deprecated_count: None, @@ -241,7 +251,7 @@ mod test { #![allow(unused_imports)] use k8s_openapi::api::{ - core::v1::{Event as CoreEvent, Service}, + core::v1::{Event as K8sEvent, Service}, rbac::v1::ClusterRole, }; use kube_client::{Api, Client, Resource}; @@ -265,7 +275,7 @@ mod test { secondary: None, }) .await?; - let events: Api = Api::namespaced(client, "default"); + let events: Api = Api::namespaced(client, "default"); let event_list = events.list(&Default::default()).await?; let found_event = event_list @@ -294,7 +304,7 @@ mod test { secondary: None, }) .await?; - let events: Api = Api::namespaced(client, "kube-system"); + let events: Api = Api::namespaced(client, "kube-system"); let event_list = events.list(&Default::default()).await?; let found_event = event_list diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 8932456c5..339a3297b 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -10,12 +10,85 @@ use kube_client::Resource; use std::hash::Hash; pub use store::{store, Store}; -/// Caches objects from `watcher::Event`s to a local `Store` +/// Cache objects from a [`watcher()`] stream into a local [`Store`] /// -/// Keep in mind that the `Store` is just a cache, and may be out of date. +/// Observes the raw [`Stream`] of [`watcher::Event`] objects, and modifies the cache. +/// It passes the raw [`watcher()`] stream through unmodified. /// -/// Note: It is a bad idea to feed a single `reflector` from multiple `watcher`s, since -/// the whole `Store` will be cleared whenever any of them emits a `Restarted` event. +/// ## Usage +/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not-clonable, +/// and must be moved into the reflector. The `reader` part is the [`Store`] interface +/// that you can send to other parts of your program as state. +/// +/// The cache contains the last-seen state of objects, +/// which may lag slightly behind the actual state. +/// +/// ## Example +/// +/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label. +/// +/// The `reader` part being passed around to a webserver is omitted. +/// For examples see [version-rs](/~https://github.com/kube-rs/version-rs) for integration with [axum](/~https://github.com/tokio-rs/axum), +/// or [controller-rs](/~https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/). +/// +/// ```no_run +/// use k8s_openapi::api::core::v1::Node; +/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config}; +/// use futures::{StreamExt, future::ready}; +/// # use kube::api::Api; +/// # async fn wrapper() -> Result<(), Box> { +/// # let client: kube::Client = todo!(); +/// +/// let nodes: Api = Api::all(client); +/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64"); +/// let (reader, writer) = reflector::store(); +/// +/// // Create the infinite reflector stream +/// let rf = reflector(writer, watcher(nodes, node_filter)); +/// +/// // !!! pass reader to your webserver/manager as state !!! +/// +/// // Poll the stream (needed to keep the store up-to-date) +/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) }); +/// infinite_watch.await; +/// # Ok(()) +/// # } +/// ``` +/// +/// +/// ## Memory Usage +/// +/// A reflector often constitutes one of the biggest components of a controller's memory use. +/// Given ~two thousand pods in a cluster, a reflector around that quickly consumes 1GB of memory. +/// +/// While, sometimes acceptible, there are techniques you can leverage to reduce the memory usage +/// depending on your use case. +/// +/// 1. Reflect a [`PartialObjectMeta`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K` +/// +/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`], +/// and this will can drop your memory usage by more than a factor of two, +/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical. +/// +/// 2. Use `modify` the raw [`watcher::Event`] object stream to clear unneeded properties +/// +/// For instance, managed fields typically constitutes around half the size of `ObjectMeta` and can often be dropped: +/// +/// ```no_run +/// # use futures::TryStreamExt; +/// # use kube::{ResourceExt, Api, runtime::watcher}; +/// # let api: Api = todo!(); +/// let stream = watcher(api, Default::default()).map_ok(|ev| { +/// ev.modify(|pod| { +/// pod.managed_fields_mut().clear(); +/// pod.annotations_mut().clear(); +/// pod.status = None; +/// }) +/// }); +/// ``` +/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store. +/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on. +/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`. pub fn reflector(mut writer: store::Writer, stream: W) -> impl Stream where K: Resource + Clone, diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index f1d2a1034..6f81ef070 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -174,7 +174,7 @@ pub struct Config { pub timeout: Option, /// Determines how resourceVersion is applied to list calls. - /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for + /// See for /// details. pub version_match: VersionMatch,