diff --git a/tokio-util/src/codec/decoder.rs b/tokio-util/src/codec/decoder.rs index 895ea107bdb..4b9da0865d3 100644 --- a/tokio-util/src/codec/decoder.rs +++ b/tokio-util/src/codec/decoder.rs @@ -16,6 +16,20 @@ use std::io; /// implementing stateful streaming parsers. In many cases, though, this type /// will simply be a unit struct (e.g. `struct HttpDecoder`). /// +/// For some underlying data-sources, namely files and FIFOs, +/// it's possible to temporarily read 0 bytes by reaching EOF. +/// +/// In these cases `decode_eof` will be called until it signals +/// fullfillment of all closing frames by returning `Ok(None)`. +/// After that, repeated attempts to read from the [`Framed`] or [`FramedRead`] +/// will not invoke `decode` or `decode_eof` again, until data can be read +/// during a retry. +/// +/// It is up to the Decoder to keep track of a restart after an EOF, +/// and to decide how to handle such an event by, for example, +/// allowing frames to cross EOF boundaries, re-emitting opening frames, or +/// reseting the entire internal state. +/// /// [`Framed`]: crate::codec::Framed /// [`FramedRead`]: crate::codec::FramedRead pub trait Decoder { @@ -115,13 +129,18 @@ pub trait Decoder { /// This method defaults to calling `decode` and returns an error if /// `Ok(None)` is returned while there is unconsumed data in `buf`. /// Typically this doesn't need to be implemented unless the framing - /// protocol differs near the end of the stream. + /// protocol differs near the end of the stream, or if you need to construct + /// frames _across_ eof boundaries on sources that can be resumed. /// /// Note that the `buf` argument may be empty. If a previous call to /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be /// called again until it returns `None`, indicating that there are no more /// frames to yield. This behavior enables returning finalization frames /// that may not be based on inbound data. + /// + /// Once `None` has been returned, `decode_eof` won't be called again until + /// an attempt to resume the stream has been made, where the underlying stream + /// actually returned more data. fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { match self.decode(buf)? { Some(frame) => Ok(Some(frame)), diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index c7701e05ff6..516590081f4 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -52,6 +52,11 @@ where /// calling [`split`] on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. /// + /// Note that, for some byte sources, the stream can be resumed after an EOF + /// by reading from it, even after it has returned `None`. Repeated attempts + /// to do so, without new data available, continue to return `None` without + /// creating more (closing) frames. + /// /// [`Stream`]: futures_core::Stream /// [`Sink`]: futures_sink::Sink /// [`Decode`]: crate::codec::Decoder diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index 66714d9f4b7..89f4a06ba7d 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -120,42 +120,97 @@ where let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); + // The following loops implements a state machine with each state corresponding + // to a combination of the `is_readable` and `eof` flags. States persist across + // loop entries and most state transitions occur with a return. + // + // The intitial state is `reading`. + // + // | state | eof | is_readable | + // |---------|-------|-------------| + // | reading | false | false | + // | framing | false | true | + // | pausing | true | true | + // | paused | true | false | + // + // `decode_eof` + // returns `Some` read 0 bytes + // │ │ │ │ + // │ ▼ │ ▼ + // ┌───────┐ `decode_eof` ┌──────┐ + // ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐ + // │ └───────┘ └──────┘ │ + // pending read┐ │ ┌──────┐ │ ▲ │ + // │ │ │ │ │ │ │ │ + // │ ▼ │ │ `decode` returns `Some`│ pending read + // │ ╔═══════╗ ┌───────┐◀─┘ │ + // └──║reading║─read n>0 bytes─▶│framing│ │ + // ╚═══════╝ └───────┘◀──────read n>0 bytes┘ + // ▲ │ + // │ │ + // └─`decode` returns `None`─┘ loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. + // Repeatedly call `decode` or `decode_eof` while the buffer is "readable", + // i.e. it _might_ contain data consumable as a frame or closing frame. + // Both signal that there is no such data by returning `None`. + // + // If `decode` couldn't read a frame and the upstream source has returned eof, + // `decode_eof` will attemp to decode the remaining bytes as closing frames. + // + // If the underlying AsyncRead is resumable, we may continue after an EOF, + // but must finish emmiting all of it's associated `decode_eof` frames. + // Furthermore, we don't want to emit any `decode_eof` frames on retried + // reads after an EOF unless we've actually read more data. if state.is_readable { + // pausing or framing if state.eof { + // pausing let frame = pinned.codec.decode_eof(&mut state.buffer)?; + if frame.is_none() { + state.is_readable = false; // prepare pausing -> paused + } + // implicit pausing -> pausing or pausing -> paused return Poll::Ready(frame.map(Ok)); } + // framing trace!("attempting to decode a frame"); if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { trace!("frame decoded from buffer"); + // implicit framing -> framing return Poll::Ready(Some(Ok(frame))); } + // framing -> reading state.is_readable = false; } - - assert!(!state.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF + // reading or paused + // If we can't build a frame yet, try to read more data and try again. + // Make sure we've got room for at least one byte to read to ensure + // that we don't get a spurious 0 that looks like EOF. state.buffer.reserve(1); let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, + // implicit reading -> reading or implicit paused -> paused Poll::Pending => return Poll::Pending, }; if bytect == 0 { + if state.eof { + // We're already at an EOF, and since we've reached this path + // we're also not readable. This implies that we've already finished + // our `decode_eof` handling, so we can simply return `None`. + // implicit paused -> paused + return Poll::Ready(None); + } + // prepare reading -> paused state.eof = true; + } else { + // prepare paused -> framing or noop reading -> framing + state.eof = false; } + // paused -> framing or reading -> framing or reading -> pausing state.is_readable = true; } } diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs index 0ce8118ee9e..930a631d5d0 100644 --- a/tokio-util/tests/framed_read.rs +++ b/tokio-util/tests/framed_read.rs @@ -254,6 +254,29 @@ fn multi_frames_on_eof() { }); } +#[test] +fn read_eof_then_resume() { + let mut task = task::spawn(()); + let mock = mock! { + Ok(b"\x00\x00\x00\x01".to_vec()), + Ok(b"".to_vec()), + Ok(b"\x00\x00\x00\x02".to_vec()), + Ok(b"".to_vec()), + Ok(b"\x00\x00\x00\x03".to_vec()), + }; + let mut framed = FramedRead::new(mock, U32Decoder); + + task.enter(|cx, _| { + assert_read!(pin!(framed).poll_next(cx), 1); + assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); + assert_read!(pin!(framed).poll_next(cx), 2); + assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); + assert_read!(pin!(framed).poll_next(cx), 3); + assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); + assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); + }); +} + // ===== Mock ====== struct Mock {