Skip to content

Commit

Permalink
util: makes Framed and FramedStream resumable after eof (#3272)
Browse files Browse the repository at this point in the history
  • Loading branch information
somethingelseentirely authored Mar 22, 2021
1 parent 63395f0 commit 8ed825f
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 12 deletions.
21 changes: 20 additions & 1 deletion tokio-util/src/codec/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ use std::io;
/// implementing stateful streaming parsers. In many cases, though, this type
/// will simply be a unit struct (e.g. `struct HttpDecoder`).
///
/// For some underlying data-sources, namely files and FIFOs,
/// it's possible to temporarily read 0 bytes by reaching EOF.
///
/// In these cases `decode_eof` will be called until it signals
/// fullfillment of all closing frames by returning `Ok(None)`.
/// After that, repeated attempts to read from the [`Framed`] or [`FramedRead`]
/// will not invoke `decode` or `decode_eof` again, until data can be read
/// during a retry.
///
/// It is up to the Decoder to keep track of a restart after an EOF,
/// and to decide how to handle such an event by, for example,
/// allowing frames to cross EOF boundaries, re-emitting opening frames, or
/// reseting the entire internal state.
///
/// [`Framed`]: crate::codec::Framed
/// [`FramedRead`]: crate::codec::FramedRead
pub trait Decoder {
Expand Down Expand Up @@ -115,13 +129,18 @@ pub trait Decoder {
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned while there is unconsumed data in `buf`.
/// Typically this doesn't need to be implemented unless the framing
/// protocol differs near the end of the stream.
/// protocol differs near the end of the stream, or if you need to construct
/// frames _across_ eof boundaries on sources that can be resumed.
///
/// Note that the `buf` argument may be empty. If a previous call to
/// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
/// called again until it returns `None`, indicating that there are no more
/// frames to yield. This behavior enables returning finalization frames
/// that may not be based on inbound data.
///
/// Once `None` has been returned, `decode_eof` won't be called again until
/// an attempt to resume the stream has been made, where the underlying stream
/// actually returned more data.
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(buf)? {
Some(frame) => Ok(Some(frame)),
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/codec/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ where
/// calling [`split`] on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
///
/// Note that, for some byte sources, the stream can be resumed after an EOF
/// by reading from it, even after it has returned `None`. Repeated attempts
/// to do so, without new data available, continue to return `None` without
/// creating more (closing) frames.
///
/// [`Stream`]: futures_core::Stream
/// [`Sink`]: futures_sink::Sink
/// [`Decode`]: crate::codec::Decoder
Expand Down
77 changes: 66 additions & 11 deletions tokio-util/src/codec/framed_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,42 +120,97 @@ where

let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
// The following loops implements a state machine with each state corresponding
// to a combination of the `is_readable` and `eof` flags. States persist across
// loop entries and most state transitions occur with a return.
//
// The intitial state is `reading`.
//
// | state | eof | is_readable |
// |---------|-------|-------------|
// | reading | false | false |
// | framing | false | true |
// | pausing | true | true |
// | paused | true | false |
//
// `decode_eof`
// returns `Some` read 0 bytes
// │ │ │ │
// │ ▼ │ ▼
// ┌───────┐ `decode_eof` ┌──────┐
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐
// │ └───────┘ └──────┘ │
// pending read┐ │ ┌──────┐ │ ▲ │
// │ │ │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read
// │ ╔═══════╗ ┌───────┐◀─┘ │
// └──║reading║─read n>0 bytes─▶│framing│ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘
// ▲ │
// │ │
// └─`decode` returns `None`─┘
loop {
// Repeatedly call `decode` or `decode_eof` as long as it is
// "readable". Readable is defined as not having returned `None`. If
// the upstream has returned EOF, and the decoder is no longer
// readable, it can be assumed that the decoder will never become
// readable again, at which point the stream is terminated.
// Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`.
//
// If `decode` couldn't read a frame and the upstream source has returned eof,
// `decode_eof` will attemp to decode the remaining bytes as closing frames.
//
// If the underlying AsyncRead is resumable, we may continue after an EOF,
// but must finish emmiting all of it's associated `decode_eof` frames.
// Furthermore, we don't want to emit any `decode_eof` frames on retried
// reads after an EOF unless we've actually read more data.
if state.is_readable {
// pausing or framing
if state.eof {
// pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused
}
// implicit pausing -> pausing or pausing -> paused
return Poll::Ready(frame.map(Ok));
}

// framing
trace!("attempting to decode a frame");

if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
trace!("frame decoded from buffer");
// implicit framing -> framing
return Poll::Ready(Some(Ok(frame)));
}

// framing -> reading
state.is_readable = false;
}

assert!(!state.eof);

// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
// reading or paused
// If we can't build a frame yet, try to read more data and try again.
// Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending,
};
if bytect == 0 {
if state.eof {
// We're already at an EOF, and since we've reached this path
// we're also not readable. This implies that we've already finished
// our `decode_eof` handling, so we can simply return `None`.
// implicit paused -> paused
return Poll::Ready(None);
}
// prepare reading -> paused
state.eof = true;
} else {
// prepare paused -> framing or noop reading -> framing
state.eof = false;
}

// paused -> framing or reading -> framing or reading -> pausing
state.is_readable = true;
}
}
Expand Down
23 changes: 23 additions & 0 deletions tokio-util/tests/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,29 @@ fn multi_frames_on_eof() {
});
}

#[test]
fn read_eof_then_resume() {
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x01".to_vec()),
Ok(b"".to_vec()),
Ok(b"\x00\x00\x00\x02".to_vec()),
Ok(b"".to_vec()),
Ok(b"\x00\x00\x00\x03".to_vec()),
};
let mut framed = FramedRead::new(mock, U32Decoder);

task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 1);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert_read!(pin!(framed).poll_next(cx), 2);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert_read!(pin!(framed).poll_next(cx), 3);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
});
}

// ===== Mock ======

struct Mock {
Expand Down

0 comments on commit 8ed825f

Please sign in to comment.