Skip to content

Commit

Permalink
Add a bufread module for buffered types
Browse files Browse the repository at this point in the history
Right now all stream types in `read` have an unconditional buffer between the
underlying stream and what we're passing down to the compression layer. This
buffer, however, is redundant for types that implement the `BufRead` trait.

This commit adds explicit support for `BufRead` types which have their own local
buffer, and then all other types are built on top of these types. A local
implementation of `BufReader` is vendored for the `reset()` method.
  • Loading branch information
alexcrichton committed Feb 7, 2016
1 parent bb5d7c9 commit b91748a
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 20 deletions.
83 changes: 83 additions & 0 deletions src/bufreader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::cmp;
use std::io;
use std::io::prelude::*;
use std::mem;

pub struct BufReader<R> {
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
}

impl<R: Read> BufReader<R> {
pub fn new(inner: R) -> BufReader<R> {
BufReader::with_buf(vec![0; 32 * 1024], inner)
}

pub fn with_buf(buf: Vec<u8>, inner: R) -> BufReader<R> {
BufReader {
inner: inner,
buf: buf.into_boxed_slice(),
pos: 0,
cap: 0,
}
}

pub fn get_ref(&self) -> &R {
&self.inner
}

pub fn into_inner(self) -> R {
self.inner
}

pub fn reset(&mut self, inner: R) -> R {
self.pos = 0;
self.cap = 0;
mem::replace(&mut self.inner, inner)
}
}

impl<R: Read> Read for BufReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() {
return self.inner.read(buf);
}
let nread = {
let mut rem = try!(self.fill_buf());
try!(rem.read(buf))
};
self.consume(nread);
Ok(nread)
}
}

impl<R: Read> BufRead for BufReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
if self.pos == self.cap {
self.cap = try!(self.inner.read(&mut self.buf));
self.pos = 0;
}
Ok(&self.buf[self.pos..self.cap])
}

fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt, self.cap);
}
}
122 changes: 114 additions & 8 deletions src/deflate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
use std::io::prelude::*;
use std::io;
use std::mem;

use {Compress, Decompress};
use bufreader::BufReader;
use raw;

/// A DEFLATE encoder, or compressor.
Expand All @@ -18,15 +21,33 @@ pub struct EncoderWriter<W: Write> {
/// This structure implements a `Read` interface and will read uncompressed
/// data from an underlying stream and emit a stream of compressed data.
pub struct EncoderReader<R: Read> {
inner: raw::EncoderReader<R>,
inner: EncoderReaderBuf<BufReader<R>>,
}

/// A DEFLATE encoder, or compressor.
///
/// This structure implements a `BufRead` interface and will read uncompressed
/// data from an underlying stream and emit a stream of compressed data.
pub struct EncoderReaderBuf<R: BufRead> {
obj: R,
data: Compress,
}

/// A DEFLATE decoder, or decompressor.
///
/// This structure implements a `Read` interface and takes a stream of
/// compressed data as input, providing the decompressed data when read from.
pub struct DecoderReader<R: Read> {
inner: raw::DecoderReader<R>,
inner: DecoderReaderBuf<BufReader<R>>,
}

/// A DEFLATE decoder, or decompressor.
///
/// This structure implements a `BufRead` interface and takes a stream of
/// compressed data as input, providing the decompressed data when read from.
pub struct DecoderReaderBuf<R: BufRead> {
obj: R,
data: Decompress,
}

/// A DEFLATE decoder, or decompressor.
Expand Down Expand Up @@ -93,7 +114,7 @@ impl<R: Read> EncoderReader<R> {
/// stream and emit the compressed stream.
pub fn new(r: R, level: ::Compression) -> EncoderReader<R> {
EncoderReader {
inner: raw::EncoderReader::new(r, level, true, vec![0; 32 * 1024]),
inner: EncoderReaderBuf::new(BufReader::new(r), level),
}
}

Expand All @@ -105,12 +126,13 @@ impl<R: Read> EncoderReader<R> {
/// stream. Future data read from this encoder will be the compressed
/// version of `r`'s data.
pub fn reset(&mut self, r: R) -> R {
self.inner.reset(r)
self.inner.data.reset();
self.inner.obj.reset(r)
}

/// Consumes this encoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.inner.into_inner()
self.inner.into_inner().into_inner()
}
}

Expand All @@ -120,6 +142,40 @@ impl<R: Read> Read for EncoderReader<R> {
}
}

impl<R: BufRead> EncoderReaderBuf<R> {
/// Creates a new encoder which will read uncompressed data from the given
/// stream and emit the compressed stream.
pub fn new(r: R, level: ::Compression) -> EncoderReaderBuf<R> {
EncoderReaderBuf {
obj: r,
data: Compress::new(level, false),
}
}

/// Resets the state of this encoder entirely, swapping out the input
/// stream for another.
///
/// This function will reset the internal state of this encoder and replace
/// the input stream with the one provided, returning the previous input
/// stream. Future data read from this encoder will be the compressed
/// version of `r`'s data.
pub fn reset(&mut self, r: R) -> R {
self.data.reset();
mem::replace(&mut self.obj, r)
}

/// Consumes this encoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.obj
}
}

impl<R: BufRead> Read for EncoderReaderBuf<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
::zio::read(&mut self.obj, &mut self.data, buf)
}
}

impl<R: Read> DecoderReader<R> {
/// Creates a new decoder which will decompress data read from the given
/// stream.
Expand All @@ -132,7 +188,9 @@ impl<R: Read> DecoderReader<R> {
/// Note that the capacity of the intermediate buffer is never increased,
/// and it is recommended for it to be large.
pub fn new_with_buf(r: R, buf: Vec<u8>) -> DecoderReader<R> {
DecoderReader { inner: raw::DecoderReader::new(r, true, buf) }
DecoderReader {
inner: DecoderReaderBuf::new(BufReader::with_buf(buf, r))
}
}

/// Resets the state of this decoder entirely, swapping out the input
Expand All @@ -143,12 +201,13 @@ impl<R: Read> DecoderReader<R> {
/// stream. Future data read from this decoder will be the decompressed
/// version of `r`'s data.
pub fn reset(&mut self, r: R) -> R {
self.inner.reset(r, true)
self.inner.data = Decompress::new(false);
self.inner.obj.reset(r)
}

/// Consumes this decoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.inner.into_inner()
self.inner.into_inner().into_inner()
}

/// Returns the number of bytes that the decompressor has consumed.
Expand All @@ -171,6 +230,53 @@ impl<R: Read> Read for DecoderReader<R> {
}
}

impl<R: BufRead> DecoderReaderBuf<R> {
/// Creates a new decoder which will decompress data read from the given
/// stream.
pub fn new(r: R) -> DecoderReaderBuf<R> {
DecoderReaderBuf {
obj: r,
data: Decompress::new(false),
}
}

/// Resets the state of this decoder entirely, swapping out the input
/// stream for another.
///
/// This will reset the internal state of this decoder and replace the
/// input stream with the one provided, returning the previous input
/// stream. Future data read from this decoder will be the decompressed
/// version of `r`'s data.
pub fn reset(&mut self, r: R) -> R {
self.data = Decompress::new(false);
mem::replace(&mut self.obj, r)
}

/// Consumes this decoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.obj
}

/// Returns the number of bytes that the decompressor has consumed.
///
/// Note that this will likely be smaller than what the decompressor
/// actually read from the underlying stream due to buffering.
pub fn total_in(&self) -> u64 {
self.data.total_in()
}

/// Returns the number of bytes that the decompressor has produced.
pub fn total_out(&self) -> u64 {
self.data.total_out()
}
}

impl<R: BufRead> Read for DecoderReaderBuf<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
::zio::read(&mut self.obj, &mut self.data, into)
}
}

impl<W: Write> DecoderWriter<W> {
/// Creates a new decoder which will write uncompressed data to the stream.
///
Expand Down
19 changes: 15 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ pub use gz::Header as GzHeader;
pub use mem::{Compress, Decompress, DataError, Status};
pub use stream::Flush;

mod bufreader;
mod crc;
mod deflate;
mod ffi;
mod gz;
mod zio;
mod mem;
mod raw;
mod stream;
mod zlib;
mod mem;
mod ffi;

/// Types which operate over `Reader` streams, both encoders and decoders for
/// Types which operate over `Read` streams, both encoders and decoders for
/// various formats.
pub mod read {
pub use deflate::EncoderReader as DeflateEncoder;
Expand All @@ -61,7 +63,7 @@ pub mod read {
pub use gz::DecoderReader as GzDecoder;
}

/// Types which operate over `Writer` streams, both encoders and decoders for
/// Types which operate over `Write` streams, both encoders and decoders for
/// various formats.
pub mod write {
pub use deflate::EncoderWriter as DeflateEncoder;
Expand All @@ -71,6 +73,15 @@ pub mod write {
pub use gz::EncoderWriter as GzEncoder;
}

/// Types which operate over `BufRead` streams, both encoders and decoders for
/// various formats.
pub mod bufread {
pub use deflate::EncoderReaderBuf as DeflateEncoder;
pub use deflate::DecoderReaderBuf as DeflateDecoder;
pub use zlib::EncoderReaderBuf as ZlibEncoder;
pub use zlib::DecoderReaderBuf as ZlibDecoder;
}

fn _assert_send_sync() {
fn _assert_send_sync<T: Send + Sync>() {}

Expand Down
65 changes: 65 additions & 0 deletions src/zio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::io;
use std::io::BufRead;

use {Decompress, Compress, Status, Flush, DataError};

pub trait ReadData {
fn total_in(&self) -> u64;
fn total_out(&self) -> u64;
fn run(&mut self, input: &[u8], output: &mut [u8], flush: Flush)
-> Result<Status, DataError>;
}

impl ReadData for Compress {
fn total_in(&self) -> u64 { self.total_in() }
fn total_out(&self) -> u64 { self.total_out() }
fn run(&mut self, input: &[u8], output: &mut [u8], flush: Flush)
-> Result<Status, DataError> {
Ok(self.compress(input, output, flush))
}
}

impl ReadData for Decompress {
fn total_in(&self) -> u64 { self.total_in() }
fn total_out(&self) -> u64 { self.total_out() }
fn run(&mut self, input: &[u8], output: &mut [u8], flush: Flush)
-> Result<Status, DataError> {
self.decompress(input, output, flush)
}
}

pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
where R: BufRead, D: ReadData
{
loop {
let (read, consumed, ret, eof);
{
let input = try!(obj.fill_buf());
eof = input.is_empty();
let before_out = data.total_out();
let before_in = data.total_in();
let flush = if eof {Flush::Finish} else {Flush::None};
ret = data.run(input, dst, flush);
read = (data.total_out() - before_out) as usize;
consumed = (data.total_in() - before_in) as usize;
}
obj.consume(consumed);

match ret {
// If we haven't ready any data and we haven't hit EOF yet,
// then we need to keep asking for more data because if we
// return that 0 bytes of data have been read then it will
// be interpreted as EOF.
Ok(Status::Ok) |
Ok(Status::BufError) if read == 0 && !eof && dst.len() > 0 => {
continue
}
Ok(Status::Ok) |
Ok(Status::BufError) |
Ok(Status::StreamEnd) => return Ok(read),

Err(..) => return Err(io::Error::new(io::ErrorKind::InvalidInput,
"corrupt deflate stream"))
}
}
}
Loading

0 comments on commit b91748a

Please sign in to comment.