Skip to content

Commit

Permalink
Reuse compress buffer (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Oct 17, 2021
1 parent 258644e commit 7fbf8e4
Show file tree
Hide file tree
Showing 27 changed files with 509 additions and 275 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bench = false
[dependencies]
parquet-format-async-temp = "0.2.0"
bitpacking = { version = "0.8.2", features = ["bitpacker1x"] }
streaming-iterator = "0.1.5"
streaming-decompression = "0.1"

async-stream = { version = "0.3.2", optional = true }
futures = { version = "0.3", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/read/binary.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use parquet::{
encoding::{bitpacking, hybrid_rle::HybridRleDecoder, plain_byte_array, uleb128, Encoding},
encoding::{hybrid_rle::HybridRleDecoder, plain_byte_array, Encoding},
error::Result,
metadata::ColumnDescriptor,
page::{split_buffer, BinaryPageDict, DataPage},
Expand Down
51 changes: 26 additions & 25 deletions integration-tests/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,55 @@ pub fn page_to_array(page: &DataPage, descriptor: &ColumnDescriptor) -> Result<A
match (descriptor.type_(), descriptor.max_rep_level()) {
(ParquetType::PrimitiveType { physical_type, .. }, 0) => match page.dictionary_page() {
Some(_) => match physical_type {
PhysicalType::Int32 => Ok(Array::Int32(primitive::page_dict_to_vec(
&page, descriptor,
)?)),
PhysicalType::Int64 => Ok(Array::Int64(primitive::page_dict_to_vec(
&page, descriptor,
)?)),
PhysicalType::Int96 => Ok(Array::Int96(primitive::page_dict_to_vec(
&page, descriptor,
)?)),
PhysicalType::Int32 => {
Ok(Array::Int32(primitive::page_dict_to_vec(page, descriptor)?))
}
PhysicalType::Int64 => {
Ok(Array::Int64(primitive::page_dict_to_vec(page, descriptor)?))
}
PhysicalType::Int96 => {
Ok(Array::Int96(primitive::page_dict_to_vec(page, descriptor)?))
}
PhysicalType::Float => Ok(Array::Float32(primitive::page_dict_to_vec(
&page, descriptor,
page, descriptor,
)?)),
PhysicalType::Double => Ok(Array::Float64(primitive::page_dict_to_vec(
&page, descriptor,
page, descriptor,
)?)),
PhysicalType::ByteArray => {
Ok(Array::Binary(binary::page_dict_to_vec(&page, descriptor)?))
Ok(Array::Binary(binary::page_dict_to_vec(page, descriptor)?))
}
_ => todo!(),
},
None => match physical_type {
PhysicalType::Boolean => {
Ok(Array::Boolean(boolean::page_to_vec(&page, descriptor)?))
Ok(Array::Boolean(boolean::page_to_vec(page, descriptor)?))
}
PhysicalType::Int32 => Ok(Array::Int32(primitive::page_to_vec(&page, descriptor)?)),
PhysicalType::Int64 => Ok(Array::Int64(primitive::page_to_vec(&page, descriptor)?)),
PhysicalType::Int96 => Ok(Array::Int96(primitive::page_to_vec(&page, descriptor)?)),
PhysicalType::Int32 => Ok(Array::Int32(primitive::page_to_vec(page, descriptor)?)),
PhysicalType::Int64 => Ok(Array::Int64(primitive::page_to_vec(page, descriptor)?)),
PhysicalType::Int96 => Ok(Array::Int96(primitive::page_to_vec(page, descriptor)?)),
PhysicalType::Float => {
Ok(Array::Float32(primitive::page_to_vec(&page, descriptor)?))
Ok(Array::Float32(primitive::page_to_vec(page, descriptor)?))
}
PhysicalType::Double => {
Ok(Array::Float64(primitive::page_to_vec(&page, descriptor)?))
Ok(Array::Float64(primitive::page_to_vec(page, descriptor)?))
}
PhysicalType::ByteArray => {
Ok(Array::Binary(binary::page_to_vec(&page, descriptor)?))
Ok(Array::Binary(binary::page_to_vec(page, descriptor)?))
}
_ => todo!(),
},
},
(ParquetType::PrimitiveType { physical_type, .. }, _) => match page.dictionary_page() {
None => match physical_type {
PhysicalType::Int64 => {
Ok(primitive_nested::page_to_array::<i64>(&page, descriptor)?)
Ok(primitive_nested::page_to_array::<i64>(page, descriptor)?)
}
_ => todo!(),
},
Some(_) => match physical_type {
PhysicalType::Int64 => Ok(primitive_nested::page_dict_to_array::<i64>(
&page, descriptor,
page, descriptor,
)?),
_ => todo!(),
},
Expand All @@ -83,9 +83,10 @@ pub(crate) mod tests {
use std::fs::File;

use parquet::error::Result;
use parquet::read::{get_page_iterator, read_metadata, Decompressor, StreamingIterator};
use parquet::read::{get_page_iterator, read_metadata, Decompressor};
use parquet::statistics::{BinaryStatistics, PrimitiveStatistics, Statistics};
use parquet::types::int96_to_i64_ns;
use parquet::FallibleStreamingIterator;

use super::*;
use crate::tests::*;
Expand All @@ -100,14 +101,14 @@ pub(crate) mod tests {
let column_meta = metadata.row_groups[row_group].column(column);
let descriptor = column_meta.descriptor().clone();

let iterator = get_page_iterator(&column_meta, reader, None, vec![])?;
let iterator = get_page_iterator(column_meta, reader, None, vec![])?;

let buffer = vec![];
let mut iterator = Decompressor::new(iterator, buffer);

let statistics = column_meta.statistics().transpose()?;

let page = iterator.next().unwrap().as_ref().unwrap();
let page = iterator.next()?.unwrap();

let array = page_to_array(page, &descriptor)?;

Expand Down Expand Up @@ -201,7 +202,7 @@ pub(crate) mod tests {
];

let expected = expected.into_iter().map(Some).collect::<Vec<_>>();
let (array, _) = get_column(&path, 10)?;
let (array, _) = get_column(path, 10)?;
if let Array::Int96(array) = array {
let a = array
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/read/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::TryInto;
use super::utils::ValuesDef;

use parquet::{
encoding::{bitpacking, hybrid_rle::HybridRleDecoder, uleb128, Encoding},
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
error::Result,
metadata::ColumnDescriptor,
page::{split_buffer, DataPage, PrimitivePageDict},
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/read/primitive_nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ fn read_dict_array<T: NativeType>(
let bit_width = values[0];
let values = &values[1..];

let (_, consumed) = uleb128::decode(&values);
let (_, consumed) = uleb128::decode(values);
let values = &values[consumed..];

let indices = bitpacking::Decoder::new(values, bit_width, length as usize);
Expand Down
44 changes: 27 additions & 17 deletions integration-tests/src/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
pub(crate) mod primitive;

use parquet::{
error::Result, metadata::ColumnDescriptor, page::CompressedPage, write::WriteOptions,
};
use parquet::{error::Result, metadata::ColumnDescriptor, page::EncodedPage, write::WriteOptions};

use super::Array;

pub fn array_to_page(
array: &Array,
options: &WriteOptions,
descriptor: &ColumnDescriptor,
) -> Result<CompressedPage> {
) -> Result<EncodedPage> {
// using plain encoding format
match array {
Array::Int32(array) => primitive::array_to_page_v1(&array, options, descriptor),
Array::Int64(array) => primitive::array_to_page_v1(&array, options, descriptor),
Array::Int96(array) => primitive::array_to_page_v1(&array, options, descriptor),
Array::Float32(array) => primitive::array_to_page_v1(&array, options, descriptor),
Array::Float64(array) => primitive::array_to_page_v1(&array, options, descriptor),
Array::Int32(array) => primitive::array_to_page_v1(array, options, descriptor),
Array::Int64(array) => primitive::array_to_page_v1(array, options, descriptor),
Array::Int96(array) => primitive::array_to_page_v1(array, options, descriptor),
Array::Float32(array) => primitive::array_to_page_v1(array, options, descriptor),
Array::Float64(array) => primitive::array_to_page_v1(array, options, descriptor),
_ => todo!(),
}
}
Expand All @@ -33,7 +31,7 @@ mod tests {
use parquet::error::Result;
use parquet::metadata::SchemaDescriptor;
use parquet::statistics::Statistics;
use parquet::write::{write_file, DynIter, Version};
use parquet::write::{write_file, Compressor, DynIter, DynStreamingIterator, Version};

use super::*;

Expand Down Expand Up @@ -67,9 +65,13 @@ mod tests {

let a = schema.columns();

let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(DynIter::new(
std::iter::once(array_to_page(&array, &options, &a[0])),
))))));
let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))),
options.compression,
vec![],
)),
)))));

let mut writer = Cursor::new(vec![]);
write_file(&mut writer, row_groups, schema, options, None, None)?;
Expand Down Expand Up @@ -140,7 +142,7 @@ mod tests2 {
error::Result,
metadata::SchemaDescriptor,
read::read_metadata,
write::{write_file, DynIter, Version},
write::{write_file, Compressor, DynIter, DynStreamingIterator, Version},
};

#[test]
Expand All @@ -163,9 +165,17 @@ mod tests2 {

let schema = SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?;

let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(DynIter::new(
std::iter::once(array_to_page_v1(&array, &options, &schema.columns()[0])),
))))));
let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page_v1(
&array,
&options,
&schema.columns()[0],
))),
options.compression,
vec![],
)),
)))));

let mut writer = Cursor::new(vec![]);
write_file(&mut writer, row_groups, schema, options, None, None)?;
Expand Down
22 changes: 4 additions & 18 deletions integration-tests/src/write/primitive.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use parquet::{
encoding::Encoding,
metadata::ColumnDescriptor,
page::{CompressedDataPage, CompressedPage, DataPageHeader, DataPageHeaderV1},
page::{DataPage, DataPageHeader, DataPageHeaderV1, EncodedPage},
statistics::{serialize_statistics, PrimitiveStatistics, Statistics},
types::NativeType,
write::WriteOptions,
{compression::create_codec, encoding::hybrid_rle::encode_bool, error::Result},
{encoding::hybrid_rle::encode_bool, error::Result},
};

fn unzip_option<T: NativeType>(array: &[Option<T>]) -> Result<(Vec<u8>, Vec<u8>)> {
Expand Down Expand Up @@ -44,22 +44,10 @@ pub fn array_to_page_v1<T: NativeType>(
array: &[Option<T>],
options: &WriteOptions,
descriptor: &ColumnDescriptor,
) -> Result<CompressedPage> {
) -> Result<EncodedPage> {
let (values, mut buffer) = unzip_option(array)?;

buffer.extend_from_slice(&values);
let uncompressed_page_size = buffer.len();

let codec = create_codec(&options.compression)?;
let buffer = if let Some(mut codec) = codec {
// todo: remove this allocation by extending `buffer` directly.
// needs refactoring `compress`'s API.
let mut tmp = vec![];
codec.compress(&values, &mut tmp)?;
tmp
} else {
buffer
};

let statistics = if options.write_statistics {
let statistics = &PrimitiveStatistics {
Expand All @@ -82,11 +70,9 @@ pub fn array_to_page_v1<T: NativeType>(
statistics,
};

Ok(CompressedPage::Data(CompressedDataPage::new(
Ok(EncodedPage::Data(DataPage::new(
DataPageHeader::V1(header),
buffer,
options.compression,
uncompressed_page_size,
None,
descriptor.clone(),
)))
Expand Down
26 changes: 9 additions & 17 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,33 +274,25 @@ mod tests {

fn test_roundtrip(c: Compression, data: &[u8]) {
let mut c1 = create_codec(&c).unwrap().unwrap();
let mut c2 = create_codec(&c).unwrap().unwrap();

// Compress with c1
let mut compressed = Vec::new();
let offset = 2;

// Compress to a buffer that already has data is possible
let mut compressed = vec![2; offset];
c1.compress(data, &mut compressed)
.expect("Error when compressing");

// Decompress with c2
let mut decompressed = vec![0; data.len()];
c2.decompress(compressed.as_slice(), &mut decompressed)
.expect("Error when decompressing");
assert_eq!(data, decompressed.as_slice());

compressed.clear();

// Compress with c2
c2.compress(data, &mut compressed)
.expect("Error when compressing");
// data is compressed...
assert!(compressed.len() - 2 < data.len());

// Decompress with c1
c1.decompress(compressed.as_slice(), &mut decompressed)
let mut decompressed = vec![0; data.len()];
c1.decompress(&compressed[offset..], &mut decompressed)
.expect("Error when decompressing");
assert_eq!(data, decompressed.as_slice());
}

fn test_codec(c: Compression) {
let sizes = vec![100, 10000, 100000];
let sizes = vec![10000, 100000];
for size in sizes {
let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();
test_roundtrip(c, &data);
Expand Down
7 changes: 0 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ impl std::fmt::Display for ParquetError {
}
}

impl ParquetError {
/// Wraps an external error in an `ParquetError`.
pub fn from_external_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
Self::External("".to_string(), Arc::new(error))
}
}

#[cfg(feature = "snappy")]
impl From<snap::Error> for ParquetError {
fn from(e: snap::Error) -> ParquetError {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub mod statistics;
pub mod types;
pub mod write;

pub use streaming_decompression::fallible_streaming_iterator;
pub use streaming_decompression::FallibleStreamingIterator;

const FOOTER_SIZE: u64 = 8;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

Expand Down
Loading

0 comments on commit 7fbf8e4

Please sign in to comment.