diff --git a/.travis.yml b/.travis.yml index b90ed78d0..5b3a7ba31 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,8 @@ script: - cargo build --verbose - cargo test --verbose - cargo test --verbose --features zlib + - cargo test --verbose --features tokio + - cargo test --verbose --features 'tokio zlib' - cargo test --verbose --features zlib --no-default-features - cargo clean && cargo build - rustdoc --test README.md -L target/debug -L target/debug/deps diff --git a/Cargo.toml b/Cargo.toml index 5d9a8b2bf..0d5e24426 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,11 +22,15 @@ streams. libc = "0.2" miniz-sys = { path = "miniz-sys", version = "0.1.7", optional = true } libz-sys = { version = "1.0", optional = true } +tokio-io = { version = "0.1", optional = true } +futures = { version = "0.1", optional = true } [dev-dependencies] rand = "0.3" quickcheck = { version = "0.4", default-features = false } +tokio-core = "0.1" [features] default = ["miniz-sys"] zlib = ["libz-sys"] +tokio = ["tokio-io", "futures"] diff --git a/appveyor.yml b/appveyor.yml index 4a6104291..f5d21540d 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -15,3 +15,4 @@ build: false test_script: - cargo test --verbose --target %TARGET% + - cargo test --verbose --target %TARGET% --features tokio diff --git a/src/deflate.rs b/src/deflate.rs index 480e3ef44..3b4571ab1 100644 --- a/src/deflate.rs +++ b/src/deflate.rs @@ -4,6 +4,11 @@ use std::io::prelude::*; use std::io; use std::mem; +#[cfg(feature = "tokio")] +use futures::Poll; +#[cfg(feature = "tokio")] +use tokio_io::{AsyncRead, AsyncWrite}; + use bufreader::BufReader; use zio; use {Compress, Decompress}; @@ -70,6 +75,19 @@ impl EncoderWriter { } } + /// Acquires a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Acquires a mutable reference to the underlying writer. + /// + /// Note that mutating the output/input state of the stream may corrupt this + /// object, so care must be taken when using this method. + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + /// Resets the state of this encoder entirely, swapping out the output /// stream for another. /// @@ -87,13 +105,33 @@ impl EncoderWriter { Ok(self.inner.replace(w)) } + /// Attempt to finish this output stream, writing out final chunks of data. + /// + /// Note that this function can only be used once data has finished being + /// written to the output stream. After this function is called then further + /// calls to `write` may result in a panic. + /// + /// # Panics + /// + /// Attempts to write data to this stream may result in a panic after this + /// function is called. + pub fn try_finish(&mut self) -> io::Result<()> { + self.inner.finish() + } + /// Consumes this encoder, flushing the output stream. /// /// This will flush the underlying data stream, close off the compressed /// stream and, if successful, return the contained writer. + /// + /// Note that this function may not be suitable to call in a situation where + /// the underlying stream is an asynchronous I/O stream. To finish a stream + /// the `try_finish` (or `shutdown`) method should be used instead. To + /// re-acquire ownership of a stream it is safe to call this method after + /// `try_finish` or `shutdown` has returned `Ok`. pub fn finish(mut self) -> io::Result { try!(self.inner.finish()); - Ok(self.inner.into_inner()) + Ok(self.inner.take_inner()) } /// Consumes this encoder, flushing the output stream. @@ -105,7 +143,23 @@ impl EncoderWriter { /// stream. To close the stream add the two bytes 0x3 and 0x0. pub fn flush_finish(mut self) -> io::Result { try!(self.inner.flush()); - Ok(self.inner.into_inner()) + Ok(self.inner.take_inner()) + } + + /// Returns the number of bytes that have been written to this compresor. + /// + /// Note that not all bytes written to this object may be accounted for, + /// there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.inner.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been written yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.inner.data.total_out() } } @@ -119,12 +173,24 @@ impl Write for EncoderWriter { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try_nb!(self.inner.finish()); + self.inner.get_mut().shutdown() + } +} + impl Read for EncoderWriter { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.get_mut().unwrap().read(buf) + self.inner.get_mut().read(buf) } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderWriter { +} + impl EncoderReader { /// Creates a new encoder which will read uncompressed data from the given /// stream and emit the compressed stream. @@ -141,6 +207,9 @@ impl EncoderReader { /// the input stream with the one provided, returning the previous input /// stream. Future data read from this encoder will be the compressed /// version of `r`'s data. + /// + /// Note that there may be currently buffered data when this function is + /// called, and in that case the buffered data is discarded. pub fn reset(&mut self, r: R) -> R { self.inner.data.reset(); self.inner.obj.reset(r) @@ -160,9 +229,29 @@ impl EncoderReader { } /// Consumes this encoder, returning the underlying reader. + /// + /// Note that there may be buffered bytes which are not re-acquired as part + /// of this transition. It's recommended to only call this function after + /// EOF has been reached. pub fn into_inner(self) -> R { self.inner.into_inner().into_inner() } + + /// Returns the number of bytes that have been read into this compressor. + /// + /// Note that not all bytes read from the underlying object may be accounted + /// for, there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.inner.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been read yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.inner.data.total_out() + } } impl Read for EncoderReader { @@ -171,6 +260,10 @@ impl Read for EncoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderReader { +} + impl Write for EncoderReader { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -181,6 +274,13 @@ impl Write for EncoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderReader { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl EncoderReaderBuf { /// Creates a new encoder which will read uncompressed data from the given /// stream and emit the compressed stream. @@ -220,6 +320,22 @@ impl EncoderReaderBuf { pub fn into_inner(self) -> R { self.obj } + + /// Returns the number of bytes that have been read into this compressor. + /// + /// Note that not all bytes read from the underlying object may be accounted + /// for, there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been read yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.data.total_out() + } } impl Read for EncoderReaderBuf { @@ -228,6 +344,10 @@ impl Read for EncoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderReaderBuf { +} + impl Write for EncoderReaderBuf { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -238,6 +358,13 @@ impl Write for EncoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderReaderBuf { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderReader { /// Creates a new decoder which will decompress data read from the given /// stream. @@ -262,6 +389,9 @@ impl DecoderReader { /// input stream with the one provided, returning the previous input /// stream. Future data read from this decoder will be the decompressed /// version of `r`'s data. + /// + /// Note that there may be currently buffered data when this function is + /// called, and in that case the buffered data is discarded. pub fn reset(&mut self, r: R) -> R { self.inner.data = Decompress::new(false); self.inner.obj.reset(r) @@ -281,6 +411,10 @@ impl DecoderReader { } /// Consumes this decoder, returning the underlying reader. + /// + /// Note that there may be buffered bytes which are not re-acquired as part + /// of this transition. It's recommended to only call this function after + /// EOF has been reached. pub fn into_inner(self) -> R { self.inner.into_inner().into_inner() } @@ -305,6 +439,10 @@ impl Read for DecoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderReader { +} + impl Write for DecoderReader { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -315,6 +453,13 @@ impl Write for DecoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderReader { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderReaderBuf { /// Creates a new decoder which will decompress data read from the given /// stream. @@ -383,6 +528,10 @@ impl Read for DecoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderReaderBuf { +} + impl Write for DecoderReaderBuf { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -393,6 +542,13 @@ impl Write for DecoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderReaderBuf { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderWriter { /// Creates a new decoder which will write uncompressed data to the stream. /// @@ -404,6 +560,19 @@ impl DecoderWriter { } } + /// Acquires a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Acquires a mutable reference to the underlying writer. + /// + /// Note that mutating the output/input state of the stream may corrupt this + /// object, so care must be taken when using this method. + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + /// Resets the state of this decoder entirely, swapping out the output /// stream for another. /// @@ -421,13 +590,33 @@ impl DecoderWriter { Ok(self.inner.replace(w)) } + /// Attempt to finish this output stream, writing out final chunks of data. + /// + /// Note that this function can only be used once data has finished being + /// written to the output stream. After this function is called then further + /// calls to `write` may result in a panic. + /// + /// # Panics + /// + /// Attempts to write data to this stream may result in a panic after this + /// function is called. + pub fn try_finish(&mut self) -> io::Result<()> { + self.inner.finish() + } + /// Consumes this encoder, flushing the output stream. /// /// This will flush the underlying data stream and then return the contained /// writer if the flush succeeded. + /// + /// Note that this function may not be suitable to call in a situation where + /// the underlying stream is an asynchronous I/O stream. To finish a stream + /// the `try_finish` (or `shutdown`) method should be used instead. To + /// re-acquire ownership of a stream it is safe to call this method after + /// `try_finish` or `shutdown` has returned `Ok`. pub fn finish(mut self) -> io::Result { try!(self.inner.finish()); - Ok(self.inner.into_inner()) + Ok(self.inner.take_inner()) } /// Returns the number of bytes that the decompressor has consumed for @@ -456,12 +645,24 @@ impl Write for DecoderWriter { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try_nb!(self.inner.finish()); + self.inner.get_mut().shutdown() + } +} + impl Read for DecoderWriter { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.get_mut().unwrap().read(buf) + self.inner.get_mut().read(buf) } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderWriter { +} + #[cfg(test)] mod tests { use std::io::prelude::*; diff --git a/src/gz.rs b/src/gz.rs index cd511f812..42bc9c0e8 100644 --- a/src/gz.rs +++ b/src/gz.rs @@ -9,6 +9,11 @@ use std::io::prelude::*; use std::io; use std::mem; +#[cfg(feature = "tokio")] +use futures::Poll; +#[cfg(feature = "tokio")] +use tokio_io::{AsyncRead, AsyncWrite}; + use {Compression, Compress}; use bufreader::BufReader; use crc::{CrcReader, Crc}; @@ -27,6 +32,7 @@ static FCOMMENT: u8 = 1 << 4; pub struct EncoderWriter { inner: zio::Writer, crc: Crc, + crc_bytes_written: usize, header: Vec, } @@ -166,6 +172,7 @@ impl Builder { inner: zio::Writer::new(w, Compress::new(lvl, false)), crc: Crc::new(), header: self.into_header(lvl), + crc_bytes_written: 0, } } @@ -257,16 +264,9 @@ impl EncoderWriter { Builder::new().write(w, level) } - /// Finish encoding this stream, returning the underlying writer once the - /// encoding is done. - pub fn finish(mut self) -> io::Result { - try!(self.do_finish()); - Ok(self.inner.take_inner().unwrap()) - } - /// Acquires a reference to the underlying writer. pub fn get_ref(&self) -> &W { - self.inner.get_ref().unwrap() + self.inner.get_ref() } /// Acquires a mutable reference to the underlying writer. @@ -274,54 +274,99 @@ impl EncoderWriter { /// Note that mutation of the writer may result in surprising results if /// this encoder is continued to be used. pub fn get_mut(&mut self) -> &mut W { - self.inner.get_mut().unwrap() + self.inner.get_mut() } - fn do_finish(&mut self) -> io::Result<()> { - if self.header.len() != 0 { - try!(self.inner.get_mut().unwrap().write_all(&self.header)); - } + /// Attempt to finish this output stream, writing out final chunks of data. + /// + /// Note that this function can only be used once data has finished being + /// written to the output stream. After this function is called then further + /// calls to `write` may result in a panic. + /// + /// # Panics + /// + /// Attempts to write data to this stream may result in a panic after this + /// function is called. + pub fn try_finish(&mut self) -> io::Result<()> { + try!(self.write_header()); try!(self.inner.finish()); - let mut inner = self.inner.get_mut().unwrap(); - let (sum, amt) = (self.crc.sum() as u32, self.crc.amount()); - let buf = [(sum >> 0) as u8, - (sum >> 8) as u8, - (sum >> 16) as u8, - (sum >> 24) as u8, - (amt >> 0) as u8, - (amt >> 8) as u8, - (amt >> 16) as u8, - (amt >> 24) as u8]; - inner.write_all(&buf) + + while self.crc_bytes_written < 8 { + let (sum, amt) = (self.crc.sum() as u32, self.crc.amount()); + let buf = [(sum >> 0) as u8, + (sum >> 8) as u8, + (sum >> 16) as u8, + (sum >> 24) as u8, + (amt >> 0) as u8, + (amt >> 8) as u8, + (amt >> 16) as u8, + (amt >> 24) as u8]; + let mut inner = self.inner.get_mut(); + let n = try!(inner.write(&buf[self.crc_bytes_written..])); + self.crc_bytes_written += n; + } + Ok(()) + } + + /// Finish encoding this stream, returning the underlying writer once the + /// encoding is done. + /// + /// Note that this function may not be suitable to call in a situation where + /// the underlying stream is an asynchronous I/O stream. To finish a stream + /// the `try_finish` (or `shutdown`) method should be used instead. To + /// re-acquire ownership of a stream it is safe to call this method after + /// `try_finish` or `shutdown` has returned `Ok`. + pub fn finish(mut self) -> io::Result { + try!(self.try_finish()); + Ok(self.inner.take_inner()) + } + + fn write_header(&mut self) -> io::Result<()> { + while self.header.len() > 0 { + let n = try!(self.inner.get_mut().write(&self.header)); + self.header.drain(..n); + } + Ok(()) } } impl Write for EncoderWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.header.len() != 0 { - try!(self.inner.get_mut().unwrap().write_all(&self.header)); - self.header.truncate(0); - } + assert_eq!(self.crc_bytes_written, 0); + try!(self.write_header()); let n = try!(self.inner.write(buf)); self.crc.update(&buf[..n]); Ok(n) } fn flush(&mut self) -> io::Result<()> { + assert_eq!(self.crc_bytes_written, 0); self.inner.flush() } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try_nb!(self.try_finish()); + self.get_mut().shutdown() + } +} + impl Read for EncoderWriter { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.get_mut().read(buf) } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderWriter { +} + impl Drop for EncoderWriter { fn drop(&mut self) { - if self.inner.get_mut().is_some() { - let _ = self.do_finish(); + if self.inner.is_present() { + let _ = self.try_finish(); } } } diff --git a/src/lib.rs b/src/lib.rs index 84c624793..25decfedd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,26 @@ //! There are two helper traits provided: `FlateReader` and `FlateWriter`. //! These provide convenience methods for creating a decoder/encoder out of an //! already existing stream to chain construction. +//! +//! # Async I/O +//! +//! This crate optionally can support async I/O streams with the Tokio stack via +//! the `tokio` feature of this crate: +//! +//! ```toml +//! flate2 = { version = "0.2", features = ["tokio"] } +//! ``` +//! +//! All methods are internally capable of working with streams that may return +//! `ErrorKind::WouldBlock` when they're not ready to perform the particular +//! operation. +//! +//! Note that care needs to be taken when using these objects, however. The +//! Tokio runtime, in particular, requires that data is fully flushed before +//! dropping streams. For compatibility with blocking streams all streams are +//! flushed/written when they are dropped, and this is not always a suitable +//! time to perform I/O. If I/O streams are flushed before drop, however, then +//! these operations will be a noop. #![doc(html_root_url = "https://docs.rs/flate2/0.2")] #![deny(missing_docs)] @@ -34,6 +54,11 @@ extern crate libc; extern crate rand; #[cfg(test)] extern crate quickcheck; +#[cfg(feature = "tokio")] +#[macro_use] +extern crate tokio_io; +#[cfg(feature = "tokio")] +extern crate futures; use std::io::prelude::*; use std::io; diff --git a/src/zio.rs b/src/zio.rs index e003b404d..efb49f9c8 100644 --- a/src/zio.rs +++ b/src/zio.rs @@ -104,29 +104,35 @@ impl Writer { pub fn replace(&mut self, w: W) -> W { self.buf.truncate(0); - mem::replace(&mut self.obj, Some(w)).unwrap() + mem::replace(self.get_mut(), w) } - pub fn get_ref(&self) -> Option<&W> { - self.obj.as_ref() + pub fn get_ref(&self) -> &W { + self.obj.as_ref().unwrap() } - pub fn get_mut(&mut self) -> Option<&mut W> { - self.obj.as_mut() + pub fn get_mut(&mut self) -> &mut W { + self.obj.as_mut().unwrap() } - pub fn take_inner(&mut self) -> Option { - self.obj.take() + // Note that this should only be called if the outer object is just about + // to be consumed! + // + // (e.g. an implementation of `into_inner`) + pub fn take_inner(&mut self) -> W { + self.obj.take().unwrap() } - pub fn into_inner(mut self) -> W { - self.take_inner().unwrap() + pub fn is_present(&self) -> bool { + self.obj.is_some() } fn dump(&mut self) -> io::Result<()> { - if self.buf.len() > 0 { - try!(self.obj.as_mut().unwrap().write_all(&self.buf)); - self.buf.truncate(0); + // TODO: should manage this buffer not with `drain` but probably more of + // a deque-like strategy. + while self.buf.len() > 0 { + let n = try!(self.obj.as_mut().unwrap().write(&self.buf)); + self.buf.drain(..n); } Ok(()) } diff --git a/src/zlib.rs b/src/zlib.rs index 4c429eb3b..8babef74d 100644 --- a/src/zlib.rs +++ b/src/zlib.rs @@ -4,6 +4,11 @@ use std::io::prelude::*; use std::io; use std::mem; +#[cfg(feature = "tokio")] +use futures::Poll; +#[cfg(feature = "tokio")] +use tokio_io::{AsyncRead, AsyncWrite}; + use bufreader::BufReader; use zio; use {Compress, Decompress}; @@ -70,6 +75,19 @@ impl EncoderWriter { } } + /// Acquires a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Acquires a mutable reference to the underlying writer. + /// + /// Note that mutating the output/input state of the stream may corrupt this + /// object, so care must be taken when using this method. + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + /// Resets the state of this encoder entirely, swapping out the output /// stream for another. /// @@ -87,13 +105,61 @@ impl EncoderWriter { Ok(self.inner.replace(w)) } + /// Attempt to finish this output stream, writing out final chunks of data. + /// + /// Note that this function can only be used once data has finished being + /// written to the output stream. After this function is called then further + /// calls to `write` may result in a panic. + /// + /// # Panics + /// + /// Attempts to write data to this stream may result in a panic after this + /// function is called. + pub fn try_finish(&mut self) -> io::Result<()> { + self.inner.finish() + } + /// Consumes this encoder, flushing the output stream. /// - /// This will flush the underlying data stream and then return the contained - /// writer if the flush succeeded. + /// This will flush the underlying data stream, close off the compressed + /// stream and, if successful, return the contained writer. + /// + /// Note that this function may not be suitable to call in a situation where + /// the underlying stream is an asynchronous I/O stream. To finish a stream + /// the `try_finish` (or `shutdown`) method should be used instead. To + /// re-acquire ownership of a stream it is safe to call this method after + /// `try_finish` or `shutdown` has returned `Ok`. pub fn finish(mut self) -> io::Result { try!(self.inner.finish()); - Ok(self.inner.into_inner()) + Ok(self.inner.take_inner()) + } + + /// Consumes this encoder, flushing the output stream. + /// + /// This will flush the underlying data stream and then return the contained + /// writer if the flush succeeded. + /// The compressed stream will not closed but only flushed. This + /// means that obtained byte array can by extended by another deflated + /// stream. To close the stream add the two bytes 0x3 and 0x0. + pub fn flush_finish(mut self) -> io::Result { + try!(self.inner.flush()); + Ok(self.inner.take_inner()) + } + + /// Returns the number of bytes that have been written to this compresor. + /// + /// Note that not all bytes written to this object may be accounted for, + /// there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.inner.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been written yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.inner.data.total_out() } } @@ -107,12 +173,24 @@ impl Write for EncoderWriter { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try_nb!(self.try_finish()); + self.get_mut().shutdown() + } +} + impl Read for EncoderWriter { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.get_mut().unwrap().read(buf) + self.get_mut().read(buf) } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderWriter { +} + impl EncoderReader { /// Creates a new encoder which will read uncompressed data from the given /// stream and emit the compressed stream. @@ -129,6 +207,9 @@ impl EncoderReader { /// the input stream with the one provided, returning the previous input /// stream. Future data read from this encoder will be the compressed /// version of `r`'s data. + /// + /// Note that there may be currently buffered data when this function is + /// called, and in that case the buffered data is discarded. pub fn reset(&mut self, r: R) -> R { self.inner.data.reset(); self.inner.obj.reset(r) @@ -148,9 +229,29 @@ impl EncoderReader { } /// Consumes this encoder, returning the underlying reader. + /// + /// Note that there may be buffered bytes which are not re-acquired as part + /// of this transition. It's recommended to only call this function after + /// EOF has been reached. pub fn into_inner(self) -> R { self.inner.into_inner().into_inner() } + + /// Returns the number of bytes that have been read into this compressor. + /// + /// Note that not all bytes read from the underlying object may be accounted + /// for, there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.inner.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been read yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.inner.data.total_out() + } } impl Read for EncoderReader { @@ -159,7 +260,11 @@ impl Read for EncoderReader { } } -impl Write for EncoderReader { +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderReader { +} + +impl Write for EncoderReader { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) } @@ -169,6 +274,13 @@ impl Write for EncoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderReader { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl EncoderReaderBuf { /// Creates a new encoder which will read uncompressed data from the given /// stream and emit the compressed stream. @@ -191,7 +303,7 @@ impl EncoderReaderBuf { mem::replace(&mut self.obj, r) } - /// Acquires a reference to the underlying stream + /// Acquires a reference to the underlying reader pub fn get_ref(&self) -> &R { &self.obj } @@ -208,6 +320,22 @@ impl EncoderReaderBuf { pub fn into_inner(self) -> R { self.obj } + + /// Returns the number of bytes that have been read into this compressor. + /// + /// Note that not all bytes read from the underlying object may be accounted + /// for, there may still be some active buffering. + pub fn total_in(&self) -> u64 { + self.data.total_in() + } + + /// Returns the number of bytes that the compressor has produced. + /// + /// Note that not all bytes may have been read yet, some may still be + /// buffered. + pub fn total_out(&self) -> u64 { + self.data.total_out() + } } impl Read for EncoderReaderBuf { @@ -216,6 +344,10 @@ impl Read for EncoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncRead for EncoderReaderBuf { +} + impl Write for EncoderReaderBuf { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -226,6 +358,13 @@ impl Write for EncoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for EncoderReaderBuf { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderReader { /// Creates a new decoder which will decompress data read from the given /// stream. @@ -250,6 +389,9 @@ impl DecoderReader { /// input stream with the one provided, returning the previous input /// stream. Future data read from this decoder will be the decompressed /// version of `r`'s data. + /// + /// Note that there may be currently buffered data when this function is + /// called, and in that case the buffered data is discarded. pub fn reset(&mut self, r: R) -> R { self.inner.data = Decompress::new(true); self.inner.obj.reset(r) @@ -269,6 +411,10 @@ impl DecoderReader { } /// Consumes this decoder, returning the underlying reader. + /// + /// Note that there may be buffered bytes which are not re-acquired as part + /// of this transition. It's recommended to only call this function after + /// EOF has been reached. pub fn into_inner(self) -> R { self.inner.into_inner().into_inner() } @@ -293,6 +439,10 @@ impl Read for DecoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderReader { +} + impl Write for DecoderReader { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -303,6 +453,13 @@ impl Write for DecoderReader { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderReader { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderReaderBuf { /// Creates a new decoder which will decompress data read from the given /// stream. @@ -363,6 +520,10 @@ impl Read for DecoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderReaderBuf { +} + impl Write for DecoderReaderBuf { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -373,6 +534,13 @@ impl Write for DecoderReaderBuf { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderReaderBuf { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + impl DecoderWriter { /// Creates a new decoder which will write uncompressed data to the stream. /// @@ -384,6 +552,19 @@ impl DecoderWriter { } } + /// Acquires a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Acquires a mutable reference to the underlying writer. + /// + /// Note that mutating the output/input state of the stream may corrupt this + /// object, so care must be taken when using this method. + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + /// Resets the state of this decoder entirely, swapping out the output /// stream for another. /// @@ -397,13 +578,33 @@ impl DecoderWriter { Ok(self.inner.replace(w)) } + /// Attempt to finish this output stream, writing out final chunks of data. + /// + /// Note that this function can only be used once data has finished being + /// written to the output stream. After this function is called then further + /// calls to `write` may result in a panic. + /// + /// # Panics + /// + /// Attempts to write data to this stream may result in a panic after this + /// function is called. + pub fn try_finish(&mut self) -> io::Result<()> { + self.inner.finish() + } + /// Consumes this encoder, flushing the output stream. /// /// This will flush the underlying data stream and then return the contained /// writer if the flush succeeded. + /// + /// Note that this function may not be suitable to call in a situation where + /// the underlying stream is an asynchronous I/O stream. To finish a stream + /// the `try_finish` (or `shutdown`) method should be used instead. To + /// re-acquire ownership of a stream it is safe to call this method after + /// `try_finish` or `shutdown` has returned `Ok`. pub fn finish(mut self) -> io::Result { try!(self.inner.finish()); - Ok(self.inner.into_inner()) + Ok(self.inner.take_inner()) } /// Returns the number of bytes that the decompressor has consumed for @@ -432,12 +633,24 @@ impl Write for DecoderWriter { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for DecoderWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try_nb!(self.inner.finish()); + self.inner.get_mut().shutdown() + } +} + impl Read for DecoderWriter { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.get_mut().unwrap().read(buf) + self.inner.get_mut().read(buf) } } +#[cfg(feature = "tokio")] +impl AsyncRead for DecoderWriter { +} + #[cfg(test)] mod tests { use std::io::prelude::*; diff --git a/tests/tokio.rs b/tests/tokio.rs new file mode 100644 index 000000000..47fb55c12 --- /dev/null +++ b/tests/tokio.rs @@ -0,0 +1,125 @@ +#![cfg(feature = "tokio")] + +extern crate tokio_core; +extern crate flate2; +extern crate tokio_io; +extern crate futures; +extern crate rand; + +use std::thread; +use std::net::{Shutdown, TcpListener}; +use std::io::{Read, Write}; + +use flate2::Compression; +use flate2::read; +use flate2::write; +use futures::Future; +use rand::{Rng, thread_rng}; +use tokio_core::net::TcpStream; +use tokio_core::reactor::Core; +use tokio_io::AsyncRead; +use tokio_io::io::{copy, shutdown}; + +#[test] +fn tcp_stream_echo_pattern() { + const N: u8 = 16; + const M: usize = 16 * 1024; + + let mut core = Core::new().unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let t = thread::spawn(move || { + let a = listener.accept().unwrap().0; + let b = a.try_clone().unwrap(); + + let t = thread::spawn(move || { + let mut b = read::DeflateDecoder::new(b); + let mut buf = [0; M]; + for i in 0..N { + b.read_exact(&mut buf).unwrap(); + for byte in buf.iter() { + assert_eq!(*byte, i); + } + } + + assert_eq!(b.read(&mut buf).unwrap(), 0); + }); + + let mut a = write::ZlibEncoder::new(a, Compression::Default); + for i in 0..N { + let buf = [i; M]; + a.write_all(&buf).unwrap(); + } + a.finish().unwrap() + .shutdown(Shutdown::Write).unwrap(); + + t.join().unwrap(); + }); + + let handle = core.handle(); + let stream = TcpStream::connect(&addr, &handle); + let copy = stream.and_then(|s| { + let (a, b) = s.split(); + let a = read::ZlibDecoder::new(a); + let b = write::DeflateEncoder::new(b, Compression::Default); + copy(a, b) + }).then(|result| { + let (amt, _a, b) = result.unwrap(); + assert_eq!(amt, (N as u64) * (M as u64)); + shutdown(b).map(|_| ()) + }); + + core.run(copy).unwrap(); + t.join().unwrap(); +} + +#[test] +fn echo_random() { + let v = thread_rng().gen_iter::().take(1024 * 1024).collect::>(); + let mut core = Core::new().unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let v2 = v.clone(); + let t = thread::spawn(move || { + let a = listener.accept().unwrap().0; + let b = a.try_clone().unwrap(); + + let mut v3 = v2.clone(); + let t = thread::spawn(move || { + let mut b = read::DeflateDecoder::new(b); + let mut buf = [0; 1024]; + while v3.len() > 0 { + let n = b.read(&mut buf).unwrap(); + for (actual, expected) in buf[..n].iter().zip(&v3) { + assert_eq!(*actual, *expected); + } + v3.drain(..n); + } + + assert_eq!(b.read(&mut buf).unwrap(), 0); + }); + + let mut a = write::ZlibEncoder::new(a, Compression::Default); + a.write_all(&v2).unwrap(); + a.finish().unwrap() + .shutdown(Shutdown::Write).unwrap(); + + t.join().unwrap(); + }); + + let handle = core.handle(); + let stream = TcpStream::connect(&addr, &handle); + let copy = stream.and_then(|s| { + let (a, b) = s.split(); + let a = read::ZlibDecoder::new(a); + let b = write::DeflateEncoder::new(b, Compression::Default); + copy(a, b) + }).then(|result| { + let (amt, _a, b) = result.unwrap(); + assert_eq!(amt, v.len() as u64); + shutdown(b).map(|_| ()) + }); + + core.run(copy).unwrap(); + t.join().unwrap(); +}