Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added read of 2-level nested lists from parquet (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Oct 24, 2021
1 parent 788f382 commit 4491954
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 151 deletions.
26 changes: 26 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ def case_nested(size):
[[4, 5], [6]],
[],
[[7], None, [9]],
[[], [None], None],
[[10]],
]
items_required_nested = [
[[0, 1]],
None,
[[2, 3], [3]],
[[4, 5], [6]],
[],
[[7], None, [9]],
None,
[[10]],
]
items_required_nested_2 = [
[[0, 1]],
None,
[[2, 3], [3]],
[[4, 5], [6]],
[],
[[7], [8], [9]],
None,
[[10]],
]
Expand All @@ -140,6 +160,10 @@ def case_nested(size):
pa.field("list_utf8", pa.list_(pa.utf8())),
pa.field("list_large_binary", pa.list_(pa.large_binary())),
pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))),
pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))),
pa.field(
"list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64()))
),
]
schema = pa.schema(fields)
return (
Expand All @@ -152,6 +176,8 @@ def case_nested(size):
"list_utf8": string * size,
"list_large_binary": string * size,
"list_nested_i64": items_nested * size,
"list_nested_inner_required_i64": items_required_nested * size,
"list_nested_inner_required_required_i64": items_required_nested_2 * size,
},
schema,
f"nested_nullable_{size*10}.parquet",
Expand Down
35 changes: 4 additions & 31 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use parquet2::{
};

use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
array::{Array, Offset},
bitmap::{utils::BitmapIter, MutableBitmap},
buffer::MutableBuffer,
datatypes::DataType,
error::{ArrowError, Result},
};

use super::super::utils;
use super::utils::finish_array;

/// Assumptions: No rep levels
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -325,21 +326,7 @@ where
)?
}

Ok(match data_type {
DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
_ => unreachable!(),
})
Ok(finish_array(data_type.clone(), offsets, values, validity))
}

pub async fn stream_to_array<O, I, E>(
Expand Down Expand Up @@ -371,19 +358,5 @@ where
)?
}

Ok(match data_type {
DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
_ => unreachable!(),
})
Ok(finish_array(data_type.clone(), offsets, values, validity))
}
1 change: 1 addition & 0 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod basic;
mod dictionary;
mod nested;
mod utils;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
Expand Down
34 changes: 5 additions & 29 deletions src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use parquet2::{
use super::super::nested_utils::*;
use super::super::utils;
use super::basic::read_plain_required;
use super::utils::finish_array;
use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
array::{Array, Offset},
bitmap::MutableBitmap,
buffer::MutableBuffer,
datatypes::DataType,
Expand Down Expand Up @@ -150,7 +151,7 @@ pub fn iter_to_array<O, I, E>(
mut iter: I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
) -> Result<Box<dyn Array>>
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)>
where
O: Offset,
ArrowError: From<E>,
Expand All @@ -176,32 +177,7 @@ where
)?
}

let inner_data_type = match data_type {
DataType::List(ref inner) => inner.data_type(),
DataType::LargeList(ref inner) => inner.data_type(),
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Read nested datatype {:?}",
data_type
)))
}
};

let values = match inner_data_type {
DataType::LargeBinary | DataType::Binary => Arc::new(BinaryArray::from_data(
inner_data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)) as Arc<dyn Array>,
DataType::LargeUtf8 | DataType::Utf8 => Arc::new(Utf8Array::from_data(
inner_data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)) as Arc<dyn Array>,
_ => unreachable!(),
};
let values = finish_array(data_type, offsets, values, validity).into();

create_list(data_type, &mut nested, values)
Ok((values, nested))
}
29 changes: 29 additions & 0 deletions src/io/parquet/read/binary/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
bitmap::MutableBitmap,
buffer::MutableBuffer,
datatypes::DataType,
};

/// Assembles the final binary/utf8 array from the parts accumulated while
/// decoding a parquet column chunk.
///
/// `data_type` selects the concrete array: (Large)Binary -> [`BinaryArray`],
/// (Large)Utf8 -> [`Utf8Array`]. Any other logical type is a programming
/// error on the caller's side.
pub(super) fn finish_array<O: Offset>(
    data_type: DataType,
    offsets: MutableBuffer<O>,
    values: MutableBuffer<u8>,
    validity: MutableBitmap,
) -> Box<dyn Array> {
    // Freeze the mutable containers once up front; both target array kinds
    // consume the same (offsets, values, validity) triple.
    let offsets = offsets.into();
    let values = values.into();
    let validity = validity.into();
    match data_type {
        DataType::Binary | DataType::LargeBinary => {
            Box::new(BinaryArray::from_data(data_type, offsets, values, validity))
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Box::new(Utf8Array::from_data(data_type, offsets, values, validity))
        }
        // Callers only dispatch the four (large) binary/utf8 types here.
        _ => unreachable!(),
    }
}
17 changes: 3 additions & 14 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn iter_to_array<I, E>(
mut iter: I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
) -> Result<Box<dyn Array>>
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)>
where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
Expand All @@ -157,22 +157,11 @@ where
)?
}

let inner_data_type = match data_type {
DataType::List(ref inner) => inner.data_type(),
DataType::LargeList(ref inner) => inner.data_type(),
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Read nested datatype {:?}",
data_type
)))
}
};

let values = Arc::new(BooleanArray::from_data(
inner_data_type.clone(),
data_type,
values.into(),
validity.into(),
));

create_list(data_type, &mut nested, values)
Ok((values, nested))
}
112 changes: 71 additions & 41 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! APIs to read from Parquet format.
#![allow(clippy::type_complexity)]

use std::{
convert::TryInto,
io::{Read, Seek},
Expand Down Expand Up @@ -28,6 +30,7 @@ use crate::{
array::{Array, DictionaryKey, PrimitiveArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::nested_utils::create_list,
};

mod binary;
Expand All @@ -44,6 +47,8 @@ pub use record_batch::RecordReader;
pub(crate) use schema::is_type_nullable;
pub use schema::{get_schema, FileMetaData};

use self::nested_utils::Nested;

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<'b, RR: Read + Seek>(
column_metadata: &ColumnChunkMetaData,
Expand Down Expand Up @@ -165,6 +170,62 @@ fn dict_read<
}
}

/// Reads the pages of a nested parquet column into an [`Array`] together with
/// the nesting information ([`Nested`]) the caller needs to assemble the
/// enclosing list array via `create_list`.
///
/// Fix over the previous version: a `LargeList` arm mirroring the `List` arm,
/// since `page_iter_to_array` dispatches both `List` and `LargeList` inner
/// types to this function; without it, a `LargeList` nested inside another
/// list erroneously returned `NotYetImplemented`.
fn page_iter_to_array_nested<
    I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>,
>(
    iter: &mut I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)> {
    use DataType::*;
    match data_type {
        // Physical INT32 columns, narrowed to the logical type.
        UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
        UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
        UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32),
        Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8),
        Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16),
        Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32),

        // Int96 timestamps need a dedicated conversion; any other physical
        // representation of a nanosecond timestamp is a plain INT64.
        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
                PhysicalType::Int96 => primitive::iter_to_array_nested(
                    iter,
                    metadata,
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    int96_to_i64_ns,
                ),
                _ => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x),
            },
            _ => unreachable!(),
        },

        // INT64
        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
            primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x)
        }
        UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64),

        Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x),
        Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x),

        Boolean => boolean::iter_to_array_nested(iter, metadata, data_type),

        Binary | Utf8 => binary::iter_to_array_nested::<i32, _, _>(iter, metadata, data_type),
        LargeBinary | LargeUtf8 => {
            binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
        }
        // A further level of nesting: recurse on the inner type and wrap the
        // result in a list, keeping this level's `Nested` info on the stack.
        List(ref inner) => {
            let (values, mut nested) =
                page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
            Ok((create_list(data_type, &mut nested, values)?.into(), nested))
        }
        // Mirror of the `List` arm: `page_iter_to_array` routes `LargeList`
        // inner types here too.
        LargeList(ref inner) => {
            let (values, mut nested) =
                page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
            Ok((create_list(data_type, &mut nested, values)?.into(), nested))
        }
        other => Err(ArrowError::NotYetImplemented(format!(
            "Reading {:?} from parquet still not implemented",
            other
        ))),
    }
}

/// Converts an iterator of [`DataPage`] into a single [`Array`].
pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
iter: &mut I,
Expand Down Expand Up @@ -255,47 +316,16 @@ pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error =
},
_ => unreachable!(),
},
List(ref inner) => match inner.data_type() {
UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32),
Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8),
Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16),
Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32),

Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
PhysicalType::Int96 => primitive::iter_to_array_nested(
iter,
metadata,
DataType::Timestamp(TimeUnit::Nanosecond, None),
int96_to_i64_ns,
),
_ => primitive::iter_to_array(iter, metadata, data_type, |x: i64| x),
},
_ => unreachable!(),
},

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x)
}
UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64),

Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x),
Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x),

Boolean => boolean::iter_to_array_nested(iter, metadata, data_type),

Binary | Utf8 => binary::iter_to_array_nested::<i32, _, _>(iter, metadata, data_type),
LargeBinary | LargeUtf8 => {
binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
}
other => Err(ArrowError::NotYetImplemented(format!(
"Reading {:?} from parquet still not implemented",
other
))),
},
List(ref inner) => {
let (values, mut nested) =
page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
create_list(data_type, &mut nested, values)
}
LargeList(ref inner) => {
let (values, mut nested) =
page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
create_list(data_type, &mut nested, values)
}

Dictionary(ref key, _) => match key.as_ref() {
Int8 => dict_read::<i8, _>(iter, metadata, data_type),
Expand Down
Loading

0 comments on commit 4491954

Please sign in to comment.