This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit 9b54146: Replaced RecordBatch by Chunk (#717)
jorgecarleitao authored Jan 3, 2022 · 1 parent ef7937d
Showing 89 changed files with 840 additions and 1,351 deletions.
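The change applied across these files is mechanical: a RecordBatch (schema plus columns) becomes a Chunk (columns only), and the Schema travels separately wherever names or types are needed. A minimal sketch of the new shape, assuming the post-#717 arrow2 API exactly as the hunks below use it (Chunk::try_new, len, arrays, is_empty):

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;

fn main() -> Result<()> {
    // a Chunk is just a vector of equal-length columns; no schema is attached
    let a = Int32Array::from_slice(&[1, 2, 3]);
    let columns = Chunk::try_new(vec![Arc::new(a) as Arc<dyn Array>])?;

    assert_eq!(columns.len(), 3); // len() counts rows, not columns
    assert_eq!(columns.arrays().len(), 1); // arrays() exposes the columns
    assert!(!columns.is_empty());
    Ok(())
}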
24 changes: 14 additions & 10 deletions arrow-parquet-integration-testing/src/main.rs
@@ -2,8 +2,10 @@ use std::fs::File;
 use std::sync::Arc;
 use std::{collections::HashMap, io::Read};
 
+use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use arrow2::{
+    chunk::Chunk,
     datatypes::{DataType, Schema},
     error::Result,
     io::{
@@ -13,15 +15,17 @@ use arrow2::{
             write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
         },
    },
-    record_batch::RecordBatch,
 };
 
 use clap::{App, Arg};
 
 use flate2::read::GzDecoder;
 
 /// Read gzipped JSON file
-fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec<RecordBatch>) {
+pub fn read_gzip_json(
+    version: &str,
+    file_name: &str,
+) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Arc<dyn Array>>>)> {
     let path = format!(
         "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
         version, file_name
@@ -31,10 +35,11 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec
     let mut s = String::new();
     gz.read_to_string(&mut s).unwrap();
     // convert to Arrow JSON
-    let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+    let arrow_json: ArrowJson = serde_json::from_str(&s)?;
 
     let schema = serde_json::to_value(arrow_json.schema).unwrap();
-    let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();
+
+    let (schema, ipc_fields) = read::deserialize_schema(&schema)?;
 
     // read dictionaries
     let mut dictionaries = HashMap::new();
@@ -48,11 +53,10 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec
     let batches = arrow_json
         .batches
         .iter()
-        .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
-        .collect::<Result<Vec<_>>>()
-        .unwrap();
+        .map(|batch| read::deserialize_chunk(&schema, &ipc_fields, batch, &dictionaries))
+        .collect::<Result<Vec<_>>>()?;
 
-    (schema, ipc_fields, batches)
+    Ok((schema, ipc_fields, batches))
 }
 
 fn main() -> Result<()> {
@@ -108,7 +112,7 @@ fn main() -> Result<()> {
         .collect::<Vec<_>>()
     });
 
-    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file);
+    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file)?;
 
     let schema = if let Some(projection) = &projection {
         let fields = schema
@@ -144,7 +148,7 @@ fn main() -> Result<()> {
                 }
             })
             .collect();
-            RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap()
+            Chunk::try_new(columns).unwrap()
         })
         .collect::<Vec<_>>()
     } else {
11 changes: 4 additions & 7 deletions benches/filter_kernels.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
-use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter};
+use arrow2::chunk::Chunk;
+use arrow2::compute::filter::{build_filter, filter, filter_chunk, Filter};
 use arrow2::datatypes::{DataType, Field, Schema};
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};
 
 fn bench_filter(data_array: &dyn Array, filter_array: &BooleanArray) {
@@ -125,13 +125,10 @@ fn add_benchmark(c: &mut Criterion) {
 
     let data_array = create_primitive_array::<f32>(size, 0.0);
 
-    let field = Field::new("c1", data_array.data_type().clone(), true);
-    let schema = Schema::new(vec![field]);
-
-    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data_array)]).unwrap();
+    let columns = Chunk::try_new(vec![Arc::new(data_array)]).unwrap();
 
     c.bench_function("filter single record batch", |b| {
-        b.iter(|| filter_record_batch(&batch, &filter_array))
+        b.iter(|| filter_chunk(&columns, &filter_array))
     });
 }
 
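For reference, a minimal sketch of calling the renamed kernel outside the bench; the exact return type of filter_chunk is an assumption inferred from the import above:

use std::sync::Arc;

use arrow2::array::{Array, BooleanArray, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::compute::filter::filter_chunk;
use arrow2::error::Result;

fn main() -> Result<()> {
    let columns = Chunk::try_new(vec![
        Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>,
    ])?;
    let mask = BooleanArray::from_slice(&[true, false, true]);

    // the boolean mask is applied to every column of the chunk at once
    let filtered = filter_chunk(&columns, &mask)?;
    assert_eq!(filtered.len(), 2); // two rows survive the mask
    Ok(())
}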
6 changes: 3 additions & 3 deletions benches/read_parquet.rs
@@ -37,9 +37,9 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 
     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
 
-    for maybe_batch in reader {
-        let batch = maybe_batch?;
-        assert_eq!(batch.num_rows(), size);
+    for maybe_chunk in reader {
+        let columns = maybe_chunk?;
+        assert_eq!(columns.len(), size);
     }
     Ok(())
 }
17 changes: 9 additions & 8 deletions benches/write_csv.rs
@@ -3,44 +3,45 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::csv::write;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::*;
 
-fn write_batch(batch: &RecordBatch) -> Result<()> {
+fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
     let writer = &mut write::WriterBuilder::new().from_writer(vec![]);
 
-    write::write_header(writer, batch.schema())?;
+    assert_eq!(columns.arrays().len(), 1);
+    write::write_header(writer, &["a"])?;
 
     let options = write::SerializeOptions::default();
-    write::write_batch(writer, batch, &options)
+    write::write_batch(writer, columns, &options)
 }
 
-fn make_batch(array: impl Array + 'static) -> RecordBatch {
-    RecordBatch::try_from_iter([("a", Arc::new(array) as Arc<dyn Array>)]).unwrap()
+fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
+    Chunk::new(vec![Arc::new(array)])
 }
 
 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);
 
         let array = create_primitive_array::<i32>(size, 0.1);
-        let batch = make_batch(array);
+        let batch = make_chunk(array);
 
         c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
         });
 
         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let batch = make_batch(array);
+        let batch = make_chunk(array);
 
         c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
         });
 
         let array = create_primitive_array::<f64>(size, 0.1);
-        let batch = make_batch(array);
+        let batch = make_chunk(array);
 
         c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
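Standalone, the new CSV write path looks as follows; a sketch that assumes only the calls visible in the hunk above, with write_header taking column names directly since a Chunk carries no schema to read them from:

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn main() -> Result<()> {
    let columns = Chunk::new(vec![Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>]);

    let writer = &mut write::WriterBuilder::new().from_writer(vec![]);
    // names are passed explicitly instead of being read from a RecordBatch's schema
    write::write_header(writer, &["a"])?;

    let options = write::SerializeOptions::default();
    write::write_batch(writer, &columns, &options)
}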
8 changes: 4 additions & 4 deletions benches/write_ipc.rs
@@ -4,21 +4,21 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
+use arrow2::chunk::Chunk;
 use arrow2::datatypes::{Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write::*;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};
 
 fn write(array: &dyn Array) -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::new(vec![field]);
-    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?;
+    let columns = Chunk::try_new(vec![clone(array).into()])?;
 
     let writer = Cursor::new(vec![]);
-    let mut writer = FileWriter::try_new(writer, &schema, Default::default())?;
+    let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;
 
-    writer.write(&batch)
+    writer.write(&columns, None)
 }
 
 fn add_benchmark(c: &mut Criterion) {
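The same change for IPC as a standalone sketch: FileWriter::try_new gains an ipc_fields argument (None derives the IPC fields from the schema) and write takes the chunk plus optional per-batch IPC fields; finish() is assumed from the existing FileWriter API:

use std::io::Cursor;
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write::FileWriter;

fn main() -> Result<()> {
    let array = Int32Array::from_slice(&[1, 2, 3]);
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    let columns = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

    let cursor = Cursor::new(vec![]);
    let mut writer = FileWriter::try_new(cursor, &schema, None, Default::default())?;
    writer.write(&columns, None)?; // the schema was given once, at construction
    writer.finish()
}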
24 changes: 12 additions & 12 deletions benches/write_json.rs
@@ -3,53 +3,53 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::json::write;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::*;
 
-fn write_batch(batch: &RecordBatch) -> Result<()> {
+fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
     let mut writer = vec![];
     let format = write::JsonArray::default();
 
-    let batches = vec![Ok(batch.clone())].into_iter();
+    let batches = vec![Ok(columns.clone())].into_iter();
 
     // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, vec![], format);
+    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);
 
     // the operation of writing is IO-bounded.
     write::write(&mut writer, format, blocks)?;
 
     Ok(())
 }
 
-fn make_batch(array: impl Array + 'static) -> RecordBatch {
-    RecordBatch::try_from_iter([("a", Arc::new(array) as Arc<dyn Array>)]).unwrap()
+fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
+    Chunk::new(vec![Arc::new(array) as Arc<dyn Array>])
 }
 
 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);
 
         let array = create_primitive_array::<i32>(size, 0.1);
-        let batch = make_batch(array);
+        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
        });
 
         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let batch = make_batch(array);
+        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
        });
 
         let array = create_primitive_array::<f64>(size, 0.1);
-        let batch = make_batch(array);
+        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
        });
     });
 }
6 changes: 3 additions & 3 deletions benches/write_parquet.rs
@@ -3,13 +3,13 @@ use std::io::Cursor;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::{clone, Array};
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::parquet::write::*;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};
 
 fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
-    let batch = RecordBatch::try_from_iter([("c1", clone(array).into())])?;
-    let schema = batch.schema().clone();
+    let columns = Chunk::new(vec![clone(array).into()]);
+    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
 
     let options = WriteOptions {
@@ -19,7 +19,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
     };
 
     let row_groups = RowGroupIterator::try_new(
-        vec![Ok(batch)].into_iter(),
+        vec![Ok(columns)].into_iter(),
         &schema,
         options,
         vec![encoding],
9 changes: 4 additions & 5 deletions examples/avro_read.rs
@@ -1,6 +1,5 @@
 use std::fs::File;
 use std::io::BufReader;
-use std::sync::Arc;
 
 use arrow2::error::Result;
 use arrow2::io::avro::read;
@@ -20,12 +19,12 @@ fn main() -> Result<()> {
     let reader = read::Reader::new(
         read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
         avro_schema,
-        Arc::new(schema),
+        schema.fields,
     );
 
-    for batch in reader {
-        let batch = batch?;
-        assert!(batch.num_rows() > 0);
+    for maybe_chunk in reader {
+        let columns = maybe_chunk?;
+        assert!(!columns.is_empty());
     }
     Ok(())
 }
5 changes: 2 additions & 3 deletions examples/avro_read_async.rs
@@ -19,7 +19,6 @@ async fn main() -> Result<()> {
     let mut reader = File::open(file_path).await?.compat();
 
     let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
-    let schema = Arc::new(schema);
     let avro_schemas = Arc::new(avro_schemas);
 
     let blocks = block_stream(&mut reader, marker).await;
@@ -32,10 +31,10 @@ async fn main() -> Result<()> {
         let handle = tokio::task::spawn_blocking(move || {
             let mut decompressed = Block::new(0, vec![]);
             decompress_block(&mut block, &mut decompressed, compression)?;
-            deserialize(&decompressed, schema, &avro_schemas)
+            deserialize(&decompressed, schema.fields(), &avro_schemas)
        });
         let batch = handle.await.unwrap()?;
-        assert!(batch.num_rows() > 0);
+        assert!(!batch.is_empty());
     }
 
     Ok(())
7 changes: 5 additions & 2 deletions examples/csv_read.rs
@@ -1,8 +1,11 @@
+use std::sync::Arc;
+
+use arrow2::array::Array;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::csv::read;
-use arrow2::record_batch::RecordBatch;
 
-fn read_path(path: &str, projection: Option<&[usize]>) -> Result<RecordBatch> {
+fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn Array>>> {
     // Create a CSV reader. This is typically created on the thread that reads the file and
     // thus owns the read head.
     let mut reader = read::ReaderBuilder::new().from_path(path)?;
4 changes: 2 additions & 2 deletions examples/csv_read_async.rs
@@ -22,13 +22,13 @@ async fn main() -> Result<()> {
     let mut rows = vec![ByteRecord::default(); 100];
     let rows_read = read_rows(&mut reader, 0, &mut rows).await?;
 
-    let batch = deserialize_batch(
+    let columns = deserialize_batch(
         &rows[..rows_read],
         schema.fields(),
         None,
         0,
         deserialize_column,
     )?;
-    println!("{:?}", batch.column(0));
+    println!("{:?}", columns.arrays()[0]);
     Ok(())
 }
8 changes: 5 additions & 3 deletions examples/csv_read_parallel.rs
@@ -4,9 +4,11 @@ use std::sync::Arc;
 use std::thread;
 use std::time::SystemTime;
 
-use arrow2::{error::Result, io::csv::read, record_batch::RecordBatch};
+use arrow2::array::Array;
+use arrow2::chunk::Chunk;
+use arrow2::{error::Result, io::csv::read};
 
-fn parallel_read(path: &str) -> Result<Vec<RecordBatch>> {
+fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
     let batch_size = 100;
     let has_header = true;
     let projection = None;
@@ -78,7 +80,7 @@ fn main() -> Result<()> {
 
     let batches = parallel_read(file_path)?;
     for batch in batches {
-        println!("{}", batch.num_rows())
+        println!("{}", batch.len())
     }
     Ok(())
 }