diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 105aebb04f1..1c0c93b783c 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -42,20 +42,27 @@ fn create_list( ) -> Result> { Ok(match data_type { DataType::List(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); + let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); + offsets.push(values.len() as i64); let offsets = offsets.iter().map(|x| *x as i32).collect::>(); Arc::new(ListArray::::new( data_type, offsets.into(), values, - validity, + validity.and_then(|x| x.into()), )) } DataType::LargeList(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); + let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); + offsets.push(values.len() as i64); - Arc::new(ListArray::::new(data_type, offsets, values, validity)) + Arc::new(ListArray::::new( + data_type, + offsets.into(), + values, + validity.and_then(|x| x.into()), + )) } _ => { return Err(ArrowError::NotYetImplemented(format!( diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 97b6b3fefd6..074d0f2a14e 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -4,24 +4,17 @@ use parquet2::{ encoding::hybrid_rle::HybridRleDecoder, page::DataPage, read::levels::get_bit_width, }; -use crate::{ - array::Array, - bitmap::{Bitmap, MutableBitmap}, - buffer::Buffer, - error::Result, -}; +use crate::{array::Array, bitmap::MutableBitmap, error::Result}; use super::super::DataPages; use super::utils::{split_buffer, Decoder, MaybeNext, Pushable}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug + Send + Sync { - fn inner(&mut self) -> (Buffer, Option); + fn inner(&mut self) -> (Vec, Option); fn push(&mut self, length: i64, is_valid: bool); - fn close(&mut self, length: i64); - fn is_nullable(&self) -> bool; /// number of rows @@ -47,7 +40,7 @@ impl NestedPrimitive { } impl Nested for NestedPrimitive { - fn inner(&mut self) -> (Buffer, Option) { + fn inner(&mut self) -> (Vec, Option) { (Default::default(), Default::default()) } @@ -59,8 +52,6 @@ impl Nested for NestedPrimitive { self.length += 1 } - fn close(&mut self, _length: i64) {} - fn len(&self) -> usize { self.length } @@ -77,10 +68,10 @@ pub struct NestedOptional { } impl Nested for NestedOptional { - fn inner(&mut self) -> (Buffer, Option) { + fn inner(&mut self) -> (Vec, Option) { let offsets = std::mem::take(&mut self.offsets); let validity = std::mem::take(&mut self.validity); - (offsets.into(), validity.into()) + (offsets, Some(validity)) } fn is_nullable(&self) -> bool { @@ -92,10 +83,6 @@ impl Nested for NestedOptional { self.validity.push(is_valid); } - fn close(&mut self, length: i64) { - self.offsets.push(length) - } - fn len(&self) -> usize { self.offsets.len() } @@ -119,9 +106,9 @@ pub struct NestedValid { } impl Nested for NestedValid { - fn inner(&mut self) -> (Buffer, Option) { + fn inner(&mut self) -> (Vec, Option) { let offsets = std::mem::take(&mut self.offsets); - (offsets.into(), None) + (offsets, None) } fn is_nullable(&self) -> bool { @@ -132,12 +119,8 @@ impl Nested for NestedValid { self.offsets.push(value); } - fn close(&mut self, length: i64) { - self.offsets.push(length) - } - fn len(&self) -> usize { - self.offsets.len().saturating_sub(1) + self.offsets.len() } fn num_values(&self) -> usize { @@ -164,7 +147,7 @@ impl NestedStructValid { } impl Nested for NestedStructValid { - fn inner(&mut self) -> (Buffer, Option) { + fn inner(&mut self) -> (Vec, Option) { (Default::default(), None) } @@ -176,8 +159,6 @@ impl Nested for NestedStructValid { self.length += 1; } - fn close(&mut self, _length: i64) {} - fn len(&self) -> usize { self.length } @@ -201,7 +182,7 @@ impl NestedStruct { } impl Nested for NestedStruct { - fn inner(&mut self) -> (Buffer, Option) { + fn inner(&mut self) -> (Vec, Option) { (Default::default(), None) } @@ -213,8 +194,6 @@ impl Nested for NestedStruct { self.validity.push(is_valid) } - fn close(&mut self, _length: i64) {} - fn len(&self) -> usize { self.validity.len() } @@ -297,9 +276,7 @@ fn init_nested(init: &InitNested, capacity: usize) -> NestedState { } pub struct NestedPage<'a> { - repetitions: HybridRleDecoder<'a>, - _max_rep_level: u32, - definitions: HybridRleDecoder<'a>, + iter: std::iter::Peekable, HybridRleDecoder<'a>>>, } impl<'a> NestedPage<'a> { @@ -309,24 +286,19 @@ impl<'a> NestedPage<'a> { let max_rep_level = page.descriptor().max_rep_level(); let max_def_level = page.descriptor().max_def_level(); - Self { - repetitions: HybridRleDecoder::new( - rep_levels, - get_bit_width(max_rep_level), - page.num_values(), - ), - _max_rep_level: max_rep_level as u32, - definitions: HybridRleDecoder::new( - def_levels, - get_bit_width(max_def_level), - page.num_values(), - ), - } + let reps = + HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values()); + let defs = + HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); + + let iter = reps.zip(defs).peekable(); + + Self { iter } } // number of values (!= number of rows) pub fn len(&self) -> usize { - self.repetitions.size_hint().0 + self.iter.size_hint().0 } } @@ -354,23 +326,19 @@ impl NestedState { pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Pushable>( mut page: T::State, - state: Option<(P, MutableBitmap)>, items: &mut VecDeque<(P, MutableBitmap)>, - nested_state: &Option, nested: &VecDeque, decoder: &T, -) -> Result<(P, MutableBitmap)> { - let needed = nested_state - .as_ref() - .map(|x| x.num_values()) - // unwrap is fine because either there is a state or the state is in nested - .unwrap_or_else(|| nested.back().unwrap().num_values()); - - let (mut values, mut validity) = if let Some((values, validity)) = state { +) { + let needed = nested.back().unwrap().num_values(); + + let (mut values, mut validity) = if let Some((values, validity)) = items.pop_back() { // there is a already a state => it must be incomplete... debug_assert!( values.len() < needed, - "the temp array is expected to be incomplete" + "the temp array is expected to be incomplete ({} < {})", + values.len(), + needed ); (values, validity) } else { @@ -389,32 +357,28 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push // the number of values required is always fulfilled because // dremel assigns one (rep, def) to each value and we request // items that complete a row - assert_eq!(values.len(), remaining); + assert_eq!(values.len(), needed); - for nest in nested { + items.push_back((values, validity)); + + for nest in nested.iter().skip(1) { let num_values = nest.num_values(); let mut values = decoder.with_capacity(num_values); let mut validity = MutableBitmap::with_capacity(num_values); decoder.extend_from_state(&mut page, &mut values, &mut validity, num_values); items.push_back((values, validity)); } - - assert_eq!(items.len(), nested.len()); - - // and return this item - Ok((values, validity)) } /// Extends `state` by consuming `page`, optionally extending `items` if `page` /// has less items than `chunk_size` pub fn extend_offsets1<'a>( page: &mut NestedPage<'a>, - state: Option, init: &InitNested, items: &mut VecDeque, chunk_size: usize, -) -> Result> { - let mut nested = if let Some(nested) = state { +) { + let mut nested = if let Some(nested) = items.pop_back() { // there is a already a state => it must be incomplete... debug_assert!( nested.len() < chunk_size, @@ -430,46 +394,44 @@ pub fn extend_offsets1<'a>( // extend the current state extend_offsets2(page, &mut nested, remaining); - - if nested.len() < chunk_size { - // the whole page was consumed and we still do not have enough items - // => push the values to `items` so that it can be continued later + // remaining > 0 => the page was consumed and we still do not have enough items to complete the chunk + // => push the values to `items` so that we empty the page + let mut remaining = chunk_size - nested.len(); + items.push_back(nested); + + while remaining > 0 && page.len() > 0 { + let additional = chunk_size.min(remaining); + let mut nested = init_nested(init, additional); + extend_offsets2(page, &mut nested, additional); + remaining -= additional; items.push_back(nested); - // and indicate that there is no item available - return Ok(None); - } - - while page.len() > 0 { - let mut nested = init_nested(init, chunk_size); - extend_offsets2(page, &mut nested, chunk_size); - items.push_back(nested) } - - // and return - Ok(Some(nested)) } fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { let nested = &mut nested.nested; let mut values_count = vec![0; nested.len()]; + for (depth, nest) in nested.iter().enumerate().skip(1) { + values_count[depth - 1] = nest.len() as i64 + } + values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64; + let mut cum_sum = vec![0u32; nested.len() + 1]; for (i, nest) in nested.iter().enumerate() { let delta = if nest.is_nullable() { 2 } else { 1 }; cum_sum[i + 1] = cum_sum[i] + delta; } - let iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); - let mut rows = 0; - for (rep, def) in iter { + while let Some((rep, def)) = page.iter.next() { if rep == 0 { rows += 1; } for (depth, (nest, length)) in nested.iter_mut().zip(values_count.iter()).enumerate() { if depth as u32 >= rep && def >= cum_sum[depth] { - let is_valid = nest.is_nullable() && def as u32 != cum_sum[depth]; + let is_valid = nest.is_nullable() && def != cum_sum[depth]; nest.push(*length, is_valid) } } @@ -478,18 +440,13 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi values_count[depth - 1] = nest.len() as i64 } values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64; - if rows == additional + 1 { + + let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0); + + if next_rep == 0 && rows == additional + 1 { break; } } - - // close validities - nested - .iter_mut() - .zip(values_count.iter()) - .for_each(|(nested, length)| { - nested.close(*length); - }); } // The state of an optional DataPage with a boolean physical type @@ -547,26 +504,23 @@ where let (values, validity) = items.pop_back().unwrap(); return MaybeNext::Some(Ok((nested, values, validity))); } - match (nested_items.pop_back(), items.pop_back(), iter.next()) { - (_, _, Err(e)) => MaybeNext::Some(Err(e.into())), - (None, None, Ok(None)) => MaybeNext::None, - (state, p_state, Ok(Some(page))) => { - // the invariant - assert_eq!(state.is_some(), p_state.is_some()); - + match iter.next() { + Err(e) => MaybeNext::Some(Err(e.into())), + Ok(None) => { + if let Some(nested) = nested_items.pop_back() { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + let (values, validity) = items.pop_back().unwrap(); + MaybeNext::Some(Ok((nested, values, validity))) + } else { + MaybeNext::None + } + } + Ok(Some(page)) => { // there is a new page => consume the page from the start let mut nested_page = NestedPage::new(page); - // read next chunk from `nested_page` and get number of values to read - let maybe_nested = - extend_offsets1(&mut nested_page, state, init, nested_items, chunk_size); - let nested = match maybe_nested { - Ok(nested) => nested, - Err(e) => return MaybeNext::Some(Err(e)), - }; - // at this point we know whether there were enough rows in `page` - // to fill chunk_size or not (`nested.is_some()`) - // irrespectively, we need to consume the values from the page + extend_offsets1(&mut nested_page, init, nested_items, chunk_size); let maybe_page = decoder.build_state(page); let page = match maybe_page { @@ -574,30 +528,16 @@ where Err(e) => return MaybeNext::Some(Err(e)), }; - let maybe_array = extend_from_new_page::( - page, - p_state, - items, - &nested, - nested_items, - decoder, - ); - let state = match maybe_array { - Ok(s) => s, - Err(e) => return MaybeNext::Some(Err(e)), - }; - match nested { - Some(p_state) => MaybeNext::Some(Ok((p_state, state.0, state.1))), - None => MaybeNext::More, + extend_from_new_page::(page, items, nested_items, decoder); + + if nested_items.back().unwrap().len() < chunk_size { + MaybeNext::More + } else { + let nested = nested_items.pop_back().unwrap(); + let (values, validity) = items.pop_back().unwrap(); + MaybeNext::Some(Ok((nested, values, validity))) } } - (Some(nested), Some((values, validity)), Ok(None)) => { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - MaybeNext::Some(Ok((nested, values, validity))) - } - (Some(_), None, _) => unreachable!(), - (None, Some(_), _) => unreachable!(), } }