Fixed StackOverflow in many row groups (#1210)
jorgecarleitao authored Aug 8, 2022
1 parent 3a828c6 commit 56189bd
Showing 2 changed files with 46 additions and 22 deletions.
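
Why the fix was needed: the previous `RowGroupReader::_next` skipped row groups rejected by the `GroupFilter` by calling itself recursively (`return self._next();`), so a file whose filter rejects many consecutive row groups grew the call stack by one frame per skipped group until it overflowed. The commit replaces the `Vec<RowGroupMetaData>` plus manual `current_group` cursor with an `Enumerate` iterator over the row groups and advances it with `Iterator::find`, which loops in constant stack space; `size_hint` is delegated to the iterator as well. A sketch of the pattern follows the first file's diff below.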
36 changes: 16 additions & 20 deletions src/io/parquet/read/file.rs
@@ -179,10 +179,9 @@ pub struct RowGroupReader<R: Read + Seek> {
     reader: R,
     schema: Schema,
     groups_filter: Option<GroupFilter>,
-    row_groups: Vec<RowGroupMetaData>,
+    row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
     chunk_size: Option<usize>,
     remaining_rows: usize,
-    current_group: usize,
 }

 impl<R: Read + Seek> RowGroupReader<R> {
@@ -199,10 +198,9 @@ impl<R: Read + Seek> RowGroupReader<R> {
             reader,
             schema,
             groups_filter,
-            row_groups,
+            row_groups: row_groups.into_iter().enumerate(),
             chunk_size,
             remaining_rows: limit.unwrap_or(usize::MAX),
-            current_group: 0,
         }
     }

@@ -216,28 +214,27 @@ impl<R: Read + Seek> RowGroupReader<R> {
         if self.schema.fields.is_empty() {
             return Ok(None);
         }
-        if self.current_group == self.row_groups.len() {
-            // reached the last row group
-            return Ok(None);
-        };
         if self.remaining_rows == 0 {
             // reached the limit
             return Ok(None);
         }

-        let current_row_group = self.current_group;
-        let row_group = &self.row_groups[current_row_group];
-        if let Some(groups_filter) = self.groups_filter.as_ref() {
-            if !(groups_filter)(current_row_group, row_group) {
-                self.current_group += 1;
-                return self._next();
-            }
-        }
-        self.current_group += 1;
+        let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
+            self.row_groups
+                .by_ref()
+                .find(|(index, row_group)| (groups_filter)(*index, row_group))
+        } else {
+            self.row_groups.next()
+        };
+        let row_group = if let Some((_, row_group)) = row_group {
+            row_group
+        } else {
+            return Ok(None);
+        };

         let column_chunks = read_columns_many(
             &mut self.reader,
-            row_group,
+            &row_group,
             self.schema.fields.clone(),
             self.chunk_size,
             Some(self.remaining_rows),
@@ -263,7 +260,6 @@ impl<R: Read + Seek> Iterator for RowGroupReader<R> {
     }

     fn size_hint(&self) -> (usize, Option<usize>) {
-        let len = self.row_groups.len() - self.current_group;
-        (len, Some(len))
+        self.row_groups.size_hint()
     }
 }
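
A minimal sketch of the recursion-to-iteration pattern this diff applies, under simplified assumptions: `Filter`, `next_group_recursive`, and `next_group_iterative` are hypothetical stand-ins, and plain `u32`s take the place of `RowGroupMetaData`; this is not arrow2's actual API.

// Hypothetical stand-in for arrow2's `GroupFilter`: keep groups whose index passes.
type Filter = Box<dyn Fn(usize) -> bool>;

// Old shape: each group rejected by the filter costs one extra stack frame,
// so very many consecutive rejections can overflow the stack.
fn next_group_recursive(groups: &[u32], current: &mut usize, filter: &Filter) -> Option<u32> {
    if *current == groups.len() {
        return None;
    }
    let index = *current;
    *current += 1;
    if !(filter)(index) {
        return next_group_recursive(groups, current, filter); // one frame per skip
    }
    Some(groups[index])
}

// New shape: `Iterator::find` advances the iterator in a loop, so skipping
// any number of groups uses constant stack space.
fn next_group_iterative(
    groups: &mut std::iter::Enumerate<std::vec::IntoIter<u32>>,
    filter: &Filter,
) -> Option<u32> {
    groups.find(|(index, _)| (filter)(*index)).map(|(_, group)| group)
}

fn main() {
    // Both shapes agree on a small input...
    let filter: Filter = Box::new(|index| index == 2);
    let mut current = 0;
    assert_eq!(next_group_recursive(&[10, 20, 30, 40], &mut current, &filter), Some(30));

    // ...but only the iterative shape is safe when the filter skips
    // hundreds of thousands of groups, as in the reported overflow.
    let filter: Filter = Box::new(|index| index == 999_999);
    let mut groups = (0..=999_999u32).collect::<Vec<_>>().into_iter().enumerate();
    assert_eq!(next_group_iterative(&mut groups, &filter), Some(999_999));
}

In the actual code the iterator item is `(usize, RowGroupMetaData)` and the filter also receives the metadata, but the stack behavior is the same.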
32 changes: 30 additions & 2 deletions tests/it/io/parquet/mod.rs
@@ -1150,8 +1150,7 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Result<Vec<u8>> {
 type IntegrationRead = (Schema, Vec<Chunk<Box<dyn Array>>>);

 fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead> {
-    let reader = Cursor::new(data);
-    let reader = FileReader::try_new(reader, None, None, limit, None)?;
+    let reader = FileReader::try_new(Cursor::new(data), None, None, limit, None)?;
     let schema = reader.schema().clone();

     for field in &schema.fields {
@@ -1519,3 +1518,32 @@ fn nested_dict_limit() -> Result<()> {

     assert_roundtrip(schema, chunk, Some(2))
 }
+
+#[test]
+fn filter_chunk() -> Result<()> {
+    let chunk1 = Chunk::new(vec![PrimitiveArray::from_slice([1i16, 3]).boxed()]);
+    let chunk2 = Chunk::new(vec![PrimitiveArray::from_slice([2i16, 4]).boxed()]);
+    let schema = Schema::from(vec![Field::new("c1", DataType::Int16, true)]);
+
+    let r = integration_write(&schema, &[chunk1.clone(), chunk2.clone()])?;
+
+    let reader = FileReader::try_new(
+        Cursor::new(r),
+        None,
+        None,
+        None,
+        // select chunk 1
+        Some(std::sync::Arc::new(|i, _| i == 0)),
+    )?;
+    let new_schema = reader.schema().clone();
+
+    for field in &schema.fields {
+        let mut _statistics = deserialize(field, &reader.metadata().row_groups)?;
+    }
+
+    let new_chunks = reader.collect::<Result<Vec<_>>>()?;
+
+    assert_eq!(new_schema, schema);
+    assert_eq!(new_chunks, vec![chunk1]);
+    Ok(())
+}
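
The new `filter_chunk` test exercises the code path that previously recursed: the `GroupFilter` closure receives the row-group index together with its metadata and returns `true` for groups to keep, so `|i, _| i == 0` keeps only the first row group and the reader yields `chunk1` alone.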
