Skip to content

Commit

Permalink
Made decoding fallible (#178)
Browse files Browse the repository at this point in the history
Decoding currently panics in many cases. This is a continuation of #172 (cc @evanrichter).
Many decoders are now fallible, since invalid input could cause them to panic.
  • Loading branch information
jorgecarleitao authored Aug 15, 2022
1 parent 7a5fc27 commit 7caafa4
Show file tree
Hide file tree
Showing 21 changed files with 395 additions and 245 deletions.
2 changes: 1 addition & 1 deletion benches/decode_bitpacking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn add_benchmark(c: &mut Criterion) {
.collect::<Vec<_>>();

c.bench_function(&format!("bitpacking 2^{}", log2_size), |b| {
b.iter(|| Decoder::<u32>::new(&bytes, 1, size).count())
b.iter(|| Decoder::<u32>::try_new(&bytes, 1, size).unwrap().count())
});
})
}
Expand Down
72 changes: 49 additions & 23 deletions src/deserialize/hybrid_rle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::error::Error;

use crate::encoding::hybrid_rle::{self, BitmapIter};

/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1
Expand Down Expand Up @@ -26,21 +28,27 @@ impl<'a> HybridEncoded<'a> {
}
}

pub trait HybridRleRunsIterator<'a>: Iterator<Item = HybridEncoded<'a>> {
pub trait HybridRleRunsIterator<'a>: Iterator<Item = Result<HybridEncoded<'a>, Error>> {
/// Number of elements remaining. This may not be the items of the iterator - an item
/// of the iterator may contain more than one element.
fn number_of_elements(&self) -> usize;
}

/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`].
#[derive(Debug, Clone)]
pub struct HybridRleIter<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> {
pub struct HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
{
iter: I,
length: usize,
consumed: usize,
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridRleIter<'a, I> {
impl<'a, I> HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
{
/// Returns a new [`HybridRleIter`]
#[inline]
pub fn new(iter: I, length: usize) -> Self {
Expand All @@ -63,16 +71,20 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridRleIter<'a, I>
}
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridRleRunsIterator<'a>
for HybridRleIter<'a, I>
impl<'a, I> HybridRleRunsIterator<'a> for HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
{
fn number_of_elements(&self) -> usize {
self.len()
}
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> Iterator for HybridRleIter<'a, I> {
type Item = HybridEncoded<'a>;
impl<'a, I> Iterator for HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
{
type Item = Result<HybridEncoded<'a>, Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -81,7 +93,7 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> Iterator for HybridR
};
let run = self.iter.next()?;

Some(match run {
Some(run.map(|run| match run {
hybrid_rle::HybridEncoded::Bitpacked(pack) => {
// a pack has at most `pack.len() * 8` bits
let pack_size = pack.len() * 8;
Expand All @@ -99,7 +111,7 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> Iterator for HybridR
self.consumed += additional;
HybridEncoded::Repeated(is_set, additional)
}
})
}))
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand All @@ -122,12 +134,18 @@ enum HybridBooleanState<'a> {
/// An iterator adapter that maps an iterator of [`HybridEncoded`] into an iterator
/// over [`bool`].
#[derive(Debug)]
pub struct HybridRleBooleanIter<'a, I: Iterator<Item = HybridEncoded<'a>>> {
pub struct HybridRleBooleanIter<'a, I>
where
I: Iterator<Item = Result<HybridEncoded<'a>, Error>>,
{
iter: I,
current_run: Option<HybridBooleanState<'a>>,
}

impl<'a, I: Iterator<Item = HybridEncoded<'a>>> HybridRleBooleanIter<'a, I> {
impl<'a, I> HybridRleBooleanIter<'a, I>
where
I: Iterator<Item = Result<HybridEncoded<'a>, Error>>,
{
pub fn new(iter: I) -> Self {
Self {
iter,
Expand All @@ -136,33 +154,41 @@ impl<'a, I: Iterator<Item = HybridEncoded<'a>>> HybridRleBooleanIter<'a, I> {
}
}

impl<'a, I: HybridRleRunsIterator<'a>> Iterator for HybridRleBooleanIter<'a, I> {
type Item = bool;
impl<'a, I> Iterator for HybridRleBooleanIter<'a, I>
where
I: HybridRleRunsIterator<'a>,
{
type Item = Result<bool, Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(run) = &mut self.current_run {
match run {
HybridBooleanState::Bitmap(bitmap) => bitmap.next(),
HybridBooleanState::Repeated(value, remaining) => {
if *remaining == 0 {
None
} else {
*remaining -= 1;
Some(*value)
}
HybridBooleanState::Bitmap(bitmap) => bitmap.next().map(Ok),
HybridBooleanState::Repeated(value, remaining) => if *remaining == 0 {
None
} else {
*remaining -= 1;
Some(*value)
}
.map(Ok),
}
} else if let Some(run) = self.iter.next() {
self.current_run = Some(match run {
let run = run.map(|run| match run {
HybridEncoded::Bitmap(bitmap, length) => {
HybridBooleanState::Bitmap(BitmapIter::new(bitmap, 0, length))
}
HybridEncoded::Repeated(value, length) => {
HybridBooleanState::Repeated(value, length)
}
});
self.next()
match run {
Ok(run) => {
self.current_run = Some(run);
self.next()
}
Err(e) => Some(Err(e)),
}
} else {
None
}
Expand Down
31 changes: 16 additions & 15 deletions src/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::collections::VecDeque;

use crate::{
encoding::hybrid_rle::{self, HybridRleDecoder},
error::{Error, Result},
error::Error,
indexes::Interval,
page::{split_buffer, DataPage},
read::levels::get_bit_width,
};

use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter};

pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder> {
pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder, Error> {
let (_, _, indices_buffer) = split_buffer(page)?;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
Expand All @@ -23,11 +23,7 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::Hybrid
}
let indices_buffer = &indices_buffer[1..];

Ok(hybrid_rle::HybridRleDecoder::new(
indices_buffer,
bit_width as u32,
page.num_values(),
))
hybrid_rle::HybridRleDecoder::try_new(indices_buffer, bit_width as u32, page.num_values())
}

/// Decoder of definition levels.
Expand All @@ -42,7 +38,7 @@ pub enum DefLevelsDecoder<'a> {
}

impl<'a> DefLevelsDecoder<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
pub fn try_new(page: &'a DataPage) -> Result<Self, Error> {
let (_, def_levels, _) = split_buffer(page)?;

let max_def_level = page.descriptor.max_def_level;
Expand All @@ -51,8 +47,11 @@ impl<'a> DefLevelsDecoder<'a> {
let iter = HybridRleIter::new(iter, page.num_values());
Self::Bitmap(iter)
} else {
let iter =
HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values());
let iter = HybridRleDecoder::try_new(
def_levels,
get_bit_width(max_def_level),
page.num_values(),
)?;
Self::Levels(iter, max_def_level as u32)
})
}
Expand All @@ -61,25 +60,27 @@ impl<'a> DefLevelsDecoder<'a> {
/// Iterator adapter to convert an iterator of non-null values and an iterator over validity
/// into an iterator of optional values.
#[derive(Debug, Clone)]
pub struct OptionalValues<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> {
pub struct OptionalValues<T, V: Iterator<Item = Result<bool, Error>>, I: Iterator<Item = T>> {
validity: V,
values: I,
}

impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> OptionalValues<T, V, I> {
impl<T, V: Iterator<Item = Result<bool, Error>>, I: Iterator<Item = T>> OptionalValues<T, V, I> {
pub fn new(validity: V, values: I) -> Self {
Self { validity, values }
}
}

impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> Iterator for OptionalValues<T, V, I> {
type Item = Option<T>;
impl<T, V: Iterator<Item = Result<bool, Error>>, I: Iterator<Item = T>> Iterator
for OptionalValues<T, V, I>
{
type Item = Result<Option<T>, Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.validity
.next()
.map(|x| if x { self.values.next() } else { None })
.map(|x| x.map(|x| if x { self.values.next() } else { None }))
}

#[inline]
Expand Down
59 changes: 49 additions & 10 deletions src/encoding/bitpacked/decode.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::error::Error;

use super::{Packed, Unpackable, Unpacked};

/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.
Expand All @@ -7,7 +9,7 @@ use super::{Packed, Unpackable, Unpacked};
pub struct Decoder<'a, T: Unpackable> {
packed: std::slice::Chunks<'a, u8>,
num_bits: usize,
remaining: usize,
remaining: usize, // in number of items
current_pack_index: usize, // invariant: < T::PACK_LENGTH
unpacked: T::Unpacked, // has the current unpacked values.
}
Expand All @@ -25,9 +27,20 @@ fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::

impl<'a, T: Unpackable> Decoder<'a, T> {
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
pub fn new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Self {
pub fn try_new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Result<Self, Error> {
let block_size = std::mem::size_of::<T>() * num_bits;

if num_bits == 0 {
return Err(Error::oos("Bitpacking requires num_bits > 0"));
}

if packed.len() * 8 < length * num_bits {
return Err(Error::oos(format!(
"Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.",
length * num_bits / 8
)));
}

let mut packed = packed.chunks(block_size);
let mut unpacked = T::Unpacked::zero();
if let Some(chunk) = packed.next() {
Expand All @@ -36,13 +49,13 @@ impl<'a, T: Unpackable> Decoder<'a, T> {
length = 0
};

Self {
Ok(Self {
remaining: length,
packed,
num_bits,
unpacked,
current_pack_index: 0,
}
})
}
}

Expand Down Expand Up @@ -93,15 +106,19 @@ mod tests {
// encoded: 0b10001000u8, 0b11000110, 0b11111010
let data = vec![0b10001000u8, 0b11000110, 0b11111010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
let decoded = Decoder::<u32>::try_new(&data, num_bits, length)
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}

#[test]
fn decode_large() {
let (num_bits, expected, data) = case1();

let decoded = Decoder::<u32>::new(&data, num_bits, expected.len()).collect::<Vec<_>>();
let decoded = Decoder::<u32>::try_new(&data, num_bits, expected.len())
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

Expand All @@ -111,7 +128,9 @@ mod tests {
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
let decoded = Decoder::<u32>::try_new(&data, num_bits, length)
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

Expand All @@ -121,7 +140,9 @@ mod tests {
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u64>::new(&data, num_bits, length).collect::<Vec<_>>();
let decoded = Decoder::<u64>::try_new(&data, num_bits, length)
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

Expand All @@ -143,7 +164,9 @@ mod tests {
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
let decoded = Decoder::<u32>::try_new(&data, num_bits, length)
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

Expand All @@ -167,7 +190,23 @@ mod tests {
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
let decoded = Decoder::<u32>::try_new(&data, num_bits, length)
.unwrap()
.collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn test_errors() {
// zero length
assert!(Decoder::<u64>::try_new(&[], 1, 0).is_ok());
// no bytes
assert!(Decoder::<u64>::try_new(&[], 1, 1).is_err());
// too few bytes
assert!(Decoder::<u64>::try_new(&[1], 1, 8).is_ok());
assert!(Decoder::<u64>::try_new(&[1, 1], 2, 8).is_ok());
assert!(Decoder::<u64>::try_new(&[1], 1, 9).is_err());
// zero num_bits
assert!(Decoder::<u64>::try_new(&[1], 0, 1).is_err());
}
}
Loading

0 comments on commit 7caafa4

Please sign in to comment.