Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an encapsulated file stream in axum-extra to make it more conveni… #3047

Merged
merged 22 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion axum-extra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ version = "0.10.0-alpha.1"
default = ["tracing", "multipart"]

async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"]
file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"]
attachment = ["dep:tracing"]
error_response = ["dep:tracing", "tracing/std"]
cookie = ["dep:cookie"]
Expand Down Expand Up @@ -67,7 +68,7 @@ prost = { version = "0.13", optional = true }
serde_html_form = { version = "0.2.0", optional = true }
serde_json = { version = "1.0.71", optional = true }
serde_path_to_error = { version = "0.1.8", optional = true }
tokio = { version = "1.19", optional = true }
tokio = { version = "1.19", optional = true, features = ["fs"] }
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
tokio-stream = { version = "0.1.9", optional = true }
tokio-util = { version = "0.7", optional = true }
tracing = { version = "0.1.37", default-features = false, optional = true }
Expand Down Expand Up @@ -104,6 +105,7 @@ allowed = [
"prost",
"serde",
"tokio",
"tokio-util",
"tower_layer",
"tower_service",
]
4 changes: 2 additions & 2 deletions axum-extra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
//! `tracing` | Log rejections from built-in extractors | Yes
//! `typed-routing` | Enables the [`TypedPath`](crate::routing::TypedPath) routing utilities | No
//! `typed-header` | Enables the [`TypedHeader`] extractor and response | No
//!
//! [`axum`]: https://crates.io/crates/axum
//! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No
jplatte marked this conversation as resolved.
Show resolved Hide resolved
//! [`axum`]: <https://crates.io/crates/axum>
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
jplatte marked this conversation as resolved.
Show resolved Hide resolved

#![warn(
clippy::all,
Expand Down
270 changes: 270 additions & 0 deletions axum-extra/src/response/file_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
use axum::{
body,
response::{IntoResponse, Response},
BoxError,
};
use bytes::Bytes;
use futures_util::TryStream;
use http::{header, StatusCode};
use std::{io, path::PathBuf};
use tokio::fs::File;
use tokio_util::io::ReaderStream;

/// Alias for `tokio_util::io::ReaderStream<File>`.
pub type AsyncReaderStream = ReaderStream<File>;
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved

/// Encapsulate the file stream.
/// The encapsulated file stream construct requires passing in a stream
/// # Examples
jplatte marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```
/// use axum::{
/// http::StatusCode,
/// response::{Response, IntoResponse},
/// Router,
/// routing::get
/// };
/// use axum_extra::response::file_stream::FileStream;
/// use tokio::fs::File;
/// use tokio_util::io::ReaderStream;
/// async fn file_stream() -> Result<Response, (StatusCode, String)> {
/// let stream=ReaderStream::new(File::open("test.txt").await.map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))?);
/// let file_stream_resp = FileStream::new(stream)
/// .file_name("test.txt");
//
/// Ok(file_stream_resp.into_response())
/// }
/// let app = Router::new().route("/FileStreamDownload", get(file_stream));
/// # let _: Router = app;
/// ```
#[derive(Debug)]
pub struct FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
jplatte marked this conversation as resolved.
Show resolved Hide resolved
/// stream.
pub stream: S,
/// The file name of the file.
pub file_name: Option<String>,
/// The size of the file.
pub content_size: Option<u64>,
}

impl<S> FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
/// Create a file stream.
pub fn new(stream: S) -> Self {
Self {
stream,
file_name: None,
content_size: None,
}
}

/// Create a file stream from a file path.
/// # Examples
/// ```
jplatte marked this conversation as resolved.
Show resolved Hide resolved
/// use axum::{
/// http::StatusCode,
/// response::{Response, IntoResponse},
/// Router,
/// routing::get
/// };
/// use axum_extra::response::file_stream::FileStream;
/// use std::path::PathBuf;
jplatte marked this conversation as resolved.
Show resolved Hide resolved
/// use tokio::fs::File;
/// use tokio_util::io::ReaderStream;
/// async fn file_stream() -> Response {
/// FileStream::<ReaderStream<File>>::from_path(PathBuf::from("test.txt"))
/// .await
/// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))
/// .into_response()
/// }
/// let app = Router::new().route("/FileStreamDownload", get(file_stream));
/// # let _: Router = app;
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
/// ```
pub async fn from_path(path: PathBuf) -> io::Result<FileStream<AsyncReaderStream>> {
// open file
let file = File::open(&path).await?;
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
jplatte marked this conversation as resolved.
Show resolved Hide resolved
let mut content_size = None;
let mut file_name = None;

// get file metadata length
if let Ok(metadata) = file.metadata().await {
content_size = Some(metadata.len());
}

// get file name
if let Some(file_name_os) = path.file_name() {
if let Some(file_name_str) = file_name_os.to_str() {
file_name = Some(file_name_str.to_owned());
}
}

// return FileStream
Ok(FileStream {
stream: ReaderStream::new(file),
file_name,
content_size,
})
}

/// Set the file name of the file.
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
pub fn file_name<T: Into<String>>(mut self, file_name: T) -> Self {
jplatte marked this conversation as resolved.
Show resolved Hide resolved
self.file_name = Some(file_name.into());
self
}

/// Set the size of the file.
pub fn content_size<T: Into<u64>>(mut self, len: T) -> Self {
jplatte marked this conversation as resolved.
Show resolved Hide resolved
self.content_size = Some(len.into());
jplatte marked this conversation as resolved.
Show resolved Hide resolved
self
}
}

impl<S> IntoResponse for FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
fn into_response(self) -> Response {
let mut resp = Response::builder().header(header::CONTENT_TYPE, "application/octet-stream");

if let Some(file_name) = self.file_name {
resp = resp.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", file_name),
);
};
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved

if let Some(content_size) = self.content_size {
resp = resp.header(header::CONTENT_LENGTH, content_size);
};
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved

resp.body(body::Body::from_stream(self.stream))
.unwrap_or_else(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("build FileStream responsec error:{}", e),
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
)
.into_response()
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use axum::{extract::Request, routing::get, Router};
use body::Body;
use http_body_util::BodyExt;
use std::io::Cursor;
use tokio_util::io::ReaderStream;
use tower::ServiceExt;

#[tokio::test]
async fn response_file_stream() -> Result<(), Box<dyn std::error::Error>> {
let app = Router::new().route(
"/file",
get(|| async {
// Simulating a file stream
let file_content = b"Hello, this is the simulated file content!".to_vec();
let size = file_content.len() as u64;
let reader = Cursor::new(file_content);

// response file stream
let stream = ReaderStream::new(reader);
FileStream::new(stream)
.file_name("test")
.content_size(size)
.into_response()
yanns marked this conversation as resolved.
Show resolved Hide resolved
}),
);

// Simulating a GET request
let response = app
.oneshot(Request::builder().uri("/file").body(Body::empty())?)
.await?;

// Validate Response Status Code
assert_eq!(response.status(), StatusCode::OK);

// Validate Response Headers
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/octet-stream"
);
assert_eq!(
response.headers().get("content-disposition").unwrap(),
"attachment; filename=\"test\""
);
assert_eq!(response.headers().get("content-length").unwrap(), "42");

// Validate Response Body
let body: &[u8] = &response.into_body().collect().await?.to_bytes();
assert_eq!(
std::str::from_utf8(body)?,
"Hello, this is the simulated file content!"
);
Ok(())
}

#[tokio::test]
async fn response_from_path() -> Result<(), Box<dyn std::error::Error>> {
let app = Router::new().route(
"/from_path",
get(move || async move {
FileStream::<AsyncReaderStream>::from_path("CHANGELOG.md".into())
.await
.unwrap()
.into_response()
}),
);

// Simulating a GET request
let response = app
.oneshot(
Request::builder()
.uri("/from_path")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

// Validate Response Status Code
assert_eq!(response.status(), StatusCode::OK);

// Validate Response Headers
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/octet-stream"
);
assert_eq!(
response.headers().get("content-disposition").unwrap(),
"attachment; filename=\"CHANGELOG.md\""
);

let file = File::open("CHANGELOG.md").await.unwrap();
// get file size
let content_length = file.metadata().await.unwrap().len();

assert_eq!(
response
.headers()
.get("content-length")
.unwrap()
.to_str()
.unwrap(),
content_length.to_string()
);
Ok(())
}
}
4 changes: 4 additions & 0 deletions axum-extra/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub mod multiple;
#[cfg(feature = "error_response")]
mod error_response;

#[cfg(feature = "file-stream")]
/// Module for handling file streams.
pub mod file_stream;
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(feature = "error_response")]
pub use error_response::InternalServerError;

Expand Down
2 changes: 2 additions & 0 deletions examples/stream-to-file/Cargo.toml
YanHeDoki marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ edition = "2021"
publish = false

[dependencies]
async-stream = "0.3"
axum = { path = "../../axum", features = ["multipart"] }
axum-extra = { path = "../../axum-extra", features = ["file-stream"] }
futures = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
Expand Down
Loading
Loading