Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added example to read parquet in parallel with rayon. (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 5, 2021
1 parent 299818a commit de87058
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 4 deletions.
6 changes: 3 additions & 3 deletions examples/parquet_read_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
let file_path = &args[1];

let start = SystemTime::now();
let batch = parallel_read(file_path, 0)?;
for array in batch.columns() {
println!("{}", array)
}
assert!(batch.num_rows() > 0);
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
8 changes: 8 additions & 0 deletions examples/parquet_read_parallel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Manifest of the stand-alone example that reads a parquet file in parallel.
[package]
# Matches the example's directory (`examples/parquet_read_parallel`).
name = "parquet_read_parallel"
version = "0.1.0"
edition = "2018"

[dependencies]
# Local checkout of arrow2 with only the parquet IO features enabled.
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
# Provides the parallel iterators used by the example.
rayon = { version = "1", default-features = false }
82 changes: 82 additions & 0 deletions examples/parquet_read_parallel/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Example demonstrating how to read from parquet in parallel using rayon
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use std::time::SystemTime;

use rayon::prelude::*;

use arrow2::{
error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
};

/// Reads row group `row_group` of the parquet file at `path` into a [`RecordBatch`].
///
/// The work is split into two phases:
/// 1. IO-bound: for every field of the file's schema, the pages of the column
///    are pulled from the (single, shared) reader, one field after the other.
/// 2. CPU-bound: the collected pages of each column are converted into arrays
///    in parallel with rayon.
///
/// Progress and timing of the read phase are printed to stdout.
///
/// # Errors
/// Returns an error if the file cannot be opened, if its metadata or schema
/// cannot be read, if advancing a column iterator fails, or if converting a
/// column into an array fails.
fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
    let mut file = BufReader::new(File::open(path)?);
    let file_metadata = read::read_metadata(&mut file)?;
    let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

    // IO-bound
    let columns = file_metadata
        .schema()
        .fields()
        .iter()
        .enumerate()
        .map(|(field_i, field)| -> Result<_> {
            // `Instant` is monotonic, so measuring elapsed time cannot fail
            // (unlike `SystemTime`, which may jump backwards).
            let start = std::time::Instant::now();
            println!("read start - field: {}", field_i);
            let mut columns = read::get_column_iterator(
                &mut file,
                &file_metadata,
                row_group,
                field_i,
                None,
                vec![],
            );

            let mut column_chunks = vec![];
            // Propagate read failures to the caller instead of panicking.
            while let read::State::Some(mut new_iter) = columns.advance()? {
                if let Some((pages, metadata)) = new_iter.get() {
                    // Materialize the pages now so that the second phase
                    // does not need the reader anymore.
                    let pages = pages.collect::<Vec<_>>();

                    column_chunks.push((pages, metadata.clone()));
                }
                columns = new_iter;
            }
            println!(
                "read end - {:?}: {} {}",
                start.elapsed(),
                field_i,
                row_group
            );
            Ok((field_i, field.clone(), column_chunks))
        })
        .collect::<Result<Vec<_>>>()?;

    // CPU-bound
    let columns = columns
        .into_par_iter()
        .map(|(field_i, parquet_field, column_chunks)| {
            let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
            let field = &arrow_schema.fields()[field_i];

            read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into())
        })
        .collect::<Result<Vec<_>>>()?;

    RecordBatch::try_new(arrow_schema, columns)
}

/// Entry point: `<program> <file_path> <row_group>`.
///
/// Reads the given row group of the parquet file with `parallel_read`, checks
/// that the resulting batch is not empty and prints how long the read took.
/// When the arguments are missing or the row group is not a valid unsigned
/// integer, a message is printed to stderr and the process exits with status 1.
fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    // Validate the command line up-front so that a wrong invocation yields a
    // usage message rather than an index-out-of-bounds panic.
    let (file_path, row_group) = match args.as_slice() {
        [_, file_path, row_group, ..] => match row_group.parse::<usize>() {
            Ok(row_group) => (file_path, row_group),
            Err(error) => {
                eprintln!("invalid row group {:?}: {}", row_group, error);
                std::process::exit(1);
            }
        },
        _ => {
            let program = args
                .first()
                .map(String::as_str)
                .unwrap_or("parquet_read_parallel");
            eprintln!("usage: {} <file_path> <row_group>", program);
            std::process::exit(1);
        }
    };

    // `Instant` is monotonic, so measuring elapsed time cannot fail.
    let start = std::time::Instant::now();
    let batch = parallel_read(file_path, row_group)?;
    assert!(batch.num_rows() > 0);
    println!("took: {} ms", start.elapsed().as_millis());

    Ok(())
}
5 changes: 4 additions & 1 deletion examples/parquet_read_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Result;
use arrow2::io::parquet::read;
Expand All @@ -12,9 +13,11 @@ fn main() -> Result<()> {
let reader = File::open(file_path)?;
let reader = read::RecordReader::try_new(reader, None, None, None, None)?;

let start = SystemTime::now();
for maybe_batch in reader {
let batch = maybe_batch?;
println!("{:?}", batch);
assert!(batch.num_rows() > 0);
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}

0 comments on commit de87058

Please sign in to comment.