From 838decac839f66d1f54f9c0cc418e5e0c8b37f87 Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Tue, 9 Aug 2022 06:29:51 +0200
Subject: [PATCH] Added support for parquet sidecar to `FileReader` (#1215)

---
 benches/read_parquet.rs             |  34 ++++----
 examples/parquet_read.rs            |  36 ++++++---
 examples/parquet_read_async.rs      |   2 +-
 examples/s3/src/main.rs             |   2 +-
 src/datatypes/schema.rs             |  22 +++++
 src/io/parquet/read/file.rs         | 120 ++++------------------------
 src/io/parquet/read/row_group.rs    |   4 +-
 tests/it/io/parquet/mod.rs          |  65 ++++++++-------
 tests/it/io/parquet/read.rs         |  22 +++--
 tests/it/io/parquet/read_indexes.rs |   2 +-
 tests/it/io/parquet/write_async.rs  |   2 +-
 11 files changed, 138 insertions(+), 173 deletions(-)

diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 3d1dc858588..94c089931a2 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -32,10 +32,16 @@ fn to_buffer(
     buffer
 }
 
-fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
-    let file = Cursor::new(buffer);
+fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+    let mut reader = Cursor::new(buffer);
 
-    let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;
+    let metadata = read::read_metadata(&mut reader)?;
+
+    let schema = read::infer_schema(&metadata)?;
+
+    let schema = schema.filter(|index, _| index == column);
+
+    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     for maybe_chunk in reader {
         let columns = maybe_chunk?;
@@ -49,43 +55,43 @@ fn add_benchmark(c: &mut Criterion) {
         let size = 2usize.pow(i);
         let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 6).unwrap()));
 
         let a = format!("read utf8 emoji 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 12).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 12).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 3).unwrap()));
 
         let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, false, false, false, false);
         let a = format!("read required utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
     });
 }
 
diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index 92fa83c521e..e0e5a220e8a 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -1,30 +1,44 @@
 use std::fs::File;
 use std::time::SystemTime;
 
-use arrow2::error::Result;
+use arrow2::error::Error;
 use arrow2::io::parquet::read;
 
-fn main() -> Result<()> {
+fn main() -> Result<(), Error> {
+    // say we have a file
     use std::env;
     let args: Vec<String> = env::args().collect();
-
     let file_path = &args[1];
+    let mut reader = File::open(file_path)?;
 
-    let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
+    // we can read its metadata:
+    let metadata = read::read_metadata(&mut reader)?;
 
-    println!("{:#?}", reader.schema());
+    // and infer a [`Schema`] from the `metadata`.
+    let schema = read::infer_schema(&metadata)?;
 
-    // say we want to evaluate if the we can skip some row groups based on a field's value
-    let field = &reader.schema().fields[0];
+    // we can filter the columns we need (here we select all)
+    let schema = schema.filter(|_index, _field| true);
 
-    // we can deserialize the parquet statistics from this field
-    let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;
+    // we can read the statistics of all parquet's row groups (here for the first field)
+    let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?;
 
     println!("{:#?}", statistics);
 
+    // say we found that we only need to read the first two row groups, "0" and "1"
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0 || *index == 1)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    // we can then read the row groups into chunks
+    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
+
     let start = SystemTime::now();
-    for maybe_chunk in reader {
+    for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         assert!(!chunk.is_empty());
     }
diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs
index 2428b368869..3f1abc4faf6 100644
--- a/examples/parquet_read_async.rs
+++ b/examples/parquet_read_async.rs
@@ -43,7 +43,7 @@ async fn main() -> Result<()> {
         // the runtime.
         // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
         // can be advanced in parallel (parallel decompression and deserialization).
-        let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
+        let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None);
         for maybe_chunk in chunks {
             let chunk = maybe_chunk?;
             println!("{}", chunk.len());
diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs
index 5aa24431dd9..d3f668d4421 100644
--- a/examples/s3/src/main.rs
+++ b/examples/s3/src/main.rs
@@ -71,7 +71,7 @@ async fn main() -> Result<()> {
 
     // this is CPU-bounded and should be sent to a separate thread-pool.
     // We do it here for simplicity
-    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
     let chunks = chunks.collect::<Result<Vec<_>>>()?;
 
     // this is a single chunk because chunk_size is `None`
diff --git a/src/datatypes/schema.rs b/src/datatypes/schema.rs
index baa04476360..671c9438622 100644
--- a/src/datatypes/schema.rs
+++ b/src/datatypes/schema.rs
@@ -26,6 +26,28 @@ impl Schema {
             metadata,
         }
     }
+
+    /// Returns a new [`Schema`] with a subset of all fields whose `predicate`
+    /// evaluates to true.
+    pub fn filter<F: Fn(usize, &Field) -> bool>(self, predicate: F) -> Self {
+        let fields = self
+            .fields
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, f)| {
+                if (predicate)(index, &f) {
+                    Some(f)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Schema {
+            fields,
+            metadata: self.metadata,
+        }
+    }
 }
 
 impl From<Vec<Field>> for Schema {
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 638089668af..ff8944d38cf 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -1,18 +1,12 @@
 use std::io::{Read, Seek};
-use std::sync::Arc;
 
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::Schema;
+use crate::error::Result;
 use crate::io::parquet::read::read_columns_many;
-use crate::{
-    datatypes::Field,
-    error::{Error, Result},
-};
-use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
-
-type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
+use super::{RowGroupDeserializer, RowGroupMetaData};
 
 /// An iterator of [`Chunk`]s coming from row groups of a parquet file.
 ///
@@ -20,96 +14,29 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
 /// mapped to an [`Iterator`] and each iterator is iterated upon until either the limit
 /// or the last iterator ends.
 /// # Implementation
-/// This iterator mixes IO-bounded and CPU-bounded operations.
+/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
 pub struct FileReader<R: Read + Seek> {
     row_groups: RowGroupReader<R>,
-    metadata: FileMetaData,
     remaining_rows: usize,
     current_row_group: Option<RowGroupDeserializer>,
 }
 
 impl<R: Read + Seek> FileReader<R> {
-    /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing
-    /// Arrow's schema from it.
-    ///
-    /// # Error
-    /// This function errors iff:
-    /// * reading the metadata from the reader fails
-    /// * it is not possible to derive an arrow schema from the parquet file
-    /// * the projection contains columns that do not exist
-    pub fn try_new(
-        mut reader: R,
-        projection: Option<&[usize]>,
+    /// Returns a new [`FileReader`].
+    pub fn new(
+        reader: R,
+        row_groups: Vec<RowGroupMetaData>,
+        schema: Schema,
         chunk_size: Option<usize>,
         limit: Option<usize>,
-        groups_filter: Option<GroupFilter>,
-    ) -> Result<Self> {
-        let metadata = read_metadata(&mut reader)?;
-
-        let schema = infer_schema(&metadata)?;
-
-        let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
-            schema
-                .fields
-                .into_iter()
-                .enumerate()
-                .filter_map(|(index, f)| {
-                    if projection.iter().any(|&i| i == index) {
-                        Some(f)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            schema.fields.into_iter().collect()
-        };
-
-        if let Some(projection) = &projection {
-            if fields.len() != projection.len() {
-                return Err(Error::InvalidArgumentError(
-                    "While reading parquet, some columns in the projection do not exist in the file"
-                        .to_string(),
-                ));
-            }
-        }
-
-        let schema = Schema {
-            fields,
-            metadata: schema_metadata,
-        };
-
-        let row_groups = RowGroupReader::new(
-            reader,
-            schema,
-            groups_filter,
-            metadata.row_groups.clone(),
-            chunk_size,
-            limit,
-        );
+    ) -> Self {
+        let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit);
 
-        Ok(Self {
+        Self {
             row_groups,
-            metadata,
             remaining_rows: limit.unwrap_or(usize::MAX),
             current_row_group: None,
-        })
-    }
-
-    /// Returns the derived arrow [`Schema`] of the file
-    pub fn schema(&self) -> &Schema {
-        &self.row_groups.schema
-    }
-
-    /// Returns parquet's [`FileMetaData`].
-    pub fn metadata(&self) -> &FileMetaData {
-        &self.metadata
-    }
-
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.row_groups.set_groups_filter(groups_filter);
+        }
     }
 
     fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
@@ -178,7 +105,6 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
 pub struct RowGroupReader<R: Read + Seek> {
     reader: R,
     schema: Schema,
-    groups_filter: Option<GroupFilter>,
     row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
     chunk_size: Option<usize>,
     remaining_rows: usize,
@@ -189,7 +115,6 @@ impl<R: Read + Seek> RowGroupReader<R> {
     pub fn new(
         reader: R,
         schema: Schema,
-        groups_filter: Option<GroupFilter>,
         row_groups: Vec<RowGroupMetaData>,
         chunk_size: Option<usize>,
         limit: Option<usize>,
@@ -197,18 +122,12 @@ impl<R: Read + Seek> RowGroupReader<R> {
         Self {
             reader,
             schema,
-            groups_filter,
             row_groups: row_groups.into_iter().enumerate(),
             chunk_size,
             remaining_rows: limit.unwrap_or(usize::MAX),
         }
     }
 
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.groups_filter = Some(groups_filter);
-    }
-
     #[inline]
     fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
         if self.schema.fields.is_empty() {
@@ -219,14 +138,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
             return Ok(None);
         }
 
-        let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
-            self.row_groups
-                .by_ref()
-                .find(|(index, row_group)| (groups_filter)(*index, row_group))
-        } else {
-            self.row_groups.next()
-        };
-        let row_group = if let Some((_, row_group)) = row_group {
+        let row_group = if let Some((_, row_group)) = self.row_groups.next() {
             row_group
         } else {
             return Ok(None);
@@ -242,12 +154,10 @@ impl<R: Read + Seek> RowGroupReader<R> {
 
         let result = RowGroupDeserializer::new(
             column_chunks,
-            row_group.num_rows() as usize,
+            row_group.num_rows(),
             Some(self.remaining_rows),
         );
-        self.remaining_rows = self
-            .remaining_rows
-            .saturating_sub(row_group.num_rows() as usize);
+        self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows());
         Ok(Some(result))
     }
 }
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index 4dde21e467d..59fba886a65 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -260,8 +260,6 @@ pub async fn read_columns_many_async<
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, row_group.num_rows(), chunk_size))
         .collect()
 }
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 8b8a3c2038d..55baceaa9af 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -7,8 +7,8 @@ use arrow2::{
     chunk::Chunk,
     datatypes::*,
     error::Result,
+    io::parquet::read as p_read,
     io::parquet::read::statistics::*,
-    io::parquet::read::*,
     io::parquet::write::*,
     types::{days_ms, NativeType},
 };
@@ -23,29 +23,24 @@ mod write_async;
 type ArrayStats = (Box<dyn Array>, Statistics);
 
 pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Result<ArrayStats> {
-    let metadata = read_metadata(&mut reader)?;
-    let schema = infer_schema(&metadata)?;
+    let metadata = p_read::read_metadata(&mut reader)?;
+    let schema = p_read::infer_schema(&metadata)?;
 
     // verify that we can read indexes
-    let _indexes = read_columns_indexes(
+    let _indexes = p_read::read_columns_indexes(
         &mut reader,
         metadata.row_groups[0].columns(),
         &schema.fields,
     )?;
 
-    let column = schema
-        .fields
-        .iter()
-        .enumerate()
-        .find_map(|(i, f)| if f.name == column { Some(i) } else { None })
-        .unwrap();
+    let schema = schema.filter(|_, f| f.name == column);
 
-    let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?;
-
-    let field = &schema.fields[column];
+    let field = &schema.fields[0];
 
     let statistics = deserialize(field, &metadata.row_groups)?;
 
+    let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None, None);
+
     Ok((
         reader.next().unwrap()?.into_arrays().pop().unwrap(),
         statistics,
@@ -1150,13 +1145,22 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Resul
 type IntegrationRead = (Schema, Vec<Chunk<Box<dyn Array>>>);
 
 fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead> {
-    let reader = FileReader::try_new(Cursor::new(data), None, None, limit, None)?;
-    let schema = reader.schema().clone();
+    let mut reader = Cursor::new(data);
+    let metadata = p_read::read_metadata(&mut reader)?;
+    let schema = p_read::infer_schema(&metadata)?;
 
     for field in &schema.fields {
-        let mut _statistics = deserialize(field, &reader.metadata().row_groups)?;
+        let mut _statistics = deserialize(field, &metadata.row_groups)?;
     }
 
+    let reader = p_read::FileReader::new(
+        Cursor::new(data),
+        metadata.row_groups,
+        schema.clone(),
+        None,
+        limit,
+    );
+
     let batches = reader.collect::<Result<Vec<_>>>()?;
 
     Ok((schema, batches))
@@ -1527,23 +1531,26 @@ fn filter_chunk() -> Result<()> {
 
     let r = integration_write(&schema, &[chunk1.clone(), chunk2.clone()])?;
 
-    let reader = FileReader::try_new(
-        Cursor::new(r),
-        None,
-        None,
-        None,
-        // select chunk 1
-        Some(std::sync::Arc::new(|i, _| i == 0)),
-    )?;
-    let new_schema = reader.schema().clone();
+    let mut reader = Cursor::new(r);
 
-    for field in &schema.fields {
-        let mut _statistics = deserialize(field, &reader.metadata().row_groups)?;
-    }
+    let metadata = p_read::read_metadata(&mut reader)?;
+
+    let new_schema = p_read::infer_schema(&metadata)?;
+    assert_eq!(new_schema, schema);
+
+    // select chunk 1
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    let reader = p_read::FileReader::new(reader, row_groups, schema, None, None);
 
     let new_chunks = reader.collect::<Result<Vec<_>>>()?;
 
-    assert_eq!(new_schema, schema);
     assert_eq!(new_chunks, vec![chunk1]);
     Ok(())
 }
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index 45c5b714d6a..efb3943da94 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -496,9 +496,11 @@ fn v1_map_nullable() -> Result<()> {
 #[test]
 fn all_types() -> Result<()> {
     let path = "testing/parquet-testing/data/alltypes_plain.parquet";
-    let reader = std::fs::File::open(path)?;
+    let mut reader = std::fs::File::open(path)?;
 
-    let reader = FileReader::try_new(reader, None, None, None, None)?;
+    let metadata = read_metadata(&mut reader)?;
+    let schema = infer_schema(&metadata)?;
+    let reader = FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 1);
@@ -535,10 +537,12 @@ fn all_types_chunked() -> Result<()> {
     // this has one batch with 8 elements
     let path = "testing/parquet-testing/data/alltypes_plain.parquet";
-    let reader = std::fs::File::open(path)?;
+    let mut reader = std::fs::File::open(path)?;
+    let metadata = read_metadata(&mut reader)?;
+    let schema = infer_schema(&metadata)?;
 
     // chunk it in 5 (so, (5,3))
-    let reader = FileReader::try_new(reader, None, Some(5), None, None)?;
+    let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None);
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 2);
 
@@ -584,7 +588,7 @@ fn all_types_chunked() -> Result<()> {
 
 #[cfg(feature = "io_parquet_compression")]
 #[test]
-fn invalid_utf8() {
+fn invalid_utf8() -> Result<()> {
     let invalid_data = &[
         0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x24, 0x15, 0x28, 0x2c, 0x15, 0x02, 0x15, 0x00,
         0x15, 0x06, 0x15, 0x08, 0x00, 0x00, 0x12, 0x44, 0x02, 0x00, 0x00, 0x00, 0x03, 0xff, 0x08,
@@ -597,8 +601,11 @@ fn invalid_utf8() {
         0x42, 0x00, 0x51, 0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31,
     ];
 
-    let reader = Cursor::new(invalid_data);
-    let reader = FileReader::try_new(reader, None, Some(5), None, None).unwrap();
+    let mut reader = Cursor::new(invalid_data);
+
+    let metadata = read_metadata(&mut reader)?;
+    let schema = infer_schema(&metadata)?;
+    let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None);
 
     let error = reader.collect::<Result<Vec<_>>>().unwrap_err();
     assert!(
@@ -606,4 +613,5 @@ fn invalid_utf8() {
         "unexpected error: {}",
         error
     );
+    Ok(())
 }
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index 68f09bae804..328a826eaf0 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -125,7 +125,7 @@ fn read_with_indexes(
         vec![&c1.descriptor().descriptor.primitive_type],
         schema.fields[1].clone(),
         None,
-        row_group.num_rows() as usize,
+        row_group.num_rows(),
     )?;
 
     let arrays = arrays.collect::<Result<Vec<_>>>()?;
diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs
index 5644caf67f5..000c39ca64c 100644
--- a/tests/it/io/parquet/write_async.rs
+++ b/tests/it/io/parquet/write_async.rs
@@ -63,7 +63,7 @@ async fn test_parquet_async_roundtrip() {
         let column_chunks = read_columns_many_async(factory, group, schema.fields.clone(), None)
             .await
             .unwrap();
-        let chunks = RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+        let chunks = RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
         let mut chunks = chunks.collect::<Result<Vec<_>>>().unwrap();
         out.append(&mut chunks);
     }
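
Not part of the patch: a minimal sketch of how a caller might migrate from `FileReader::try_new` to the API this diff introduces, assuming any reader that implements `Read + Seek`. The file path and chunk size are placeholders; `read_metadata`, `infer_schema`, `Schema::filter`, and `FileReader::new` are the calls added or changed above.

```rust
use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // any `Read + Seek` works; a file path is used here only for illustration
    let mut reader = File::open("data.parquet")?;

    // metadata and schema are now read explicitly, so they can also come from a
    // sidecar rather than from the file being scanned
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;

    // project columns (here: keep all) and pick row groups before building the reader
    let schema = schema.filter(|_index, _field| true);
    let row_groups = metadata.row_groups;

    // iterate over chunks of at most 8192 rows (placeholder chunk size), no row limit
    let chunks = read::FileReader::new(reader, row_groups, schema, Some(8192), None);
    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        println!("{} rows", chunk.len());
    }
    Ok(())
}
```

Compared to `try_new`, row-group selection replaces the old `groups_filter` callback and column projection is expressed on the `Schema` itself rather than as a list of column indices.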