
Simplify code
jorgecarleitao committed Feb 4, 2022
1 parent 485ce56 commit 6c29f69
Showing 4 changed files with 192 additions and 273 deletions.
142 changes: 42 additions & 100 deletions src/io/parquet/read/binary/dictionary.rs
@@ -1,17 +1,16 @@
use std::{collections::VecDeque, sync::Arc};

use parquet2::page::BinaryPageDict;
use parquet2::page::{BinaryPageDict, DictPage};

use crate::{
array::{BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array},
array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array},
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::{ArrowError, Result},
error::Result,
io::parquet::read::utils::MaybeNext,
};

use super::super::dictionary::*;
use super::super::utils;
use super::super::utils::Decoder;
use super::super::DataPages;

/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
@@ -52,6 +51,32 @@ where
}
}

fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Arc<dyn Array> {
let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
let offsets = dict
.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap())
.collect::<Vec<_>>();
let values = dict.values().to_vec();

match data_type.to_physical_type() {
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Arc::new(Utf8Array::<O>::from_data(
data_type,
offsets.into(),
values.into(),
None,
)) as _,
PhysicalType::Binary | PhysicalType::LargeBinary => Arc::new(BinaryArray::<O>::from_data(
data_type,
offsets.into(),
values.into(),
None,
)) as _,
_ => unreachable!(),
}
}

impl<K, O, I> Iterator for DictIter<K, O, I>
where
I: DataPages,
@@ -61,101 +86,18 @@ where
type Item = Result<DictionaryArray<K>>;

fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
let keys = finish_key(values, validity);
let values = self.values.unwrap();
Ok(DictionaryArray::from_data(keys, values))
});
}
match (self.items.pop_back(), self.iter.next()) {
(_, Err(e)) => Some(Err(e.into())),
(None, Ok(None)) => None,
(state, Ok(Some(page))) => {
// consume the dictionary page
if let Some(dict) = page.dictionary_page() {
let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
self.values = match &mut self.values {
Dict::Empty => {
let offsets = dict
.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap())
.collect::<Vec<_>>();
let values = dict.values().to_vec();

let array = match self.data_type.to_physical_type() {
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => {
Arc::new(Utf8Array::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
PhysicalType::Binary | PhysicalType::LargeBinary => {
Arc::new(BinaryArray::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
_ => unreachable!(),
};

Dict::Complete(array)
}
_ => unreachable!(),
};
} else {
return Some(Err(ArrowError::nyi(
"dictionary arrays from non-dict-encoded pages",
)));
}

let maybe_array = {
// there is a new page => consume the page from the start
let maybe_page = PrimitiveDecoder::default().build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return Some(Err(e)),
};

utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
page,
state,
self.chunk_size,
&mut self.items,
&PrimitiveDecoder::default(),
)
};
match maybe_array {
Ok(Some((values, validity))) => {
let keys = PrimitiveArray::from_data(
K::PRIMITIVE.into(),
values.into(),
validity.into(),
);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
Ok(None) => self.next(),
Err(e) => Some(Err(e)),
}
}
(Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);

let keys = finish_key(values, validity);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
let maybe_state = next_dict(
&mut self.iter,
&mut self.items,
&mut self.values,
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}
91 changes: 85 additions & 6 deletions src/io/parquet/read/dictionary.rs
@@ -1,17 +1,20 @@
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};

use parquet2::{
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
page::DataPage,
page::{DataPage, DictPage},
schema::Repetition,
};

use super::utils;
use crate::{
array::{Array, DictionaryKey, PrimitiveArray},
array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray},
bitmap::MutableBitmap,
error::Result,
io::parquet::read::utils::{extend_from_decoder, OptionalPageValidity},
error::{ArrowError, Result},
};

use super::{
utils::{self, extend_from_decoder, Decoder, MaybeNext, OptionalPageValidity},
DataPages,
};

// The state of a `DataPage` of `Primitive` parquet primitive type
@@ -170,3 +173,79 @@ impl Dict {
pub fn finish_key<K: DictionaryKey>(values: Vec<K>, validity: MutableBitmap) -> PrimitiveArray<K> {
PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into())
}

#[inline]
pub(super) fn next_dict<
'a,
K: DictionaryKey,
I: DataPages,
F: Fn(&dyn DictPage) -> Arc<dyn Array>,
>(
iter: &'a mut I,
items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
dict: &mut Dict,
chunk_size: usize,
read_dict: F,
) -> MaybeNext<Result<DictionaryArray<K>>> {
if items.len() > 1 {
let (values, validity) = items.pop_back().unwrap();
let keys = finish_key(values, validity);
return MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap())));
}
match (items.pop_back(), iter.next()) {
(_, Err(e)) => MaybeNext::Some(Err(e.into())),
(None, Ok(None)) => MaybeNext::None,
(state, Ok(Some(page))) => {
// consume the dictionary page
if let Some(dict_page) = page.dictionary_page() {
*dict = match dict {
Dict::Empty => Dict::Complete(read_dict(dict_page.as_ref())),
_ => unreachable!(),
};
} else {
return MaybeNext::Some(Err(ArrowError::nyi(
"dictionary arrays from non-dict-encoded pages",
)));
}

let maybe_array = {
// there is a new page => consume the page from the start
let maybe_page = PrimitiveDecoder::default().build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return MaybeNext::Some(Err(e)),
};

utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
page,
state,
chunk_size,
items,
&PrimitiveDecoder::default(),
)
};
match maybe_array {
Ok(Some((values, validity))) => {
let keys = PrimitiveArray::from_data(
K::PRIMITIVE.into(),
values.into(),
validity.into(),
);

MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap())))
}
Ok(None) => MaybeNext::More,
Err(e) => MaybeNext::Some(Err(e)),
}
}
(Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= chunk_size);

let keys = finish_key(values, validity);

MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap())))
}
}
}
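
The core of the simplification is visible in both files: the page-iteration state machine now lives in the shared `next_dict` helper, which returns a `MaybeNext` value, and each typed iterator's `Iterator::next` only maps that value onto `Option` (emit a chunk, stop, or keep pulling pages). Below is a minimal, self-contained sketch of that control-flow pattern; the types are simplified placeholders for illustration, not arrow2's actual `DataPages`/`Decoder` machinery.

// A minimal, self-contained sketch of the `MaybeNext` control flow used by the
// refactored readers. The types here are simplified placeholders, not arrow2's.
enum MaybeNext<T> {
    Some(T), // a finished chunk (or an error) is ready to be emitted
    None,    // the underlying pages are exhausted
    More,    // a page was consumed but the current chunk is not full yet
}

struct DictIter {
    pages: Vec<Vec<u32>>, // pretend "data pages" of dictionary keys
    chunk_size: usize,
    buffer: Vec<u32>, // keys accumulated across pages, like `items` in the diff
}

impl DictIter {
    // Plays the role of `next_dict`: consume one page and report the outcome.
    fn next_state(&mut self) -> MaybeNext<Vec<u32>> {
        match self.pages.pop() {
            Some(page) => {
                self.buffer.extend(page);
                if self.buffer.len() >= self.chunk_size {
                    MaybeNext::Some(std::mem::take(&mut self.buffer))
                } else {
                    MaybeNext::More
                }
            }
            // no more pages, but a partially filled chunk remains
            None if !self.buffer.is_empty() => MaybeNext::Some(std::mem::take(&mut self.buffer)),
            None => MaybeNext::None,
        }
    }
}

impl Iterator for DictIter {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Self::Item> {
        // The same mapping as `DictIter::next` in the diff:
        // Some -> Some, None -> None, More -> keep pulling pages.
        match self.next_state() {
            MaybeNext::Some(chunk) => Some(chunk),
            MaybeNext::None => None,
            MaybeNext::More => self.next(),
        }
    }
}

fn main() {
    let iter = DictIter {
        pages: vec![vec![5, 6], vec![3, 4], vec![1, 2]],
        chunk_size: 3,
        buffer: Vec::new(),
    };
    for chunk in iter {
        println!("{:?}", chunk); // prints [1, 2, 3, 4] then [5, 6]
    }
}

Running the sketch prints `[1, 2, 3, 4]` followed by `[5, 6]`: partial pages are buffered across `More` states until a chunk can be emitted, which mirrors how `items` accumulates keys between pages in the code above.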