diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 997b3e830a3..3971ff3e548 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -166,6 +166,11 @@ def case_nested() -> Tuple[dict, pa.Schema, str]: pa.list_(pa.field("item", pa.int64(), False)), False, ), + pa.field( + "list_int64_optional_required", + pa.list_(pa.field("item", pa.int64(), True)), + False, + ), pa.field("list_int16", pa.list_(pa.int16())), pa.field("list_bool", pa.list_(pa.bool_())), pa.field("list_utf8", pa.list_(pa.utf8())), @@ -182,6 +187,7 @@ def case_nested() -> Tuple[dict, pa.Schema, str]: "list_int64": items_nullable, "list_int64_required": items_required, "list_int64_required_required": all_required, + "list_int64_optional_required": all_required, "list_int16": i16, "list_bool": boolean, "list_utf8": string, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index b8b67364761..04a387a895c 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -27,7 +27,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index b415c6efb9e..0ebe62cd11c 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -26,7 +26,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer)?; diff --git a/src/io/parquet/write/levels.rs b/src/io/parquet/write/levels.rs index 68d3c0b343c..26e6a25cd7f 100644 --- a/src/io/parquet/write/levels.rs +++ b/src/io/parquet/write/levels.rs @@ -1,11 +1,7 @@ use parquet2::encoding::hybrid_rle::encode_u32; use parquet2::write::Version; -use crate::{ - array::Offset, - bitmap::{utils::BitmapIter, Bitmap}, - error::Result, -}; +use crate::{array::Offset, bitmap::Bitmap, error::Result}; pub fn num_values(offsets: &[O]) -> usize { offsets @@ -72,88 +68,53 @@ impl Iterator for RepLevelsIter<'_, O> { } } -enum OffsetsIter<'a, O> { - Optional(std::iter::Zip, BitmapIter<'a>>), - Required(std::slice::Windows<'a, O>), -} - /// Iterator adapter of parquet / dremel definition levels -pub struct DefLevelsIter<'a, O: Offset> { - iter: OffsetsIter<'a, O>, - primitive_validity: Option>, +pub struct DefLevelsIter<'a, O: Offset, II: Iterator, I: Iterator> { + iter: std::iter::Zip, II>, + primitive_validity: I, remaining: usize, - is_valid: bool, - length: usize, + is_valid: u32, total_size: usize, } -impl<'a, O: Offset> DefLevelsIter<'a, O> { - pub fn new( - offsets: &'a [O], - validity: Option<&'a Bitmap>, - primitive_validity: Option<&'a Bitmap>, - ) -> Self { +impl<'a, O: Offset, II: Iterator, I: Iterator> DefLevelsIter<'a, O, II, I> { + pub fn new(offsets: &'a [O], validity: II, primitive_validity: I) -> Self { let total_size = num_values(offsets); - let primitive_validity = primitive_validity.map(|x| x.iter()); - - let iter = validity - .map(|x| OffsetsIter::Optional(offsets.windows(2).zip(x.iter()))) - .unwrap_or_else(|| OffsetsIter::Required(offsets.windows(2))); + let iter = offsets.windows(2).zip(validity); Self { iter, primitive_validity, remaining: 0, - length: 0, - is_valid: false, + is_valid: 0, total_size, } } } -impl Iterator for DefLevelsIter<'_, O> { +impl, I: Iterator> Iterator + for DefLevelsIter<'_, O, II, I> +{ type Item = u32; fn next(&mut self) -> Option { - if self.remaining == self.length { - match &mut self.iter { - OffsetsIter::Optional(iter) => { - let (w, is_valid) = iter.next()?; - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.length = end - start; - self.remaining = 0; - self.is_valid = is_valid; - if self.length == 0 { - self.total_size -= 1; - return Some(self.is_valid as u32); - } - } - OffsetsIter::Required(iter) => { - let w = iter.next()?; - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.length = end - start; - self.remaining = 0; - self.is_valid = true; - if self.length == 0 { - self.total_size -= 1; - return Some(0); - } - } + if self.remaining == 0 { + let (w, is_valid) = self.iter.next()?; + let start = w[0].to_usize(); + let end = w[1].to_usize(); + self.remaining = end - start; + self.is_valid = is_valid + 1; + if self.remaining == 0 { + self.total_size -= 1; + return Some(self.is_valid - 1); } } - self.remaining += 1; + self.remaining -= 1; self.total_size -= 1; - let (base_def, p_is_valid) = self - .primitive_validity - .as_mut() - .map(|x| (1, x.next().unwrap() as u32)) - .unwrap_or((0, 0)); - let def_ = (base_def + 1) * self.is_valid as u32 + p_is_valid; - Some(def_) + let p_is_valid = self.primitive_validity.next().unwrap_or_default(); + Some(self.is_valid + p_is_valid) } fn size_hint(&self) -> (usize, Option) { @@ -163,7 +124,7 @@ impl Iterator for DefLevelsIter<'_, O> { #[derive(Debug)] pub struct NestedInfo<'a, O: Offset> { - _is_optional: bool, + is_optional: bool, offsets: &'a [O], validity: Option<&'a Bitmap>, } @@ -171,7 +132,7 @@ pub struct NestedInfo<'a, O: Offset> { impl<'a, O: Offset> NestedInfo<'a, O> { pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self { Self { - _is_optional: is_optional, + is_optional, offsets, validity, } @@ -182,7 +143,7 @@ impl<'a, O: Offset> NestedInfo<'a, O> { } } -fn write_levels_v1) -> Result<()>>( +fn write_levels_v1) -> Result<()>>( buffer: &mut Vec, encode: F, ) -> Result<()> { @@ -226,30 +187,107 @@ pub fn write_rep_levels( Ok(()) } +fn write_def_levels1>( + buffer: &mut Vec, + levels: I, + num_bits: u8, + version: Version, +) -> Result<()> { + match version { + Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { + encode_u32(buffer, levels, num_bits)?; + Ok(()) + }), + Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), + } +} + /// writes the rep levels to a `Vec`. pub fn write_def_levels( buffer: &mut Vec, nested: &NestedInfo, validity: Option<&Bitmap>, + primitive_is_optional: bool, version: Version, ) -> Result<()> { - let num_bits = 1 + validity.is_some() as u8; - - match version { - Version::V1 => { - write_levels_v1(buffer, |buffer: &mut Vec| { - let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity); - encode_u32(buffer, levels, num_bits)?; - Ok(()) - })?; + let mut num_bits = 1 + nested.is_optional as u8 + primitive_is_optional as u8; + if num_bits == 3 { + // brute-force log2 - this needs to be generalized for e.g. list of list + num_bits = 2 + }; + + // this match ensures that irrespectively of the arrays' validities, we write def levels + // that are consistent with the declared parquet schema. + // see comments on some of the variants + match ( + nested.is_optional, + nested.validity.as_ref(), + primitive_is_optional, + validity.as_ref(), + ) { + // if the validity is optional and there is no validity in the array, we + // need to write 1 to mark the fields as valid + (true, None, true, None) => { + let nested_validity = std::iter::repeat(1); + let validity = std::iter::repeat(1); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) } - Version::V2 => { - let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity); - encode_u32(buffer, levels, num_bits)?; + (true, Some(nested_validity), true, None) => { + let levels = DefLevelsIter::new( + nested.offsets, + nested_validity.iter().map(|x| x as u32), + std::iter::repeat(1), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, None, true, Some(validity)) => { + let levels = DefLevelsIter::new( + nested.offsets, + std::iter::repeat(1), + validity.iter().map(|x| x as u32), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, Some(nested_validity), true, Some(validity)) => { + let levels = DefLevelsIter::new( + nested.offsets, + nested_validity.iter().map(|x| x as u32), + validity.iter().map(|x| x as u32), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, None, false, _) => { + let nested_validity = std::iter::repeat(1); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, Some(nested_validity), false, _) => { + let nested_validity = nested_validity.iter().map(|x| x as u32); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, true, None) => { + let nested_validity = std::iter::repeat(0); + let validity = std::iter::repeat(1); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, true, Some(validity)) => { + let nested_validity = std::iter::repeat(0); + let validity = validity.iter().map(|x| x as u32); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, false, _) => { + let nested_validity = std::iter::repeat(0); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) } } - - Ok(()) } #[cfg(test)] @@ -269,20 +307,21 @@ mod tests { fn test_def_levels() { // [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = Some(Bitmap::from([ - true, false, true, true, true, true, false, true, - ])); - let primitive_validity = Some(Bitmap::from([ + let validity = [true, false, true, true, true, true, false, true] + .into_iter() + .map(|x| x as u32); + let primitive_validity = [ true, true, //[0, 1] true, false, true, //[2, None, 3] true, true, true, //[4, 5, 6] true, true, true, //[7, 8, 9] true, //[10] - ])); + ] + .into_iter() + .map(|x| x as u32); let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3]; - let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref()) - .collect::>(); + let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); assert_eq!(result, expected) } @@ -290,12 +329,11 @@ mod tests { fn test_def_levels1() { // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = None; - let primitive_validity = None; + let validity = std::iter::repeat(0); + let primitive_validity = std::iter::repeat(0); let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1]; - let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref()) - .collect::>(); + let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); assert_eq!(result, expected) } } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 87823723fec..19871fa3f41 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -33,7 +33,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 67360d10682..7637978d272 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -27,7 +27,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7804804e90b..225f19df9c8 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -88,7 +88,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { Some(10), ])) as Arc } - "list_int64_required" | "list_int64_required_required" => { + "list_int64_required" | "list_int64_optional_required" | "list_int64_required_required" => { // [[0, 1], None, [2, 0, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] Arc::new(PrimitiveArray::::from(&[ Some(0), @@ -178,7 +178,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { | "list_nested_inner_required_required_i64" => { Arc::new(NullArray::from_data(DataType::Null, 1)) } - _ => unreachable!(), + other => unreachable!("{}", other), }; match column { @@ -189,6 +189,13 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { data_type, offsets, values, None, )) } + "list_int64_optional_required" => { + // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] + let data_type = DataType::List(Box::new(Field::new("item", DataType::Int64, true))); + Box::new(ListArray::::from_data( + data_type, offsets, values, None, + )) + } "list_nested_i64" => { // [[0, 1]], None, [[2, None], [3]], [[4, 5], [6]], [], [[7], None, [9]], [[], [None], None], [[10]] let data = [ @@ -253,7 +260,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { "list_bool" => Field::new("item", DataType::Boolean, true), "list_utf8" => Field::new("item", DataType::Utf8, true), "list_large_binary" => Field::new("item", DataType::LargeBinary, true), - _ => unreachable!(), + other => unreachable!("{}", other), }; let validity = Some(Bitmap::from([ @@ -594,7 +601,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, - "list_int64_required_required" => Statistics { + "list_int64_required_required" | "list_int64_optional_required" => Statistics { distinct_count: Count::Single(UInt64Array::from([None])), null_count: Count::Single([Some(0)].into()), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), @@ -636,7 +643,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { true, ), }, - _ => todo!(), + other => todo!("{}", other), } } @@ -918,3 +925,62 @@ fn arrow_type() -> Result<()> { assert_eq!(new_batches, vec![batch]); Ok(()) } + +fn data() -> Vec>>> { + // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] + vec![ + Some(vec![Some(0), Some(1)]), + Some(vec![]), + Some(vec![Some(2), Some(0), Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + Some(vec![]), + Some(vec![Some(7), Some(8), Some(9)]), + Some(vec![]), + Some(vec![Some(10)]), + ] +} + +fn list_array_generic(inner_is_nullable: bool, is_nullable: bool) -> Result<()> { + let mut array = MutableListArray::::new_with_field( + MutablePrimitiveArray::::new(), + "item", + inner_is_nullable, + ); + array.try_extend(data()).unwrap(); + let array: ListArray = array.into(); + + let schema = Schema::from(vec![Field::new( + "a1", + array.data_type().clone(), + is_nullable, + )]); + let batch = Chunk::try_new(vec![Arc::new(array) as Arc])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +} + +#[test] +fn list_array_required_required() -> Result<()> { + list_array_generic(false, false) +} + +#[test] +fn list_array_optional_optional() -> Result<()> { + list_array_generic(true, true) +} + +#[test] +fn list_array_required_optional() -> Result<()> { + list_array_generic(false, true) +} + +#[test] +fn list_array_optional_required() -> Result<()> { + list_array_generic(true, false) +} diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index f8283a099e1..0e4d7726389 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -227,7 +227,7 @@ fn v1_nested_int64_required_required() -> Result<()> { } #[test] -fn v2_nested_i16() -> Result<()> { +fn v2_list_int64_required_required() -> Result<()> { test_pyarrow_integration( "list_int64_required_required", 2, @@ -238,6 +238,19 @@ fn v2_nested_i16() -> Result<()> { ) } +#[test] +#[ignore] // see https://issues.apache.org/jira/browse/ARROW-15073 +fn v2_list_int64_optional_required() -> Result<()> { + test_pyarrow_integration( + "list_int64_optional_required", + 2, + "nested", + false, + false, + None, + ) +} + #[test] fn v1_nested_i16() -> Result<()> { test_pyarrow_integration("list_int16", 1, "nested", false, false, None)