Skip to content

Commit

Permalink
modify log_stream to return an async buf reader
Browse files Browse the repository at this point in the history
Modify `kube_client::api::subresource::Api::log_stream()` to return an
async buf reader (`futures::io::AsyncBufRead`) that can read logs
asynchronously. Users can call call `.lines()` on it to get a newline
buffered stream of logs.

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Jul 3, 2023
1 parent f792fd2 commit 5361191
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 19 deletions.
5 changes: 3 additions & 2 deletions examples/log_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use futures::{StreamExt, TryStreamExt};
use futures::{AsyncBufReadExt, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, LogParams},
Expand All @@ -26,10 +26,11 @@ async fn main() -> Result<()> {
..LogParams::default()
})
.await?
.lines()
.boxed();

while let Some(line) = logs.try_next().await? {
info!("{:?}", String::from_utf8_lossy(&line));
info!("{}", line);
}
Ok(())
}
7 changes: 4 additions & 3 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use futures::Stream;
use futures::{AsyncBufRead, Stream};
use serde::de::DeserializeOwned;
use std::fmt::Debug;

Expand Down Expand Up @@ -248,8 +248,9 @@ where
self.client.request_text(req).await
}

/// Fetch logs as a stream of bytes
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl Stream<Item = Result<Bytes>>> {
/// Fetch logs in the form of an async buf reader. Users can call `.lines()`
/// on it to get a newline buffered stream of logs.
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl AsyncBufRead> {
let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("log_stream");
self.client.request_text_stream(req).await
Expand Down
20 changes: 11 additions & 9 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! retrieve the resources served by the kubernetes API.
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use futures::{self, AsyncBufRead, Stream, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
Expand Down Expand Up @@ -238,15 +238,17 @@ impl Client {
Ok(text)
}

/// Perform a raw HTTP request against the API and get back the response
/// as a stream of bytes
pub async fn request_text_stream(
&self,
request: Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
/// Perform a raw HTTP request against the API, get back the response
/// and return it as an AsyncBufRead. Users can call `.lines()` on it
/// to get a newline buffered stream of logs.
pub async fn request_text_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
let res = self.send(request.map(Body::from)).await?;
// trace!("Status = {:?} for {}", res.status(), res.url());
Ok(res.into_body().map_err(Error::HyperError))
// Map the error as we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res
.into_body()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
Ok(body.into_async_read())
}

/// Perform a raw HTTP request against the API and get back either an object
Expand Down
10 changes: 5 additions & 5 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ mod test {
client::ConfigExt,
Api, Client, Config, ResourceExt,
};
use futures::{StreamExt, TryStreamExt};
use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube_core::{
params::{DeleteParams, Patch, WatchParams},
Expand Down Expand Up @@ -444,7 +444,7 @@ mod test {
follow: true,
..LogParams::default()
};
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed();
let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines().boxed();

// wait for container to finish
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Expand All @@ -453,11 +453,11 @@ mod test {
assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

// individual logs may or may not buffer
let mut output = String::new();
let mut output = vec![];
while let Some(line) = logs_stream.try_next().await? {
output.push_str(&String::from_utf8_lossy(&line));
output.push(line);
}
assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);

// evict the pod
let ep = EvictParams::default();
Expand Down

0 comments on commit 5361191

Please sign in to comment.