Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async GzDecoder #185

Merged
merged 12 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ quickcheck = { version = "0.7", default-features = false }
tokio-io = "0.1.11"
tokio-tcp = "0.1.3"
tokio-threadpool = "0.1.10"
futures = "0.1"

[features]
default = ["miniz-sys"]
Expand Down
331 changes: 268 additions & 63 deletions src/gz/bufread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,154 @@ pub(crate) fn read_gz_header<R: Read>(r: &mut R) -> io::Result<GzHeader> {
})
}

#[derive(Debug)]
pub(crate) enum State {
Header(usize, [u8; 10]), // pos, buf
ExtraLen(usize, [u8; 2]), // pos, buf
Extra(usize), // pos
FileName,
Comment,
Crc(u16, usize, [u8; 2]) // crc, pos, buf
}

pub(crate) fn read_gz_header2<R: Read>(r: &mut CrcReader<R>, state: &mut State, header: &mut GzHeader, flag: &mut u8) -> io::Result<()> {
enum Next {
None,
ExtraLen,
Extra,
FileName,
Comment,
Crc
}

let mut next = Next::None;

loop {
match state {
State::Header(pos, buf) => if *pos < buf.len() {
let len = r.read(&mut buf[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
} else {
let id1 = buf[0];
let id2 = buf[1];
if id1 != 0x1f || id2 != 0x8b {
return Err(bad_header());
}
let cm = buf[2];
if cm != 8 {
return Err(bad_header());
}

let flg = buf[3];
let mtime = ((buf[4] as u32) << 0)
| ((buf[5] as u32) << 8)
| ((buf[6] as u32) << 16)
| ((buf[7] as u32) << 24);
let _xfl = buf[8];
let os = buf[9];

header.operating_system = os;
header.mtime = mtime;
*flag = flg;

next = Next::ExtraLen;
},
State::ExtraLen(..) if *flag & FEXTRA == 0 => next = Next::FileName,
State::ExtraLen(pos, buf) => if *pos < buf.len() {
let len = r.read(&mut buf[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
} else {
let xlen = (buf[0] as u16) | ((buf[1] as u16) << 8);
header.extra = Some(vec![0; xlen as usize]);
if xlen != 0 {
next = Next::Extra;
} else {
next = Next::FileName;
}
},
State::Extra(pos) => if let Some(extra) = &mut header.extra {
if *pos < extra.len() {
let len = r.read(&mut extra[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
} else {
next = Next::FileName;
}
} else {
next = Next::FileName; // unreachable
},
State::FileName if *flag & FNAME == 0 => next = Next::Comment,
State::FileName => {
let filename = header.filename.get_or_insert_with(Vec::new);
// wow this is slow
for byte in r.by_ref().bytes() {
let byte = byte?;
if byte == 0 {
break;
}
filename.push(byte);
}
next = Next::Comment;
},
State::Comment if *flag & FCOMMENT == 0 => next = Next::Crc,
State::Comment => {
let comment = header.comment.get_or_insert_with(Vec::new);
// wow this is slow
for byte in r.by_ref().bytes() {
let byte = byte?;
if byte == 0 {
break;
}
comment.push(byte);
}
next = Next::Crc
},
State::Crc(..) if *flag & FHCRC == 0 => return Ok(()),
State::Crc(calced_crc, pos, buf) => if *pos < buf.len() {
let len = r.read(&mut buf[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
} else {
let stored_crc = (buf[0] as u16) | ((buf[1] as u16) << 8);
if *calced_crc != stored_crc {
return Err(corrupt());
} else {
return Ok(())
}
}
};

match next {
Next::ExtraLen => *state = State::ExtraLen(0, [0; 2]),
Next::Extra => *state = State::Extra(0),
Next::FileName => *state = State::FileName,
Next::Comment => *state = State::Comment,
Next::Crc => *state = State::Crc(r.crc().sum() as u16, 0, [0; 2]),
Next::None => ()
}

next = Next::None;
}
}

/// A gzip streaming encoder
///
/// This structure exposes a [`BufRead`] interface that will read uncompressed data
Expand Down Expand Up @@ -211,6 +359,19 @@ impl<R> GzEncoder<R> {
}
}

#[inline]
fn finish(buf: &[u8; 8]) -> (u32, u32) {
let crc = ((buf[0] as u32) << 0)
| ((buf[1] as u32) << 8)
| ((buf[2] as u32) << 16)
| ((buf[3] as u32) << 24);
let amt = ((buf[4] as u32) << 0)
| ((buf[5] as u32) << 8)
| ((buf[6] as u32) << 16)
| ((buf[7] as u32) << 24);
(crc, amt)
}

impl<R: BufRead> Read for GzEncoder<R> {
fn read(&mut self, mut into: &mut [u8]) -> io::Result<usize> {
let mut amt = 0;
Expand Down Expand Up @@ -280,104 +441,148 @@ impl<R: BufRead + Write> Write for GzEncoder<R> {
/// ```
#[derive(Debug)]
pub struct GzDecoder<R> {
inner: CrcReader<deflate::bufread::DeflateDecoder<R>>,
header: Option<io::Result<GzHeader>>,
finished: bool,
inner: Inner<R>,
header: GzHeader
}

#[derive(Debug)]
enum Inner<R> {
Header {
reader: CrcReader<R>,
state: State,
flag: u8
},
Body(CrcReader<deflate::bufread::DeflateDecoder<R>>),
Finished {
reader: CrcReader<deflate::bufread::DeflateDecoder<R>>,
pos: usize,
buf: [u8; 8]
},
None
}

impl<R: BufRead> GzDecoder<R> {
/// Creates a new decoder from the given reader, immediately parsing the
/// gzip header.
pub fn new(mut r: R) -> GzDecoder<R> {
let header = read_gz_header(&mut r);

let flate = deflate::bufread::DeflateDecoder::new(r);
pub fn new(r: R) -> GzDecoder<R> {
GzDecoder {
inner: CrcReader::new(flate),
header: Some(header),
finished: false,
}
}

fn finish(&mut self) -> io::Result<()> {
if self.finished {
return Ok(());
}
let ref mut buf = [0u8; 8];
{
let mut len = 0;

while len < buf.len() {
match self.inner.get_mut().get_mut().read(&mut buf[len..])? {
0 => return Err(corrupt()),
n => len += n,
}
}
}

let crc = ((buf[0] as u32) << 0)
| ((buf[1] as u32) << 8)
| ((buf[2] as u32) << 16)
| ((buf[3] as u32) << 24);
let amt = ((buf[4] as u32) << 0)
| ((buf[5] as u32) << 8)
| ((buf[6] as u32) << 16)
| ((buf[7] as u32) << 24);
if crc != self.inner.crc().sum() {
return Err(corrupt());
}
if amt != self.inner.crc().amount() {
return Err(corrupt());
inner: Inner::Header {
reader: CrcReader::new(r),
state: State::Header(0, [0; 10]),
flag: 0
},
header: GzHeader::default(),
}
self.finished = true;
Ok(())
}
}

impl<R> GzDecoder<R> {
/// Returns the header associated with this stream, if it was valid
pub fn header(&self) -> Option<&GzHeader> {
self.header.as_ref().and_then(|h| h.as_ref().ok())
match self.inner {
Inner::Body(_) | Inner::Finished { .. } => Some(&self.header),
_ => None
}
}

/// Acquires a reference to the underlying reader.
pub fn get_ref(&self) -> &R {
self.inner.get_ref().get_ref()
match &self.inner {
Inner::Header { reader, .. } => reader.get_ref(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_ref().get_ref(),
_ => unreachable!()
}
}

/// Acquires a mutable reference to the underlying stream.
///
/// Note that mutation of the stream may result in surprising results if
/// this encoder is continued to be used.
pub fn get_mut(&mut self) -> &mut R {
self.inner.get_mut().get_mut()
match &mut self.inner {
Inner::Header { reader, .. } => reader.get_mut(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_mut().get_mut(),
_ => unreachable!()
}
}

/// Consumes this decoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.inner.into_inner().into_inner()
match self.inner {
Inner::Header { reader, .. } => reader.into_inner(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.into_inner().into_inner(),
_ => unreachable!()
}
}
}

impl<R: BufRead> Read for GzDecoder<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
match self.header {
None => return Ok(0), // error already returned,
Some(Ok(_)) => {}
Some(Err(_)) => match self.header.take().unwrap() {
Ok(_) => panic!(),
Err(e) => return Err(e),
},
}
if into.is_empty() {
return Ok(0);
let GzDecoder { inner, header } = self;

enum Next {
quininer marked this conversation as resolved.
Show resolved Hide resolved
None,
Body,
Finished
}
match self.inner.read(into)? {
0 => {
self.finish()?;
Ok(0)

let mut next = Next::None;

loop {
match inner {
Inner::Header { reader, state, flag } => {
read_gz_header2(reader, state, header, flag)?;
next = Next::Body;
},
Inner::Body(reader) => {
if into.is_empty() {
return Ok(0);
}

match reader.read(into)? {
0 => next = Next::Finished,
n => return Ok(n),
}
},
Inner::Finished { reader, pos, buf } => match buf.len().cmp(pos) {
cmp::Ordering::Greater => {
let len = reader.get_mut().get_mut().read(&mut buf[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
},
cmp::Ordering::Equal => {
*pos += 1;

let (crc, amt) = finish(buf);
if crc != reader.crc().sum() {
return Err(corrupt());
}
if amt != reader.crc().amount() {
return Err(corrupt());
}
return Ok(0);
},
cmp::Ordering::Less => return Ok(0)
},
Inner::None => () // unreachable
}
n => Ok(n),

match next {
Next::Body => if let Inner::Header { reader, .. } = mem::replace(inner, Inner::None) {
let reader = deflate::bufread::DeflateDecoder::new(reader.into_inner());
*inner = Inner::Body(CrcReader::new(reader));
},
Next::Finished => if let Inner::Body(reader) = mem::replace(inner, Inner::None) {
*inner = Inner::Finished { reader, pos: 0, buf: [0; 8] };
},
Next::None => ()
}

next = Next::None;
}
}
}
Expand Down
Loading