From 0281b8f1b63c853c74358e998827fd00b1d40c2b Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Sun, 2 Jul 2023 12:35:37 +0530 Subject: [PATCH] modify `request_text_stream` to return a stream of log lines Modify `kube_client::client::Client::request_text_stream()` to return a steam of strings where each chunk represents an individual log line using the `LinesCodec` decoder. Signed-off-by: Sanskar Jaiswal --- examples/log_stream.rs | 2 +- kube-client/src/api/subresource.rs | 2 +- kube-client/src/client/mod.rs | 38 ++++++++++++++++++++++++++---- kube-client/src/lib.rs | 6 ++--- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/examples/log_stream.rs b/examples/log_stream.rs index 19fc1c3ff..ec2078113 100644 --- a/examples/log_stream.rs +++ b/examples/log_stream.rs @@ -29,7 +29,7 @@ async fn main() -> Result<()> { .boxed(); while let Some(line) = logs.try_next().await? { - info!("{:?}", String::from_utf8_lossy(&line)); + info!("{}", line); } Ok(()) } diff --git a/kube-client/src/api/subresource.rs b/kube-client/src/api/subresource.rs index f1876737b..39af8a61d 100644 --- a/kube-client/src/api/subresource.rs +++ b/kube-client/src/api/subresource.rs @@ -249,7 +249,7 @@ where } /// Fetch logs as a stream of bytes - pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result>> { + pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result>> { let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?; req.extensions_mut().insert("log_stream"); self.client.request_text_stream(req).await diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index e17e46599..86a0950e0 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -239,14 +239,44 @@ impl Client { } /// Perform a raw HTTP request against the API and get back the response - /// as a stream of bytes + /// as a stream of strings where each chunk represents a log line. pub async fn request_text_stream( &self, request: Request>, - ) -> Result>> { + ) -> Result>> { let res = self.send(request.map(Body::from)).await?; - // trace!("Status = {:?} for {}", res.status(), res.url()); - Ok(res.into_body().map_err(Error::HyperError)) + let frames = FramedRead::new( + StreamReader::new(res.into_body().map_err(|e| { + // Client timeout. This will be ignored. + if e.is_timeout() { + return std::io::Error::new(std::io::ErrorKind::TimedOut, e); + } + std::io::Error::new(std::io::ErrorKind::Other, e) + })), + LinesCodec::new(), + ); + Ok(frames.filter_map(|res| async { + match res { + Ok(val) => { + Some(Ok(val)) + }, + + Err(LinesCodecError::Io(e)) => match e.kind() { + // Client timeout + std::io::ErrorKind::TimedOut => { + tracing::warn!("timeout in poll: {}", e); // our client timeout + None + } + _ => Some(Err(Error::ReadEvents(e))), + }, + + // Reached the maximum line length without finding a newline. + // This should never happen because we're using the default `usize::MAX`. + Err(LinesCodecError::MaxLineLengthExceeded) => { + Some(Err(Error::LinesCodecMaxLineLengthExceeded)) + } + } + })) } /// Perform a raw HTTP request against the API and get back either an object diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index c92cf83b0..8eabb6c57 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -453,11 +453,11 @@ mod test { assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); // individual logs may or may not buffer - let mut output = String::new(); + let mut output = vec![]; while let Some(line) = logs_stream.try_next().await? { - output.push_str(&String::from_utf8_lossy(&line)); + output.push(line); } - assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n"); + assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]); // evict the pod let ep = EvictParams::default();