From 8144c7b15b0f5ea35b8c5a441b7f8407fa14a384 Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Sun, 2 Jul 2023 12:35:37 +0530 Subject: [PATCH] modify `log_stream` to return an async buf reader Modify `kube_client::api::subresource::Api::log_stream()` to return an async buf reader (`futures::io::AsyncBufRead`) that can read logs asynchronously. Users can call call `.lines()` on it to get a newline buffered stream of logs. Signed-off-by: Sanskar Jaiswal --- examples/log_stream.rs | 6 +++--- kube-client/src/api/subresource.rs | 20 ++++++++++++++++---- kube-client/src/client/mod.rs | 22 ++++++++++++---------- kube-client/src/lib.rs | 10 +++++----- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/examples/log_stream.rs b/examples/log_stream.rs index 19fc1c3ff..a708f9585 100644 --- a/examples/log_stream.rs +++ b/examples/log_stream.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use futures::{StreamExt, TryStreamExt}; +use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::api::core::v1::Pod; use kube::{ api::{Api, LogParams}, @@ -26,10 +26,10 @@ async fn main() -> Result<()> { ..LogParams::default() }) .await? - .boxed(); + .lines(); while let Some(line) = logs.try_next().await? { - info!("{:?}", String::from_utf8_lossy(&line)); + info!("{}", line); } Ok(()) } diff --git a/kube-client/src/api/subresource.rs b/kube-client/src/api/subresource.rs index f1876737b..db1a0125f 100644 --- a/kube-client/src/api/subresource.rs +++ b/kube-client/src/api/subresource.rs @@ -1,5 +1,4 @@ -use bytes::Bytes; -use futures::Stream; +use futures::{AsyncBufRead}; use serde::de::DeserializeOwned; use std::fmt::Debug; @@ -248,8 +247,21 @@ where self.client.request_text(req).await } - /// Fetch logs as a stream of bytes - pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result>> { + /// Fetch logs in the form of an async buf reader + /// + /// # Example + /// + /// ```no_run + /// let pods: Api = Api::default_namespaced(client); + /// let mut logs = pods + /// .log_stream(&mypod, LogParams::default()).await? + /// .lines(); + /// + /// while let Some(line) = logs.try_next().await? { + /// info!("{}", line); + /// } + /// ``` + pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result { let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?; req.extensions_mut().insert("log_stream"); self.client.request_text_stream(req).await diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index e17e46599..a78d2741b 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -7,9 +7,8 @@ //! //! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically //! retrieve the resources served by the kubernetes API. -use bytes::Bytes; use either::{Either, Left, Right}; -use futures::{self, Stream, StreamExt, TryStream, TryStreamExt}; +use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt}; use http::{self, Request, Response, StatusCode}; use hyper::Body; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; @@ -238,15 +237,18 @@ impl Client { Ok(text) } - /// Perform a raw HTTP request against the API and get back the response - /// as a stream of bytes - pub async fn request_text_stream( - &self, - request: Request>, - ) -> Result>> { + /// Perform a raw HTTP request against the API, get back the response + /// and return it as an object that implements [`AsyncBufRead`] and thus + /// [`AsyncBufReadExt`]. Users can call `.lines()` on it to get a newline + /// buffered stream of logs. + pub async fn request_text_stream(&self, request: Request>) -> Result { let res = self.send(request.map(Body::from)).await?; - // trace!("Status = {:?} for {}", res.status(), res.url()); - Ok(res.into_body().map_err(Error::HyperError)) + // Map the error as we want to convert this into an `AsyncBufReader` using + // `into_async_read` which specifies `std::io::Error` as the stream's error type. + let body = res + .into_body() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + Ok(body.into_async_read()) } /// Perform a raw HTTP request against the API and get back either an object diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index c92cf83b0..911c41056 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -136,7 +136,7 @@ mod test { client::ConfigExt, Api, Client, Config, ResourceExt, }; - use futures::{StreamExt, TryStreamExt}; + use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::Pod; use kube_core::{ params::{DeleteParams, Patch, WatchParams}, @@ -444,7 +444,7 @@ mod test { follow: true, ..LogParams::default() }; - let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed(); + let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines(); // wait for container to finish tokio::time::sleep(std::time::Duration::from_secs(2)).await; @@ -453,11 +453,11 @@ mod test { assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); // individual logs may or may not buffer - let mut output = String::new(); + let mut output = vec![]; while let Some(line) = logs_stream.try_next().await? { - output.push_str(&String::from_utf8_lossy(&line)); + output.push(line); } - assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); + assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]); // evict the pod let ep = EvictParams::default();