-
Notifications
You must be signed in to change notification settings - Fork 634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AsyncWriteExt::write_all_vectored utility #1741
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,6 +124,11 @@ pub use self::write_vectored::WriteVectored; | |
mod write_all; | ||
pub use self::write_all::WriteAll; | ||
|
||
#[cfg(feature = "write_all_vectored")] | ||
mod write_all_vectored; | ||
#[cfg(feature = "write_all_vectored")] | ||
pub use self::write_all_vectored::WriteAllVectored; | ||
|
||
/// An extension trait which adds utility methods to `AsyncRead` types. | ||
pub trait AsyncReadExt: AsyncRead { | ||
/// Creates an adaptor which will chain this stream with another. | ||
|
@@ -460,6 +465,60 @@ pub trait AsyncWriteExt: AsyncWrite { | |
WriteAll::new(self, buf) | ||
} | ||
|
||
/// Attempts to write multiple buffers into this writer. | ||
/// | ||
/// Creates a future that will write the entire contents of `bufs` into this | ||
/// `AsyncWrite` using [vectored writes]. | ||
/// | ||
/// The returned future will not complete until all the data has been | ||
/// written. | ||
/// | ||
/// [vectored writes]: std::io::Write::write_vectored | ||
/// | ||
/// # Notes | ||
/// | ||
/// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to | ||
/// a slice of `IoSlice`s, not an immutable one. That's because we need to | ||
/// modify the slice to keep track of the bytes already written. | ||
/// | ||
/// Once this futures returns, the contents of `bufs` are unspecified, as | ||
/// this depends on how many calls to `write_vectored` were necessary. It is | ||
/// best to understand this function as taking ownership of `bufs` and to | ||
/// not use `bufs` afterwards. The underlying buffers, to which the | ||
/// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and | ||
/// can be reused. | ||
/// | ||
/// # Examples | ||
/// | ||
/// ``` | ||
/// # futures::executor::block_on(async { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for including this example! |
||
/// use futures::io::AsyncWriteExt; | ||
/// use std::io::{Cursor, IoSlice}; | ||
/// | ||
/// let mut writer = Cursor::new([0u8; 7]); | ||
/// let bufs = &mut [ | ||
/// IoSlice::new(&[1]), | ||
/// IoSlice::new(&[2, 3]), | ||
/// IoSlice::new(&[4, 5, 6]), | ||
/// ]; | ||
/// | ||
/// writer.write_all_vectored(bufs).await?; | ||
/// // Note: the contents of `bufs` is now undefined, see the Notes section. | ||
/// | ||
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]); | ||
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); | ||
/// ``` | ||
#[cfg(feature = "write_all_vectored")] | ||
fn write_all_vectored<'a>( | ||
&'a mut self, | ||
bufs: &'a mut [IoSlice<'a>], | ||
) -> WriteAllVectored<'a, Self> | ||
where | ||
Self: Unpin, | ||
{ | ||
WriteAllVectored::new(self, bufs) | ||
} | ||
|
||
/// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be | ||
/// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`. | ||
/// Requires the `io-compat` feature to enable. | ||
|
@@ -470,7 +529,6 @@ pub trait AsyncWriteExt: AsyncWrite { | |
Compat::new(self) | ||
} | ||
|
||
|
||
/// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`. | ||
/// | ||
/// This adapter produces a sink that will write each value passed to it | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
use futures_core::future::Future; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implementation and tests here look good to me, thanks! |
||
use futures_core::task::{Context, Poll}; | ||
use futures_io::AsyncWrite; | ||
use futures_io::IoSlice; | ||
use std::io; | ||
use std::mem; | ||
use std::pin::Pin; | ||
|
||
/// Future for the | ||
/// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method. | ||
#[derive(Debug)] | ||
#[must_use = "futures do nothing unless you `.await` or poll them"] | ||
pub struct WriteAllVectored<'a, W: ?Sized + Unpin> { | ||
writer: &'a mut W, | ||
bufs: &'a mut [IoSlice<'a>], | ||
} | ||
|
||
impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {} | ||
|
||
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { | ||
pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { | ||
WriteAllVectored { writer, bufs } | ||
} | ||
} | ||
|
||
impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> { | ||
type Output = io::Result<()>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
let this = &mut *self; | ||
while !this.bufs.is_empty() { | ||
let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?; | ||
if n == 0 { | ||
return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); | ||
} else { | ||
this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n); | ||
} | ||
} | ||
|
||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::cmp::min; | ||
use std::future::Future; | ||
use std::io; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice}; | ||
use crate::task::noop_waker; | ||
|
||
/// Create a new writer that reads from at most `n_bufs` and reads | ||
/// `per_call` bytes (in total) per call to write. | ||
fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter { | ||
TestWriter { | ||
n_bufs, | ||
per_call, | ||
written: Vec::new(), | ||
} | ||
} | ||
|
||
// TODO: maybe move this the future-test crate? | ||
struct TestWriter { | ||
n_bufs: usize, | ||
per_call: usize, | ||
written: Vec<u8>, | ||
} | ||
|
||
impl AsyncWrite for TestWriter { | ||
fn poll_write( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<io::Result<usize>> { | ||
self.poll_write_vectored(cx, &[IoSlice::new(buf)]) | ||
} | ||
|
||
fn poll_write_vectored( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut Context<'_>, | ||
bufs: &[IoSlice<'_>], | ||
) -> Poll<io::Result<usize>> { | ||
let mut left = self.per_call; | ||
let mut written = 0; | ||
for buf in bufs.iter().take(self.n_bufs) { | ||
let n = min(left, buf.len()); | ||
self.written.extend_from_slice(&buf[0..n]); | ||
left -= n; | ||
written += n; | ||
} | ||
Poll::Ready(Ok(written)) | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
|
||
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
|
||
// TODO: maybe move this the future-test crate? | ||
macro_rules! assert_poll_ok { | ||
($e:expr, $expected:expr) => { | ||
let expected = $expected; | ||
match $e { | ||
Poll::Ready(Ok(ok)) if ok == expected => {} | ||
got => panic!( | ||
"unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", | ||
got, expected | ||
), | ||
} | ||
}; | ||
} | ||
|
||
#[test] | ||
fn test_writer_read_from_one_buf() { | ||
let waker = noop_waker(); | ||
let mut cx = Context::from_waker(&waker); | ||
|
||
let mut dst = test_writer(1, 2); | ||
let mut dst = Pin::new(&mut dst); | ||
|
||
assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0); | ||
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0); | ||
|
||
// Read at most 2 bytes. | ||
assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2); | ||
let bufs = &[IoSlice::new(&[2, 2, 2])]; | ||
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2); | ||
|
||
// Only read from first buf. | ||
let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])]; | ||
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1); | ||
|
||
assert_eq!(dst.written, &[1, 1, 2, 2, 3]); | ||
} | ||
|
||
#[test] | ||
fn test_writer_read_from_multiple_bufs() { | ||
let waker = noop_waker(); | ||
let mut cx = Context::from_waker(&waker); | ||
|
||
let mut dst = test_writer(3, 3); | ||
let mut dst = Pin::new(&mut dst); | ||
|
||
// Read at most 3 bytes from two buffers. | ||
let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])]; | ||
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); | ||
|
||
// Read at most 3 bytes from three buffers. | ||
let bufs = &[ | ||
IoSlice::new(&[3]), | ||
IoSlice::new(&[4]), | ||
IoSlice::new(&[5, 5]), | ||
]; | ||
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); | ||
|
||
assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]); | ||
} | ||
|
||
#[test] | ||
fn test_write_all_vectored() { | ||
let waker = noop_waker(); | ||
let mut cx = Context::from_waker(&waker); | ||
|
||
#[rustfmt::skip] // Becomes unreadable otherwise. | ||
let tests: Vec<(_, &'static [u8])> = vec![ | ||
(vec![], &[]), | ||
(vec![IoSlice::new(&[1])], &[1]), | ||
(vec![IoSlice::new(&[1, 2])], &[1, 2]), | ||
(vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]), | ||
(vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]), | ||
(vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]), | ||
(vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]), | ||
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]), | ||
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]), | ||
(vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]), | ||
(vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]), | ||
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]), | ||
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]), | ||
]; | ||
|
||
for (mut input, wanted) in tests.into_iter() { | ||
let mut dst = test_writer(2, 2); | ||
{ | ||
let mut future = dst.write_all_vectored(&mut *input); | ||
match Pin::new(&mut future).poll(&mut cx) { | ||
Poll::Ready(Ok(())) => {} | ||
other => panic!("unexpected result polling future: {:?}", other), | ||
} | ||
} | ||
assert_eq!(&*dst.written, &*wanted); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please carefully review this section, I don't if it clearly states what I mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the future successfully completes,
bufs
is an empty slice, right?On failure
bufs
may have been modified, but is not guaranteed to match with what has actually been written out (because I believe the underlying IO doesn't guarantee that it either writes some data or returns an error, it could have written some of the data and then returned an error). That's pretty much equivalent to whatwrite_all
guarantees, if it returns an error some part of the data will have been written with no way to know how much.I think this should try to avoid the "undefined" term since that has such strong memory-safety connotations, and the failure cases are relatively well-defined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct, but in the future we could optimise to just return ready if the entire (remaining) buffer is written, not modifying
bufs
.Do you want this to be reflected in the documentation somehow, or leave it as is?
Hence I added the it was not about memory safety, but I do understand the concern. Should I replace "undefined" with "unknown" (or something else) or you want to provide the guarantee that
bufs
will be empty after it successfully returns (also see my first remark in this comment)?