Skip to content

Commit

Permalink
impl async gzheader parse
Browse files Browse the repository at this point in the history
  • Loading branch information
quininer committed Feb 14, 2019
1 parent d06d479 commit 0981506
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 70 deletions.
331 changes: 268 additions & 63 deletions src/gz/bufread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,154 @@ pub(crate) fn read_gz_header<R: Read>(r: &mut R) -> io::Result<GzHeader> {
})
}

/// Progress marker for the incremental gzip header parser `read_gz_header2`.
///
/// Each variant names the header section currently being read and carries
/// whatever partial data that section has accumulated so far, so parsing can
/// continue from the same point on a later call.
#[derive(Debug)]
pub(crate) enum State {
/// Reading the fixed 10-byte header (magic, method, flags, mtime, xfl, os).
/// Fields: number of bytes already stored in the buffer, and the buffer.
Header(usize, [u8; 10]), // pos, buf
/// Reading the 2-byte little-endian length of the extra field.
/// Fields: number of bytes already stored in the buffer, and the buffer.
ExtraLen(usize, [u8; 2]), // pos, buf
/// Reading the extra field body into `GzHeader::extra`.
/// Field: number of bytes of the extra field read so far.
Extra(usize), // pos
/// Reading the NUL-terminated file name (only when the FNAME flag is set).
FileName,
/// Reading the NUL-terminated comment (only when the FCOMMENT flag is set).
Comment,
/// Reading the 2-byte little-endian header CRC (only when FHCRC is set).
/// Fields: the CRC value computed when this state was entered (truncated to
/// 16 bits), number of stored-CRC bytes read so far, and their buffer.
Crc(u16, usize, [u8; 2]) // crc, pos, buf
}

/// Performs a single `read` into `buf` and returns the number of bytes read.
///
/// A zero-length read is reported as `UnexpectedEof`: callers only invoke this
/// while they still need more header bytes, so end of input means the header
/// is truncated.
fn read_some<T: Read>(r: &mut T, buf: &mut [u8]) -> io::Result<usize> {
    match r.read(buf)? {
        0 => Err(io::ErrorKind::UnexpectedEof.into()),
        n => Ok(n),
    }
}

/// Appends bytes from `r` to `out` up to (and consuming, but not storing) the
/// next NUL byte.
///
/// Bytes are pushed to `out` as they arrive, so if a read fails part-way the
/// bytes collected so far are kept and a later call continues the same field.
/// Reaching end of input before the terminator yields `UnexpectedEof`.
fn read_nul_terminated<T: Read>(r: &mut T, out: &mut Vec<u8>) -> io::Result<()> {
    // Byte-at-a-time reads are slow on unbuffered readers, but they guarantee
    // nothing past the terminator is consumed.
    for byte in r.by_ref().bytes() {
        match byte? {
            0 => return Ok(()),
            b => out.push(b),
        }
    }
    Err(io::ErrorKind::UnexpectedEof.into())
}

/// Incrementally parses a gzip member header from `r` into `header`.
///
/// All progress is stored in `state`, `header` and `flag`, so if a read fails
/// the function can be called again with the same arguments and will continue
/// from where it stopped instead of starting over.
///
/// * `r` - CRC-tracking reader positioned inside the header.
/// * `state` - current parser position; start with `State::Header(0, [0; 10])`.
/// * `header` - receives mtime, OS, extra field, file name and comment.
/// * `flag` - receives the FLG byte once the fixed header has been read.
///
/// # Errors
///
/// * `bad_header()` if the magic bytes or the compression method are wrong.
/// * `UnexpectedEof` if the input ends inside any header section, including
///   before the NUL terminator of the file name or comment.
/// * `corrupt()` if the stored header CRC does not match the computed one.
/// * Any error returned by the underlying reader.
pub(crate) fn read_gz_header2<R: Read>(r: &mut CrcReader<R>, state: &mut State, header: &mut GzHeader, flag: &mut u8) -> io::Result<()> {
    loop {
        // Each arm either makes progress inside the current state (`None`) or
        // names the state to switch to (`Some`).
        let next = match state {
            State::Header(pos, buf) => {
                if *pos < buf.len() {
                    let n = read_some(r, &mut buf[*pos..])?;
                    *pos += n;
                    None
                } else {
                    // ID1/ID2 magic bytes.
                    if buf[0] != 0x1f || buf[1] != 0x8b {
                        return Err(bad_header());
                    }
                    // CM: only method 8 is accepted.
                    if buf[2] != 8 {
                        return Err(bad_header());
                    }

                    *flag = buf[3];
                    header.mtime = (buf[4] as u32)
                        | ((buf[5] as u32) << 8)
                        | ((buf[6] as u32) << 16)
                        | ((buf[7] as u32) << 24);
                    // buf[8] (XFL) is not recorded.
                    header.operating_system = buf[9];

                    Some(State::ExtraLen(0, [0; 2]))
                }
            }
            State::ExtraLen(..) if *flag & FEXTRA == 0 => Some(State::FileName),
            State::ExtraLen(pos, buf) => {
                if *pos < buf.len() {
                    let n = read_some(r, &mut buf[*pos..])?;
                    *pos += n;
                    None
                } else {
                    let xlen = (buf[0] as u16) | ((buf[1] as u16) << 8);
                    header.extra = Some(vec![0; xlen as usize]);
                    if xlen != 0 {
                        Some(State::Extra(0))
                    } else {
                        Some(State::FileName)
                    }
                }
            }
            State::Extra(pos) => {
                if let Some(extra) = &mut header.extra {
                    if *pos < extra.len() {
                        let n = read_some(r, &mut extra[*pos..])?;
                        *pos += n;
                        None
                    } else {
                        Some(State::FileName)
                    }
                } else {
                    // Not expected: `extra` is allocated before entering this state.
                    Some(State::FileName)
                }
            }
            State::FileName => {
                if *flag & FNAME != 0 {
                    let filename = header.filename.get_or_insert_with(Vec::new);
                    read_nul_terminated(r, filename)?;
                }
                Some(State::Comment)
            }
            State::Comment => {
                if *flag & FCOMMENT != 0 {
                    let comment = header.comment.get_or_insert_with(Vec::new);
                    read_nul_terminated(r, comment)?;
                }
                // Snapshot the running CRC before the stored CRC bytes are read
                // through `r`; the header CRC is its low 16 bits, hence the
                // intentional truncating cast.
                Some(State::Crc(r.crc().sum() as u16, 0, [0; 2]))
            }
            State::Crc(..) if *flag & FHCRC == 0 => return Ok(()),
            State::Crc(calced_crc, pos, buf) => {
                if *pos < buf.len() {
                    let n = read_some(r, &mut buf[*pos..])?;
                    *pos += n;
                    None
                } else {
                    let stored_crc = (buf[0] as u16) | ((buf[1] as u16) << 8);
                    return if *calced_crc == stored_crc {
                        Ok(())
                    } else {
                        Err(corrupt())
                    };
                }
            }
        };

        if let Some(next) = next {
            *state = next;
        }
    }
}

/// A gzip streaming encoder
///
/// This structure exposes a [`BufRead`] interface that will read uncompressed data
Expand Down Expand Up @@ -211,6 +359,19 @@ impl<R> GzEncoder<R> {
}
}

/// Decodes an 8-byte buffer as two consecutive little-endian `u32` values.
///
/// Returns `(crc, amt)`, where `crc` comes from bytes 0..4 and `amt` from
/// bytes 4..8.
#[inline]
fn finish(buf: &[u8; 8]) -> (u32, u32) {
    // Build the value starting from the most significant (last) byte, so the
    // first byte of the slice ends up in the lowest 8 bits.
    fn le_u32(bytes: &[u8]) -> u32 {
        bytes
            .iter()
            .rev()
            .fold(0, |acc, &byte| (acc << 8) | u32::from(byte))
    }

    let (crc_bytes, amt_bytes) = buf.split_at(4);
    (le_u32(crc_bytes), le_u32(amt_bytes))
}

impl<R: BufRead> Read for GzEncoder<R> {
fn read(&mut self, mut into: &mut [u8]) -> io::Result<usize> {
let mut amt = 0;
Expand Down Expand Up @@ -280,104 +441,148 @@ impl<R: BufRead + Write> Write for GzEncoder<R> {
/// ```
#[derive(Debug)]
pub struct GzDecoder<R> {
inner: CrcReader<deflate::bufread::DeflateDecoder<R>>,
header: Option<io::Result<GzHeader>>,
finished: bool,
inner: Inner<R>,
header: GzHeader
}

/// Decoding phase of a `GzDecoder`, together with the reader in the form that
/// phase needs.
#[derive(Debug)]
enum Inner<R> {
/// The gzip header is still being parsed directly from the raw reader.
Header {
// Raw input wrapped so the bytes read through it feed a CRC.
reader: CrcReader<R>,
// Position of the incremental header parser.
state: State,
// FLG byte of the header, filled in by the header parser.
flag: u8
},
/// Header done; reads are served from the deflate decoder, with a CRC
/// tracked over the data it produces.
Body(CrcReader<deflate::bufread::DeflateDecoder<R>>),
/// The deflate stream has ended and the 8 trailing bytes are being read
/// from the underlying reader and checked.
Finished {
reader: CrcReader<deflate::bufread::DeflateDecoder<R>>,
// Number of trailer bytes read so far; moved past `buf.len()` once the
// trailer has been checked so later reads return `Ok(0)` immediately.
pos: usize,
// Raw trailer bytes, decoded as two little-endian u32 values.
buf: [u8; 8]
},
/// Placeholder left behind by `mem::replace` while switching phases.
None
}

impl<R: BufRead> GzDecoder<R> {
/// Creates a new decoder from the given reader, immediately parsing the
/// gzip header.
pub fn new(mut r: R) -> GzDecoder<R> {
let header = read_gz_header(&mut r);

let flate = deflate::bufread::DeflateDecoder::new(r);
pub fn new(r: R) -> GzDecoder<R> {
GzDecoder {
inner: CrcReader::new(flate),
header: Some(header),
finished: false,
}
}

fn finish(&mut self) -> io::Result<()> {
if self.finished {
return Ok(());
}
let ref mut buf = [0u8; 8];
{
let mut len = 0;

while len < buf.len() {
match self.inner.get_mut().get_mut().read(&mut buf[len..])? {
0 => return Err(corrupt()),
n => len += n,
}
}
}

let crc = ((buf[0] as u32) << 0)
| ((buf[1] as u32) << 8)
| ((buf[2] as u32) << 16)
| ((buf[3] as u32) << 24);
let amt = ((buf[4] as u32) << 0)
| ((buf[5] as u32) << 8)
| ((buf[6] as u32) << 16)
| ((buf[7] as u32) << 24);
if crc != self.inner.crc().sum() {
return Err(corrupt());
}
if amt != self.inner.crc().amount() {
return Err(corrupt());
inner: Inner::Header {
reader: CrcReader::new(r),
state: State::Header(0, [0; 10]),
flag: 0
},
header: GzHeader::default(),
}
self.finished = true;
Ok(())
}
}

impl<R> GzDecoder<R> {
/// Returns the header associated with this stream, if it was valid
pub fn header(&self) -> Option<&GzHeader> {
self.header.as_ref().and_then(|h| h.as_ref().ok())
match self.inner {
Inner::Body(_) | Inner::Finished { .. } => Some(&self.header),
_ => None
}
}

/// Acquires a reference to the underlying reader.
pub fn get_ref(&self) -> &R {
self.inner.get_ref().get_ref()
match &self.inner {
Inner::Header { reader, .. } => reader.get_ref(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_ref().get_ref(),
_ => unreachable!()
}
}

/// Acquires a mutable reference to the underlying stream.
///
/// Note that mutation of the stream may result in surprising results if
/// this encoder is continued to be used.
pub fn get_mut(&mut self) -> &mut R {
self.inner.get_mut().get_mut()
match &mut self.inner {
Inner::Header { reader, .. } => reader.get_mut(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_mut().get_mut(),
_ => unreachable!()
}
}

/// Consumes this decoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.inner.into_inner().into_inner()
match self.inner {
Inner::Header { reader, .. } => reader.into_inner(),
Inner::Body(reader) | Inner::Finished { reader, .. } => reader.into_inner().into_inner(),
_ => unreachable!()
}
}
}

impl<R: BufRead> Read for GzDecoder<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
match self.header {
None => return Ok(0), // error already returned,
Some(Ok(_)) => {}
Some(Err(_)) => match self.header.take().unwrap() {
Ok(_) => panic!(),
Err(e) => return Err(e),
},
}
if into.is_empty() {
return Ok(0);
let GzDecoder { inner, header } = self;

enum Next {
None,
Body,
Finished
}
match self.inner.read(into)? {
0 => {
self.finish()?;
Ok(0)

let mut next = Next::None;

loop {
match inner {
Inner::Header { reader, state, flag } => {
read_gz_header2(reader, state, header, flag)?;
next = Next::Body;
},
Inner::Body(reader) => {
if into.is_empty() {
return Ok(0);
}

match reader.read(into)? {
0 => next = Next::Finished,
n => return Ok(n),
}
},
Inner::Finished { reader, pos, buf } => match buf.len().cmp(pos) {
cmp::Ordering::Greater => {
let len = reader.get_mut().get_mut().read(&mut buf[*pos..])
.and_then(|len| if len != 0 {
Ok(len)
} else {
Err(io::ErrorKind::UnexpectedEof.into())
})?;
*pos += len;
},
cmp::Ordering::Equal => {
*pos += 1;

let (crc, amt) = finish(buf);
if crc != reader.crc().sum() {
return Err(corrupt());
}
if amt != reader.crc().amount() {
return Err(corrupt());
}
return Ok(0);
},
cmp::Ordering::Less => return Ok(0)
},
Inner::None => () // unreachable
}
n => Ok(n),

match next {
Next::Body => if let Inner::Header { reader, .. } = mem::replace(inner, Inner::None) {
let reader = deflate::bufread::DeflateDecoder::new(reader.into_inner());
*inner = Inner::Body(CrcReader::new(reader));
},
Next::Finished => if let Inner::Body(reader) = mem::replace(inner, Inner::None) {
*inner = Inner::Finished { reader, pos: 0, buf: [0; 8] };
},
Next::None => ()
}

next = Next::None;
}
}
}
Expand Down
Loading

0 comments on commit 0981506

Please sign in to comment.