Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for decoding delta-length-encoded binary (parquet) #1228

Merged
merged 1 commit into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 131 additions & 39 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::default::Default;

use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
page::{split_buffer, DataPage, DictPage},
schema::Repetition,
};
Expand All @@ -23,44 +23,6 @@ use super::super::utils::{
use super::super::Pages;
use super::{super::utils, utils::*};

/*
fn read_delta_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let Binary {
offsets,
values,
last_offset,
} = values;

// values_buffer: first 4 bytes are len, remaining is values
let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer);
let offsets_iterator = values_iterator.by_ref().map(|x| {
*last_offset += O::from_usize(x as usize).unwrap();
*last_offset
});

let mut page_validity = OptionalPageValidity::new(validity_buffer, additional);

// offsets:
extend_from_decoder(
validity,
&mut page_validity,
None,
offsets,
offsets_iterator,
);

// values:
let new_values = values_iterator.into_values();
values.extend_from_slice(new_values);
}
*/

#[derive(Debug)]
pub(super) struct Required<'a> {
pub values: SizedBinaryIter<'a>,
Expand All @@ -79,6 +41,52 @@ impl<'a> Required<'a> {
}
}

/// State for reading the values of a page as byte slices: the lengths of the
/// individual values plus the buffer those values are cut from.
#[derive(Debug)]
pub(super) struct Delta<'a> {
/// Lengths of the values that have not been yielded yet, in order.
pub lengths: std::vec::IntoIter<usize>,
/// Bytes of the values that have not been yielded yet; each value is taken
/// from the front of this slice.
pub values: &'a [u8],
}

impl<'a> Delta<'a> {
    /// Builds the state from the values section of `page`.
    ///
    /// Every length is decoded up front, because the decoder only hands over
    /// the bytes that follow the lengths once it has been drained.
    ///
    /// NOTE(review): the decoded length is converted with `as usize`, which
    /// does not check the sign — confirm the decoder cannot yield a negative
    /// length.
    ///
    /// # Errors
    /// Propagates the error returned by `split_buffer`.
    pub fn try_new(page: &'a DataPage) -> Result<Self> {
        let (_, _, buffer) = split_buffer(page)?;
        let mut decoder = delta_length_byte_array::Decoder::new(buffer);

        #[allow(clippy::needless_collect)] // we need to consume it to get the values
        let lengths: Vec<usize> = (&mut decoder).map(|length| length as usize).collect();

        Ok(Self {
            values: decoder.into_values(),
            lengths: lengths.into_iter(),
        })
    }

    /// Lower bound of the number of lengths that are still to be consumed.
    pub fn len(&self) -> usize {
        let (remaining, _) = self.lengths.size_hint();
        remaining
    }
}

/// Yields the values one by one, each borrowed from the underlying buffer.
impl<'a> Iterator for Delta<'a> {
    type Item = &'a [u8];

    /// Takes the next length and cuts that many bytes off the front of
    /// `values`.
    ///
    /// # Panics
    /// Panics when the next length is larger than the number of bytes left.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let len = self.lengths.next()?;
        let bytes = self.values;
        // the tail is sliced first so that an oversized length panics before
        // any state is modified
        self.values = &bytes[len..];
        Some(&bytes[..len])
    }

    /// One item is produced per remaining length, so the hint of `lengths`
    /// is forwarded unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = &self.lengths;
        remaining.size_hint()
    }
}

#[derive(Debug)]
pub(super) struct FilteredRequired<'a> {
pub values: SliceFilteredIter<SizedBinaryIter<'a>>,
Expand All @@ -99,6 +107,26 @@ impl<'a> FilteredRequired<'a> {
}
}

/// A [`Delta`] wrapped in a `SliceFilteredIter`, so that iteration is driven
/// by the filter rather than by the raw sequence of values.
#[derive(Debug)]
pub(super) struct FilteredDelta<'a> {
/// The filtered iterator over the values.
pub values: SliceFilteredIter<Delta<'a>>,
}

impl<'a> FilteredDelta<'a> {
    /// Decodes `page` into a [`Delta`] and wraps it in a `SliceFilteredIter`
    /// built from the result of `get_selected_rows(page)`.
    ///
    /// # Errors
    /// Propagates the error of `Delta::try_new`.
    pub fn try_new(page: &'a DataPage) -> Result<Self> {
        let delta = Delta::try_new(page)?;

        Ok(Self {
            values: SliceFilteredIter::new(delta, get_selected_rows(page)),
        })
    }

    /// Lower bound of the number of items the filtered iterator still yields.
    pub fn len(&self) -> usize {
        let (remaining, _) = self.values.size_hint();
        remaining
    }
}

pub(super) type Dict = Vec<Vec<u8>>;

#[derive(Debug)]
Expand Down Expand Up @@ -167,7 +195,11 @@ enum State<'a> {
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
Delta(Delta<'a>),
OptionalDelta(OptionalPageValidity<'a>, Delta<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredDelta(FilteredDelta<'a>),
FilteredOptionalDelta(FilteredOptionalPageValidity<'a>, Delta<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>),
FilteredRequiredDictionary(FilteredRequiredDictionary<'a>),
FilteredOptionalDictionary(FilteredOptionalPageValidity<'a>, ValuesDictionary<'a>),
Expand All @@ -178,10 +210,14 @@ impl<'a> utils::PageState<'a> for State<'a> {
match self {
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.len(),
State::Delta(state) => state.len(),
State::OptionalDelta(state, _) => state.len(),
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
State::FilteredRequired(state) => state.len(),
State::FilteredOptional(validity, _) => validity.len(),
State::FilteredDelta(state) => state.len(),
State::FilteredOptionalDelta(state, _) => state.len(),
State::FilteredRequiredDictionary(values) => values.len(),
State::FilteredOptionalDictionary(optional, _) => optional.len(),
}
Expand Down Expand Up @@ -284,6 +320,20 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
BinaryIter::new(values),
))
}
(Encoding::DeltaLengthByteArray, _, false, false) => {
Delta::try_new(page).map(State::Delta)
}
(Encoding::DeltaLengthByteArray, _, true, false) => Ok(State::OptionalDelta(
OptionalPageValidity::try_new(page)?,
Delta::try_new(page)?,
)),
(Encoding::DeltaLengthByteArray, _, false, true) => {
FilteredDelta::try_new(page).map(State::FilteredDelta)
}
(Encoding::DeltaLengthByteArray, _, true, true) => Ok(State::FilteredOptionalDelta(
FilteredOptionalPageValidity::try_new(page)?,
Delta::try_new(page)?,
)),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down Expand Up @@ -315,11 +365,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::Delta(page) => {
values.extend_lengths(page.lengths.by_ref().take(additional), &mut page.values);
}
State::OptionalDelta(page_validity, page_values) => {
let Binary {
offsets,
values: values_,
last_offset,
} = values;

let offset = *last_offset;
extend_from_decoder(
validity,
page_validity,
Some(additional),
offsets,
page_values.lengths.by_ref().map(|x| {
*last_offset += O::from_usize(x).unwrap();
*last_offset
}),
);

let length = *last_offset - offset;

let (consumed, remaining) = page_values.values.split_at(length.to_usize());
page_values.values = remaining;
values_.extend_from_slice(consumed);
}
State::FilteredRequired(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
}
}
State::FilteredDelta(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
}
}
State::OptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
Expand Down Expand Up @@ -348,6 +431,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page_values.by_ref(),
);
}
State::FilteredOptionalDelta(page_validity, page_values) => {
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
page_values.by_ref(),
);
}
State::FilteredRequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
Expand Down
22 changes: 22 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ pub struct Binary<O: Offset> {
/// Newtype around a `Vec<O>` holding offsets.
#[derive(Debug)]
pub struct Offsets<O: Offset>(pub Vec<O>);

impl<O: Offset> Offsets<O> {
    /// Appends one offset per entry of `lengths`, each being the previous
    /// offset plus that length.
    ///
    /// # Panics
    /// Panics if there is no offset to start from, or if a length cannot be
    /// converted with `O::from_usize`.
    #[inline]
    pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I) {
        // running total, seeded with the offset currently at the end
        let mut running = *self.0.last().unwrap();
        let cumulative = lengths.map(|length| {
            running += O::from_usize(length).unwrap();
            running
        });
        self.0.extend(cumulative);
    }
}

impl<O: Offset> Pushable<O> for Offsets<O> {
#[inline]
fn len(&self) -> usize {
Expand Down Expand Up @@ -63,6 +74,17 @@ impl<O: Offset> Binary<O> {
/// Returns the length reported by the offsets buffer.
// NOTE(review): presumably the number of stored items rather than the number
// of raw offsets — confirm against the `len` implementation of `Offsets`.
pub fn len(&self) -> usize {
self.offsets.len()
}

/// Appends one offset per entry of `lengths` and copies the bytes covered by
/// the newly added offsets from the front of `values` into this container.
///
/// `values` is advanced past the bytes that were copied.
///
/// # Panics
/// Panics if the offsets are empty, if a length cannot be converted to `O`,
/// or if `values` holds fewer bytes than the offsets advanced by.
#[inline]
pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I, values: &mut &[u8]) {
    let previous = self.last_offset;
    self.offsets.extend_lengths(lengths);

    // `Offsets::extend_lengths` already required a last element to exist
    let latest = *self.offsets.0.last().unwrap();
    self.last_offset = latest;

    // number of bytes that belong to the offsets added above
    let byte_count = latest.to_usize() - previous.to_usize();
    let source: &[u8] = *values;
    self.values.extend_from_slice(&source[..byte_count]);
    *values = &source[byte_count..];
}
}

impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
Expand Down
7 changes: 7 additions & 0 deletions src/io/parquet/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ pub(crate) fn encode_delta<O: Offset>(

delta_bitpacked::encode(lengths, buffer);
} else {
// Each length is the difference between two consecutive offsets. The lengths
// are streamed straight into the encoder: nothing is collected and nothing is
// written to stdout, since this runs on every encoded page.
let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);
delta_bitpacked::encode(lengths, buffer);
}
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/read_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ fn indexed_required_utf8() -> Result<()> {
read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
}

/// Required utf8 values written with `Encoding::DeltaLengthByteArray`: of the
/// two arrays handed to `pages`, reading with indexes must return only "e".
#[test]
fn indexed_required_utf8_delta() -> Result<()> {
    let first = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
    let second = Utf8Array::<i32>::from_slice(["d", "e", "f"]);
    let expected = Utf8Array::<i32>::from_slice(["e"]).boxed();

    let written = pages(&[&first, &second], Encoding::DeltaLengthByteArray)?;
    read_with_indexes(written, expected)
}

#[test]
fn indexed_required_i32() -> Result<()> {
let array21 = Int32Array::from_slice([1, 2, 3]);
Expand Down Expand Up @@ -194,6 +206,18 @@ fn indexed_optional_utf8() -> Result<()> {
read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
}

/// Optional utf8 values (with nulls in both arrays) written with
/// `Encoding::DeltaLengthByteArray`: reading with indexes must return only "e".
#[test]
fn indexed_optional_utf8_delta() -> Result<()> {
    let first = Utf8Array::<i32>::from([Some("a"), Some("b"), None]);
    let second = Utf8Array::<i32>::from([None, Some("e"), Some("f")]);
    let expected = Utf8Array::<i32>::from_slice(["e"]).boxed();

    let written = pages(&[&first, &second], Encoding::DeltaLengthByteArray)?;
    read_with_indexes(written, expected)
}

#[test]
fn indexed_required_fixed_len() -> Result<()> {
let array21 = FixedSizeBinaryArray::from_slice([[127], [128], [129]]);
Expand Down
12 changes: 11 additions & 1 deletion tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ fn list_large_binary_optional_v1() -> Result<()> {
}

#[test]
#[ignore]
fn utf8_optional_v2_delta() -> Result<()> {
round_trip(
"string",
Expand All @@ -350,6 +349,17 @@ fn utf8_optional_v2_delta() -> Result<()> {
)
}

/// Round-trips the required "string" column through uncompressed V2 pages
/// encoded with `Encoding::DeltaLengthByteArray`.
#[test]
fn utf8_required_v2_delta() -> Result<()> {
    let encodings = vec![Encoding::DeltaLengthByteArray];
    round_trip(
        "string",
        "required",
        Version::V2,
        CompressionOptions::Uncompressed,
        encodings,
    )
}

#[test]
fn i32_optional_v2_dict() -> Result<()> {
round_trip(
Expand Down