Skip to content

Commit

Permalink
modify request_text_stream to return a stream of log lines
Browse files Browse the repository at this point in the history
Modify `kube_client::client::Client::request_text_stream()` to return a
steam of strings where each chunk represents an individual log line
using the `LinesCodec` decoder.

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Jul 2, 2023
1 parent f792fd2 commit 0281b8f
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
2 changes: 1 addition & 1 deletion examples/log_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<()> {
.boxed();

while let Some(line) = logs.try_next().await? {
info!("{:?}", String::from_utf8_lossy(&line));
info!("{}", line);
}
Ok(())
}
2 changes: 1 addition & 1 deletion kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ where
}

/// Fetch logs as a stream of bytes
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl Stream<Item = Result<Bytes>>> {
pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl Stream<Item = Result<String>>> {
let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("log_stream");
self.client.request_text_stream(req).await
Expand Down
38 changes: 34 additions & 4 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,44 @@ impl Client {
}

/// Perform a raw HTTP request against the API and get back the response
/// as a stream of bytes
/// as a stream of strings where each chunk represents a log line.
pub async fn request_text_stream(
&self,
request: Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
) -> Result<impl Stream<Item = Result<String>>> {
let res = self.send(request.map(Body::from)).await?;
// trace!("Status = {:?} for {}", res.status(), res.url());
Ok(res.into_body().map_err(Error::HyperError))
let frames = FramedRead::new(
StreamReader::new(res.into_body().map_err(|e| {
// Client timeout. This will be ignored.
if e.is_timeout() {
return std::io::Error::new(std::io::ErrorKind::TimedOut, e);
}
std::io::Error::new(std::io::ErrorKind::Other, e)
})),
LinesCodec::new(),
);
Ok(frames.filter_map(|res| async {
match res {
Ok(val) => {
Some(Ok(val))
},

Err(LinesCodecError::Io(e)) => match e.kind() {
// Client timeout
std::io::ErrorKind::TimedOut => {
tracing::warn!("timeout in poll: {}", e); // our client timeout
None
}
_ => Some(Err(Error::ReadEvents(e))),
},

// Reached the maximum line length without finding a newline.
// This should never happen because we're using the default `usize::MAX`.
Err(LinesCodecError::MaxLineLengthExceeded) => {
Some(Err(Error::LinesCodecMaxLineLengthExceeded))
}
}
}))
}

/// Perform a raw HTTP request against the API and get back either an object
Expand Down
6 changes: 3 additions & 3 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,11 @@ mod test {
assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

// individual logs may or may not buffer
let mut output = String::new();
let mut output = vec![];
while let Some(line) = logs_stream.try_next().await? {
output.push_str(&String::from_utf8_lossy(&line));
output.push(line);
}
assert_eq!(output, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);

// evict the pod
let ep = EvictParams::default();
Expand Down

0 comments on commit 0281b8f

Please sign in to comment.