Skip to content

Commit

Permalink
Minor docs improvements/fixes, deny fix + macros dep fix (#1166)
Browse files Browse the repository at this point in the history
* Docs for reflector + better docs for category

Also fixes a misleading import in the event recorder
that made it seem like we were using core::v1::Event.

Signed-off-by: clux <sszynrae@gmail.com>

* better event recorder documentation + hermit abi deny

Signed-off-by: clux <sszynrae@gmail.com>

* skip syn for now

Signed-off-by: clux <sszynrae@gmail.com>

* fix build

Signed-off-by: clux <sszynrae@gmail.com>

* sometimes need to wait an extra second for all logs to come through

Signed-off-by: clux <sszynrae@gmail.com>

* add tokio/macros to ws depedencies - fixes #1168

Signed-off-by: clux <sszynrae@gmail.com>

* line buffering changed from kubernetes 1.26

Signed-off-by: clux <sszynrae@gmail.com>

* ok dont try to test any buffering

Signed-off-by: clux <sszynrae@gmail.com>

* fix new docs with watcher::Config

Signed-off-by: clux <sszynrae@gmail.com>

* fix doc issues from previous pr

Signed-off-by: clux <sszynrae@gmail.com>

---------

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux authored Mar 27, 2023
1 parent 30a0c39 commit aed07be
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 34 deletions.
7 changes: 7 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ version = "0.2"
name = "windows-sys"
version = "0.42"

[[bans.skip]]
name = "hermit-abi"

[[bans.skip]]
# Needs a complicated upgrade
name = "syn"

[[bans.skip]]
# waiting for pem to bump base64
# /~https://github.com/jcreekmore/pem-rs/blob/master/Cargo.toml#L16
Expand Down
2 changes: 1 addition & 1 deletion kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ edition = "2021"
default = ["client", "openssl-tls"]
rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"]
openssl-tls = ["openssl", "hyper-openssl"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"]
oauth = ["client", "tame-oauth"]
gzip = ["client", "tower-http/decompression-gzip"]
client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"]
Expand Down
15 changes: 7 additions & 8 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,19 @@ mod test {
..LogParams::default()
};
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed();
let log_line = logs_stream.try_next().await?.unwrap();
assert_eq!(log_line, "kube 1\n");

// wait for container to finish
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

// remaining logs should have been buffered internally
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 2\n");
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 3\n");
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 4\n");
assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 5\n");
// individual logs may or may not buffer
let mut output = String::new();
while let Some(line) = logs_stream.try_next().await? {
output.push_str(&String::from_utf8_lossy(&line));
}
assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

// evict the pod
let ep = EvictParams::default();
Expand Down
8 changes: 5 additions & 3 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::fmt;
use crate::request::Error;
use serde::Serialize;

/// Specifies how the resourceVersion parameter is applied. resourceVersionMatch may only be set if resourceVersion is also set.
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.
/// Controls how the resourceVersion parameter is applied
///
/// This embeds the resource version when using the `NotOlderThan` or `Exact` variants.
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list> for details.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum VersionMatch {
/// Matches data with the latest version available in the kube-apiserver database (etcd) (quorum read required).
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct ListParams {
pub continue_token: Option<String>,

/// Determines how resourceVersion is applied to list calls.
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions> for
/// details.
pub version_match: VersionMatch,
}
Expand Down
3 changes: 3 additions & 0 deletions kube-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ mod custom_resource;
/// ## `#[kube(shortname = "sn")]`
/// Add a single shortname to the generated crd.
///
/// ## `#[kube(category = "apps")]`
/// Add a single category to `crd.spec.names.categories`.
///
/// ## Example with all properties
///
/// ```rust
Expand Down
14 changes: 7 additions & 7 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,10 @@ where
///
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
///
/// [`Config`]: kube_runtime::watcher::Config
/// [`Config`]: crate::watcher::Config
/// [`Api`]: kube_client::Api
/// [`dynamic`]: kube_client::core::dynamic
/// [`Config::default`]: kube_runtime::watcher::Config::default
/// [`Config::default`]: crate::watcher::Config::default
pub fn new_with(owned_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
Expand Down Expand Up @@ -552,9 +552,9 @@ where
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
/// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
///
/// The [`ListParams`] refer to the possible subset of `Child` objects that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// To watch the full set of `Child` objects in the given `Api` scope, you can use [`ListParams::default`].
/// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
///
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
#[must_use]
Expand Down Expand Up @@ -593,9 +593,9 @@ where
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
///
/// The [`ListParams`] refer to the possible subset of `Watched` objects that you want the [`Api`]
/// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
/// to watch - in the Api's configured scope - and run through the custom mapper.
/// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`ListParams::default`].
/// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
///
/// # Example
///
Expand Down
30 changes: 20 additions & 10 deletions kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Publishes events for objects for kubernetes >= 1.19
use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event as CoreEvent},
api::{core::v1::ObjectReference, events::v1::Event as K8sEvent},
apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
chrono::Utc,
};
Expand All @@ -21,6 +21,7 @@ pub struct Event {
/// The short reason explaining why the `action` was taken.
///
/// This must be at most 128 characters, and is often PascalCased. Shows up in `kubectl describe` as `Reason`.
/// Usually denoted
pub reason: String,

/// A optional description of the status of the `action`.
Expand All @@ -31,6 +32,8 @@ pub struct Event {
/// The action that was taken (either successfully or unsuccessfully) against main object
///
/// This must be at most 128 characters. It does not currently show up in `kubectl describe`.
/// A common convention is a short identifier of the action that caused the outcome described in `reason`.
/// Usually denoted in `PascalCase`.
pub action: String,

/// Optional secondary object related to the main object
Expand Down Expand Up @@ -126,10 +129,7 @@ impl From<&str> for Reporter {
/// specified when building the recorder using [`Recorder::new`].
///
/// ```
/// use kube::{
/// core::Resource,
/// runtime::events::{Reporter, Recorder, Event, EventType}
/// };
/// use kube::runtime::events::{Reporter, Recorder, Event, EventType};
/// use k8s_openapi::api::core::v1::ObjectReference;
///
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -161,9 +161,19 @@ impl From<&str> for Reporter {
///
/// Events attached to an object will be shown in the `Events` section of the output of
/// of `kubectl describe` for that object.
///
/// ## RBAC
///
/// Note that usage of the event recorder minimally requires the following RBAC rules:
///
/// ```yaml
/// - apiGroups: ["events.k8s.io"]
/// resources: ["events"]
/// verbs: ["create"]
/// ```
#[derive(Clone)]
pub struct Recorder {
events: Api<CoreEvent>,
events: Api<K8sEvent>,
reporter: Reporter,
reference: ObjectReference,
}
Expand Down Expand Up @@ -202,7 +212,7 @@ impl Recorder {
// for more detail on the fields
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
self.events
.create(&PostParams::default(), &CoreEvent {
.create(&PostParams::default(), &K8sEvent {
action: Some(ev.action),
reason: Some(ev.reason),
deprecated_count: None,
Expand Down Expand Up @@ -241,7 +251,7 @@ mod test {
#![allow(unused_imports)]

use k8s_openapi::api::{
core::v1::{Event as CoreEvent, Service},
core::v1::{Event as K8sEvent, Service},
rbac::v1::ClusterRole,
};
use kube_client::{Api, Client, Resource};
Expand All @@ -265,7 +275,7 @@ mod test {
secondary: None,
})
.await?;
let events: Api<CoreEvent> = Api::namespaced(client, "default");
let events: Api<K8sEvent> = Api::namespaced(client, "default");

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
Expand Down Expand Up @@ -294,7 +304,7 @@ mod test {
secondary: None,
})
.await?;
let events: Api<CoreEvent> = Api::namespaced(client, "kube-system");
let events: Api<K8sEvent> = Api::namespaced(client, "kube-system");

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
Expand Down
81 changes: 77 additions & 4 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,85 @@ use kube_client::Resource;
use std::hash::Hash;
pub use store::{store, Store};

/// Caches objects from `watcher::Event`s to a local `Store`
/// Cache objects from a [`watcher()`] stream into a local [`Store`]
///
/// Keep in mind that the `Store` is just a cache, and may be out of date.
/// Observes the raw [`Stream`] of [`watcher::Event`] objects, and modifies the cache.
/// It passes the raw [`watcher()`] stream through unmodified.
///
/// Note: It is a bad idea to feed a single `reflector` from multiple `watcher`s, since
/// the whole `Store` will be cleared whenever any of them emits a `Restarted` event.
/// ## Usage
/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not-clonable,
/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
/// that you can send to other parts of your program as state.
///
/// The cache contains the last-seen state of objects,
/// which may lag slightly behind the actual state.
///
/// ## Example
///
/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
///
/// The `reader` part being passed around to a webserver is omitted.
/// For examples see [version-rs](/~https://github.com/kube-rs/version-rs) for integration with [axum](/~https://github.com/tokio-rs/axum),
/// or [controller-rs](/~https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
///
/// ```no_run
/// use k8s_openapi::api::core::v1::Node;
/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
/// use futures::{StreamExt, future::ready};
/// # use kube::api::Api;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
///
/// let nodes: Api<Node> = Api::all(client);
/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
/// let (reader, writer) = reflector::store();
///
/// // Create the infinite reflector stream
/// let rf = reflector(writer, watcher(nodes, node_filter));
///
/// // !!! pass reader to your webserver/manager as state !!!
///
/// // Poll the stream (needed to keep the store up-to-date)
/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
/// infinite_watch.await;
/// # Ok(())
/// # }
/// ```
///
///
/// ## Memory Usage
///
/// A reflector often constitutes one of the biggest components of a controller's memory use.
/// Given ~two thousand pods in a cluster, a reflector around that quickly consumes 1GB of memory.
///
/// While, sometimes acceptible, there are techniques you can leverage to reduce the memory usage
/// depending on your use case.
///
/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
///
/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
/// and this will can drop your memory usage by more than a factor of two,
/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
///
/// 2. Use `modify` the raw [`watcher::Event`] object stream to clear unneeded properties
///
/// For instance, managed fields typically constitutes around half the size of `ObjectMeta` and can often be dropped:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use kube::{ResourceExt, Api, runtime::watcher};
/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
/// let stream = watcher(api, Default::default()).map_ok(|ev| {
/// ev.modify(|pod| {
/// pod.managed_fields_mut().clear();
/// pod.annotations_mut().clear();
/// pod.status = None;
/// })
/// });
/// ```
/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Resource + Clone,
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ pub struct Config {
pub timeout: Option<u32>,

/// Determines how resourceVersion is applied to list calls.
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions> for
/// details.
pub version_match: VersionMatch,

Expand Down

0 comments on commit aed07be

Please sign in to comment.