Skip to content

Commit

Permalink
Merge the mem/stream modules
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Feb 7, 2016
1 parent 42566e0 commit 3cc2956
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 244 deletions.
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ use std::io;

pub use gz::Builder as GzBuilder;
pub use gz::Header as GzHeader;
pub use mem::{Compress, Decompress, DataError, Status};
pub use stream::Flush;
pub use mem::{Compress, Decompress, DataError, Status, Flush};

mod bufreader;
mod crc;
Expand All @@ -48,7 +47,6 @@ mod ffi;
mod gz;
mod zio;
mod mem;
mod stream;
mod zlib;

/// Types which operate over `Read` streams, both encoders and decoders for
Expand Down
200 changes: 180 additions & 20 deletions src/mem.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::error::Error;
use std::fmt;
use std::marker;
use std::mem;

use libc::c_int;
use libc::{c_int, c_uint};

use {Compression, Flush};
use Compression;
use ffi;
use stream::{self, Stream};

/// Raw in-memory compression stream for blocks of data.
///
Expand All @@ -17,7 +18,7 @@ use stream::{self, Stream};
/// It is recommended to use the I/O stream adaptors over this type as they're
/// easier to use.
pub struct Compress {
inner: Stream<stream::Compress>,
inner: Stream<DirCompress>,
}

/// Raw in-memory decompression stream for blocks of data.
Expand All @@ -30,7 +31,75 @@ pub struct Compress {
/// It is recommended to use the I/O stream adaptors over this type as they're
/// easier to use.
pub struct Decompress {
inner: Stream<stream::Decompress>,
inner: Stream<DirDecompress>,
}

struct Stream<D: Direction> {
raw: ffi::mz_stream,
_marker: marker::PhantomData<D>,
}

unsafe impl<D: Direction> Send for Stream<D> {}
unsafe impl<D: Direction> Sync for Stream<D> {}

trait Direction {
unsafe fn destroy(stream: *mut ffi::mz_stream) -> c_int;
}

enum DirCompress {}
enum DirDecompress {}

/// Values which indicate the form of flushing to be used when compressing or
/// decompressing in-memory data.
pub enum Flush {
/// A typical parameter for passing to compression/decompression functions,
/// this indicates that the underlying stream to decide how much data to
/// accumulate before producing output in order to maximize compression.
None = ffi::MZ_NO_FLUSH as isize,

/// All pending output is flushed to the output buffer and the output is
/// aligned on a byte boundary so that the decompressor can get all input
/// data available so far.
///
/// Flushing may degrade compression for some compression algorithms and so
/// it should only be used when necessary. This will complete the current
/// deflate block and follow it with an empty stored block.
Sync = ffi::MZ_SYNC_FLUSH as isize,

/// All pending output is flushed to the output buffer, but the output is
/// not aligned to a byte boundary.
///
/// All of the input data so far will be available to the decompressor (as
/// with `Flush::Sync`. This completes the current deflate block and follows
/// it with an empty fixed codes block that is 10 bites long, and it assures
/// that enough bytes are output in order for the decompessor to finish the
/// block before the empty fixed code block.
Partial = ffi::MZ_PARTIAL_FLUSH as isize,

/// A deflate block is completed and emitted, as for `Flush::Sync`, but the
/// output is not aligned on a byte boundary and up to seven vits of the
/// current block are held to be written as the next byte after the next
/// deflate block is completed.
///
/// In this case the decompressor may not be provided enough bits at this
/// point in order to complete decompression of the data provided so far to
/// the compressor, it may need to wait for the next block to be emitted.
/// This is for advanced applications that need to control the emission of
/// deflate blocks.
Block = ffi::MZ_BLOCK as isize,

/// All output is flushed as with `Flush::Sync` and the compression state is
/// reset so decompression can restart from this point if previous
/// compressed data has been damaged or if random access is desired.
///
/// Using this option too often can seriously degrade compression.
Full = ffi::MZ_FULL_FLUSH as isize,

/// Pending input is processed and pending output is flushed.
///
/// The return value may indicate that the stream is not yet done and more
/// data has yet to be processed.
Finish = ffi::MZ_FINISH as isize,
}

/// Error returned when a decompression object finds that the input stream of
Expand Down Expand Up @@ -73,26 +142,46 @@ impl Compress {
/// to be performed, and the `zlib_header` argument indicates whether the
/// output data should have a zlib header or not.
pub fn new(level: Compression, zlib_header: bool) -> Compress {
Compress { inner: Stream::new_compress(level, !zlib_header) }
unsafe {
let mut state: ffi::mz_stream = mem::zeroed();
let ret = ffi::mz_deflateInit2(&mut state,
level as c_int,
ffi::MZ_DEFLATED,
if zlib_header {
ffi::MZ_DEFAULT_WINDOW_BITS
} else {
-ffi::MZ_DEFAULT_WINDOW_BITS
},
9,
ffi::MZ_DEFAULT_STRATEGY);
debug_assert_eq!(ret, 0);
Compress {
inner: Stream {
raw: state,
_marker: marker::PhantomData,
},
}
}
}

/// Returns the total number of input bytes which have been processed by
/// this compression object.
pub fn total_in(&self) -> u64 {
self.inner.total_in()
self.inner.raw.total_in as u64
}

/// Returns the total number of output bytes which have been produced by
/// this compression object.
pub fn total_out(&self) -> u64 {
self.inner.total_out()
self.inner.raw.total_out as u64
}

/// Quickly resets this compressor without having to reallocate anything.
///
/// This is equivalent to dropping this object and then creating a new one.
pub fn reset(&mut self) {
assert_eq!(self.inner.reset(), ffi::MZ_OK);
let rc = unsafe { ffi::mz_deflateReset(&mut self.inner.raw) };
assert_eq!(rc, ffi::MZ_OK);
}

/// Compresses the input data into the output, consuming only as much
Expand All @@ -107,8 +196,14 @@ impl Compress {
output: &mut [u8],
flush: Flush)
-> Status {
let rc = self.inner.compress(input, output, flush);
self.rc(rc)
self.inner.raw.next_in = input.as_ptr() as *mut _;
self.inner.raw.avail_in = input.len() as c_uint;
self.inner.raw.next_out = output.as_mut_ptr();
self.inner.raw.avail_out = output.len() as c_uint;
unsafe {
let rc = ffi::mz_deflate(&mut self.inner.raw, flush as c_int);
self.rc(rc)
}
}

/// Compresses the input data into the extra space of the output, consuming
Expand All @@ -124,8 +219,21 @@ impl Compress {
output: &mut Vec<u8>,
flush: Flush)
-> Status {
let rc = self.inner.compress_vec(input, output, flush);
self.rc(rc)
let cap = output.capacity();
let len = output.len();
self.inner.raw.avail_in = input.len() as c_uint;
self.inner.raw.next_in = input.as_ptr() as *mut _;
self.inner.raw.avail_out = (cap - len) as c_uint;

unsafe {
self.inner.raw.next_out = output.as_mut_ptr().offset(len as isize);

let before = self.total_out();
let rc = ffi::mz_deflate(&mut self.inner.raw, flush as c_int);
let diff = (self.total_out() - before) as usize;
output.set_len(len + diff);
self.rc(rc)
}
}

fn rc(&self, rc: c_int) -> Status {
Expand All @@ -144,19 +252,34 @@ impl Decompress {
/// The `zlib_header` argument indicates whether the input data is expected
/// to have a zlib header or not.
pub fn new(zlib_header: bool) -> Decompress {
Decompress { inner: Stream::new_decompress(!zlib_header) }
unsafe {
let mut state: ffi::mz_stream = mem::zeroed();
let ret = ffi::mz_inflateInit2(&mut state,
if zlib_header {
ffi::MZ_DEFAULT_WINDOW_BITS
} else {
-ffi::MZ_DEFAULT_WINDOW_BITS
});
debug_assert_eq!(ret, 0);
Decompress {
inner: Stream {
raw: state,
_marker: marker::PhantomData,
},
}
}
}

/// Returns the total number of input bytes which have been processed by
/// this decompression object.
pub fn total_in(&self) -> u64 {
self.inner.total_in()
self.inner.raw.total_in as u64
}

/// Returns the total number of output bytes which have been produced by
/// this decompression object.
pub fn total_out(&self) -> u64 {
self.inner.total_out()
self.inner.raw.total_out as u64
}

/// Decompresses the input data into the output, consuming only as much
Expand All @@ -178,8 +301,14 @@ impl Decompress {
output: &mut [u8],
flush: Flush)
-> Result<Status, DataError> {
let rc = self.inner.decompress(input, output, flush);
self.rc(rc)
self.inner.raw.next_in = input.as_ptr() as *mut u8;
self.inner.raw.avail_in = input.len() as c_uint;
self.inner.raw.next_out = output.as_mut_ptr();
self.inner.raw.avail_out = output.len() as c_uint;
unsafe {
let rc = ffi::mz_inflate(&mut self.inner.raw, flush as c_int);
self.rc(rc)
}
}

/// Decompresses the input data into the extra space in the output vector
Expand All @@ -195,8 +324,20 @@ impl Decompress {
output: &mut Vec<u8>,
flush: Flush)
-> Result<Status, DataError> {
let rc = self.inner.decompress_vec(input, output, flush);
self.rc(rc)
let cap = output.capacity();
let len = output.len();
self.inner.raw.next_in = input.as_ptr() as *mut _;
self.inner.raw.avail_in = input.len() as c_uint;
self.inner.raw.avail_out = (cap - len) as c_uint;

unsafe {
self.inner.raw.next_out = output.as_mut_ptr().offset(len as isize);
let before = self.total_out();
let rc = ffi::mz_inflate(&mut self.inner.raw, flush as c_int);
let diff = (self.total_out() - before) as usize;
output.set_len(len + diff);
self.rc(rc)
}
}

fn rc(&self, rc: c_int) -> Result<Status, DataError> {
Expand All @@ -219,3 +360,22 @@ impl fmt::Display for DataError {
self.description().fmt(f)
}
}

impl Direction for DirCompress {
unsafe fn destroy(stream: *mut ffi::mz_stream) -> c_int {
ffi::mz_deflateEnd(stream)
}
}
impl Direction for DirDecompress {
unsafe fn destroy(stream: *mut ffi::mz_stream) -> c_int {
ffi::mz_inflateEnd(stream)
}
}

impl<D: Direction> Drop for Stream<D> {
fn drop(&mut self) {
unsafe {
let _ = D::destroy(&mut self.raw);
}
}
}
Loading

0 comments on commit 3cc2956

Please sign in to comment.