Skip to content

Commit

Permalink
modify log_stream to return an async buf reader
Browse files Browse the repository at this point in the history
Modify `kube_client::api::subresource::Api::log_stream()` to return an
async buf reader (`futures::io::AsyncBufRead`) that can read logs
asynchronously. Users can call call `.lines()` on it to get a newline
buffered stream of logs.

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Jul 3, 2023
1 parent 9dd7bf5 commit 8144c7b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
6 changes: 3 additions & 3 deletions examples/log_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use futures::{StreamExt, TryStreamExt};
use futures::{AsyncBufReadExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, LogParams},
Expand All @@ -26,10 +26,10 @@ async fn main() -> Result<()> {
..LogParams::default()
})
.await?
.boxed();
.lines();

while let Some(line) = logs.try_next().await? {
info!("{:?}", String::from_utf8_lossy(&line));
info!("{}", line);
}
Ok(())
}
20 changes: 16 additions & 4 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::Bytes;
use futures::Stream;
use futures::{AsyncBufRead};
use serde::de::DeserializeOwned;
use std::fmt::Debug;

Expand Down Expand Up @@ -248,8 +247,21 @@ where
self.client.request_text(req).await
}

/// Fetch logs as a stream of bytes
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl Stream<Item = Result<Bytes>>> {
/// Fetch logs in the form of an async buf reader
///
/// # Example
///
/// ```no_run
/// let pods: Api<Pod> = Api::default_namespaced(client);
/// let mut logs = pods
/// .log_stream(&mypod, LogParams::default()).await?
/// .lines();
///
/// while let Some(line) = logs.try_next().await? {
/// info!("{}", line);
/// }
/// ```
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl AsyncBufRead> {
let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("log_stream");
self.client.request_text_stream(req).await
Expand Down
22 changes: 12 additions & 10 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
Expand Down Expand Up @@ -238,15 +237,18 @@ impl Client {
Ok(text)
}

/// Perform a raw HTTP request against the API and get back the response
/// as a stream of bytes
pub async fn request_text_stream(
&self,
request: Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
/// Perform a raw HTTP request against the API, get back the response
/// and return it as an object that implements [`AsyncBufRead`] and thus
/// [`AsyncBufReadExt`]. Users can call `.lines()` on it to get a newline
/// buffered stream of logs.
pub async fn request_text_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
let res = self.send(request.map(Body::from)).await?;
// trace!("Status = {:?} for {}", res.status(), res.url());
Ok(res.into_body().map_err(Error::HyperError))
// Map the error as we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res
.into_body()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
Ok(body.into_async_read())
}

/// Perform a raw HTTP request against the API and get back either an object
Expand Down
10 changes: 5 additions & 5 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ mod test {
client::ConfigExt,
Api, Client, Config, ResourceExt,
};
use futures::{StreamExt, TryStreamExt};
use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube_core::{
params::{DeleteParams, Patch, WatchParams},
Expand Down Expand Up @@ -444,7 +444,7 @@ mod test {
follow: true,
..LogParams::default()
};
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed();
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();

// wait for container to finish
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Expand All @@ -453,11 +453,11 @@ mod test {
assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

// individual logs may or may not buffer
let mut output = String::new();
let mut output = vec![];
while let Some(line) = logs_stream.try_next().await? {
output.push_str(&String::from_utf8_lossy(&line));
output.push(line);
}
assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);

// evict the pod
let ep = EvictParams::default();
Expand Down

0 comments on commit 8144c7b

Please sign in to comment.