This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reduced code duplication #805

Merged
merged 2 commits on Feb 4, 2022
6 changes: 3 additions & 3 deletions src/io/parquet/read/binary/basic.rs
@@ -277,15 +277,15 @@ pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
)
}

pub struct BinaryArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
chunk_size: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> BinaryArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
Self {
iter,
@@ -297,7 +297,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> BinaryArrayIterator<O, A,
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for BinaryArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I> {
type Item = Result<A>;

fn next(&mut self) -> Option<Self::Item> {
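
Aside (not part of the diff): a minimal sketch of how the renamed `Iter` could be driven after this change. The `collect_binary_chunks` helper, the `pages` argument, and the concrete `i32` offset / `BinaryArray<i32>` choices are illustrative assumptions; the constructor and the `Result<A>` item type are taken from the code above.

// Hedged sketch: assumes the crate-internal names used above (DataPages,
// DataType, Result, BinaryArray) are in scope, as in basic.rs itself.
fn collect_binary_chunks<I: DataPages>(
    pages: I,
    data_type: DataType,
    chunk_size: usize,
) -> Result<Vec<BinaryArray<i32>>> {
    // Item = Result<BinaryArray<i32>>, so collect() stops at the first error.
    Iter::<i32, BinaryArray<i32>, I>::new(pages, data_type, chunk_size).collect()
}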
166 changes: 46 additions & 120 deletions src/io/parquet/read/binary/dictionary.rs
@@ -1,25 +1,21 @@
use std::{collections::VecDeque, sync::Arc};

use parquet2::page::BinaryPageDict;
use parquet2::page::{BinaryPageDict, DictPage};

use crate::{
array::{
Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array,
},
array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array},
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::{ArrowError, Result},
error::Result,
io::parquet::read::utils::MaybeNext,
};

use super::super::dictionary::*;
use super::super::utils;
use super::super::utils::Decoder;
use super::super::ArrayIter;
use super::super::DataPages;

/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
#[derive(Debug)]
pub struct ArrayIterator<K, O, I>
pub struct DictIter<K, O, I>
where
I: DataPages,
O: Offset,
@@ -33,13 +29,13 @@ where
phantom: std::marker::PhantomData<O>,
}

impl<K, O, I> ArrayIterator<K, O, I>
impl<K, O, I> DictIter<K, O, I>
where
K: DictionaryKey,
O: Offset,
I: DataPages,
{
fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
@@ -55,7 +51,33 @@ where
}
}

impl<K, O, I> Iterator for ArrayIterator<K, O, I>
fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Arc<dyn Array> {
let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
let offsets = dict
.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap())
.collect::<Vec<_>>();
let values = dict.values().to_vec();

match data_type.to_physical_type() {
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Arc::new(Utf8Array::<O>::from_data(
data_type,
offsets.into(),
values.into(),
None,
)) as _,
PhysicalType::Binary | PhysicalType::LargeBinary => Arc::new(BinaryArray::<O>::from_data(
data_type,
offsets.into(),
values.into(),
None,
)) as _,
_ => unreachable!(),
}
}

impl<K, O, I> Iterator for DictIter<K, O, I>
where
I: DataPages,
O: Offset,
@@ -64,114 +86,18 @@ where
type Item = Result<DictionaryArray<K>>;

fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
let keys = finish_key(values, validity);
let values = self.values.unwrap();
Ok(DictionaryArray::from_data(keys, values))
});
}
match (self.items.pop_back(), self.iter.next()) {
(_, Err(e)) => Some(Err(e.into())),
(None, Ok(None)) => None,
(state, Ok(Some(page))) => {
// consume the dictionary page
if let Some(dict) = page.dictionary_page() {
let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
self.values = match &mut self.values {
Dict::Empty => {
let offsets = dict
.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap())
.collect::<Vec<_>>();
let values = dict.values().to_vec();

let array = match self.data_type.to_physical_type() {
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => {
Arc::new(Utf8Array::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
PhysicalType::Binary | PhysicalType::LargeBinary => {
Arc::new(BinaryArray::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
_ => unreachable!(),
};

Dict::Complete(array)
}
_ => unreachable!(),
};
} else {
return Some(Err(ArrowError::nyi(
"dictionary arrays from non-dict-encoded pages",
)));
}

let maybe_array = {
// there is a new page => consume the page from the start
let maybe_page = PrimitiveDecoder::default().build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return Some(Err(e)),
};

utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
page,
state,
self.chunk_size,
&mut self.items,
&PrimitiveDecoder::default(),
)
};
match maybe_array {
Ok(Some((values, validity))) => {
let keys = PrimitiveArray::from_data(
K::PRIMITIVE.into(),
values.into(),
validity.into(),
);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
Ok(None) => self.next(),
Err(e) => Some(Err(e)),
}
}
(Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);

let keys = finish_key(values, validity);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
let maybe_state = next_dict(
&mut self.iter,
&mut self.items,
&mut self.values,
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, K, O, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
I: 'a + DataPages,
O: Offset,
K: DictionaryKey,
{
Box::new(
ArrayIterator::<K, O, I>::new(iter, data_type, chunk_size)
.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
)
}
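
Aside (not part of the diff): a minimal sketch of how the refactored `DictIter` is consumed. The `pages` source and the `i32` key/offset choices are illustrative assumptions; the now-public `new` constructor and the `Result<DictionaryArray<K>>` item type come from the code above, and the dictionary page itself is decoded once by `read_dict` and shared across chunks.

// Hedged sketch: `data_type` must be DataType::Dictionary(..), as required by
// DictIter::new above; crate-internal names are assumed to be in scope.
fn collect_dict_chunks<I: DataPages>(
    pages: I,
    data_type: DataType,
    chunk_size: usize,
) -> Result<Vec<DictionaryArray<i32>>> {
    DictIter::<i32, i32, I>::new(pages, data_type, chunk_size).collect()
}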
18 changes: 2 additions & 16 deletions src/io/parquet/read/binary/mod.rs
@@ -3,8 +3,6 @@ mod dictionary;
mod nested;
mod utils;

pub use dictionary::iter_to_arrays as iter_to_dict_arrays;

use std::sync::Arc;

use crate::{
@@ -14,25 +12,13 @@ use crate::{

use self::basic::TraitBinaryArray;
use self::nested::ArrayIterator;
use super::ArrayIter;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};
use basic::BinaryArrayIterator;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, O, A, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
BinaryArrayIterator::<O, A, I>::new(iter, data_type, chunk_size)
.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
)
}
pub use basic::Iter;
pub use dictionary::DictIter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
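
Aside (not part of the diff): with the module-level `iter_to_arrays` helper removed here, a call site that still needs a type-erased `ArrayIter` would wrap the re-exported `Iter` itself. A hedged sketch whose body mirrors the deleted function (the helper name is an assumption); the same pattern applies to the boolean module below.

// Hedged sketch: generic bounds and the boxed map are copied from the removed
// helper, with the iterator's new name substituted.
fn binary_iter_to_arrays<'a, O, A, I>(pages: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
    I: 'a + DataPages,
    A: TraitBinaryArray<O>,
    O: Offset,
{
    Box::new(
        Iter::<O, A, I>::new(pages, data_type, chunk_size)
            .map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
    )
}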
6 changes: 3 additions & 3 deletions src/io/parquet/read/boolean/basic.rs
@@ -140,14 +140,14 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap)

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
#[derive(Debug)]
pub struct BooleanArrayIterator<I: DataPages> {
pub struct Iter<I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(MutableBitmap, MutableBitmap)>,
chunk_size: usize,
}

impl<I: DataPages> BooleanArrayIterator<I> {
impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
Self {
iter,
@@ -158,7 +158,7 @@ impl<I: DataPages> BooleanArrayIterator<I> {
}
}

impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
impl<I: DataPages> Iterator for Iter<I> {
type Item = Result<BooleanArray>;

fn next(&mut self) -> Option<Self::Item> {
15 changes: 2 additions & 13 deletions src/io/parquet/read/boolean/mod.rs
@@ -3,26 +3,15 @@ mod nested;

use std::sync::Arc;

use crate::{array::Array, datatypes::DataType};
use crate::array::Array;

use self::basic::BooleanArrayIterator;
use self::nested::ArrayIterator;
use super::ArrayIter;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
I: DataPages,
{
Box::new(
BooleanArrayIterator::new(iter, data_type, chunk_size)
.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
)
}
pub use self::basic::Iter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I: 'a>(