Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for projection pushdown on IPC files (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Aug 11, 2021
1 parent d988539 commit 79ce377
Show file tree
Hide file tree
Showing 23 changed files with 1,104 additions and 676 deletions.
3 changes: 2 additions & 1 deletion arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ pub fn flight_data_to_arrow_batch(
.map(|batch| {
read_record_batch(
batch,
schema,
schema.clone(),
None,
is_little_endian,
&dictionaries_by_field,
&mut reader,
Expand Down
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
let metadata = read_file_metadata(&mut file)?;

// Simplest way: use the reader, an iterator over batches.
let reader = FileReader::new(&mut file, metadata);
let reader = FileReader::new(&mut file, metadata, None);

reader.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() -> Result<()> {
let filename = &args[1];
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(&mut f, metadata);
let mut reader = read::FileReader::new(&mut f, metadata, None);
let schema = reader.schema();

let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?;
Expand Down
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata);
let reader = read::FileReader::new(&mut arrow_file, metadata, None);

let mut fields: Vec<ArrowJsonField> = vec![];
for f in reader.schema().fields() {
Expand Down Expand Up @@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
// open Arrow file
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata);
let reader = read::FileReader::new(&mut arrow_file, metadata, None);
let arrow_schema = reader.schema().as_ref().to_owned();

// compare schemas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ async fn record_batch_from_message(
let arrow_batch_result = ipc::read::read_record_batch(
ipc_batch,
schema_ref,
None,
true,
&dictionaries_by_field,
&mut reader,
Expand Down
67 changes: 67 additions & 0 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::{Read, Seek};

use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;
use crate::types::NativeType;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`BinaryArray`] from the IPC stream.
///
/// Consumes one field node from `field_nodes` and three buffers from `buffers`
/// — validity, offsets, values, in that order (the Arrow IPC buffer layout) —
/// reading their bytes from `reader` relative to `block_offset`.
///
/// # Panics
/// Panics if `field_nodes` is empty (malformed or mismatched IPC message).
pub fn read_binary<O: Offset, R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<BinaryArray<O>>
where
    Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
{
    // Nodes are consumed front-to-back, mirroring the order in which the
    // writer emitted them.
    let field_node = field_nodes.pop_front().unwrap().0;

    let validity = read_validity(
        buffers,
        field_node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // `length + 1` offsets delimit `length` variable-sized slots.
    let offsets: Buffer<O> = read_buffer(
        buffers,
        1 + field_node.length() as usize,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )
    // Older versions of the IPC format sometimes do not report an offset
    // (fall back to a single zero offset, i.e. an empty array).
    // `Result::Ok` (rather than bare `Ok`) pins the closure's error type.
    .or_else(|_| Result::Ok(Buffer::<O>::from(&[O::default()])))?;

    // The last offset equals the total byte length of the values buffer.
    let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize();
    let values = read_buffer(
        buffers,
        last_offset,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(BinaryArray::<O>::from_data(offsets, values, validity))
}

/// Advances `field_nodes` and `buffers` past a binary array without
/// deserializing it: one node and its three buffers (validity, offsets,
/// values) are discarded.
pub fn skip_binary(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&gen::Schema::Buffer>) {
    field_nodes.pop_front().unwrap();

    // validity + offsets + values
    for _ in 0..3 {
        buffers.pop_front().unwrap();
    }
}
49 changes: 49 additions & 0 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::BooleanArray;
use crate::error::Result;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`BooleanArray`] from the IPC stream.
///
/// Consumes one field node and two buffers (validity bitmap, then values
/// bitmap), reading their bytes from `reader` relative to `block_offset`.
///
/// # Panics
/// Panics if `field_nodes` is empty (malformed or mismatched IPC message).
pub fn read_boolean<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
) -> Result<BooleanArray> {
    let node = field_nodes.pop_front().unwrap().0;
    let len = node.length() as usize;

    // IPC buffer order: the validity bitmap precedes the values bitmap.
    let validity = read_validity(buffers, node, reader, block_offset, is_little_endian, None)?;
    let values = read_bitmap(buffers, len, reader, block_offset, is_little_endian, None)?;

    Ok(BooleanArray::from_data(values, validity))
}

/// Advances `field_nodes` and `buffers` past a boolean array without
/// deserializing it: one node and its two bitmaps (validity, values)
/// are discarded.
pub fn skip_boolean(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    field_nodes.pop_front().unwrap();

    // validity + values
    for _ in 0..2 {
        buffers.pop_front().unwrap();
    }
}
42 changes: 42 additions & 0 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::{Read, Seek};

use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::Result;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::{read_primitive, skip_primitive};

/// Reads a [`DictionaryArray`] from the IPC stream.
///
/// In the IPC format dictionary batches precede record batches, so the
/// dictionary values have already been read and travel attached to the field
/// node; only the keys are read here, as a primitive array of type `T`.
///
/// # Panics
/// Panics if `field_nodes` is empty or if its front node carries no
/// dictionary values (i.e. the dictionary batch was missing).
pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
) -> Result<DictionaryArray<T>>
where
    Vec<u8>: TryInto<T::Bytes>,
{
    // Clone the (reference-counted) values up-front: `read_primitive` below
    // borrows `field_nodes` mutably, so a shared borrow of the front node
    // must not be kept alive across that call.
    let values = field_nodes.front().unwrap().1.as_ref().unwrap().clone();

    let keys = read_primitive(
        field_nodes,
        T::DATA_TYPE,
        buffers,
        reader,
        block_offset,
        is_little_endian,
        None,
    )?;

    Ok(DictionaryArray::<T>::from_data(keys, values))
}

/// Advances `field_nodes` and `buffers` past a dictionary-encoded array
/// without deserializing it. Only the keys occupy buffers in a record batch
/// (the values live in a separate dictionary batch), so this delegates to
/// `skip_primitive`.
pub fn skip_dictionary(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    skip_primitive(field_nodes, buffers)
}
55 changes: 55 additions & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::FixedSizeBinaryArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`FixedSizeBinaryArray`] from the IPC stream.
///
/// Consumes one field node and two buffers (validity, values), reading their
/// bytes from `reader` relative to `block_offset`. The per-slot byte width is
/// taken from `data_type`.
///
/// # Panics
/// Panics if `field_nodes` is empty (malformed or mismatched IPC message).
pub fn read_fixed_size_binary<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    data_type: DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<FixedSizeBinaryArray> {
    let node = field_nodes.pop_front().unwrap().0;

    // IPC buffer order: validity first, then values.
    let validity = read_validity(
        buffers,
        node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // Every slot stores exactly `size` bytes, so the values buffer holds
    // `length * size` bytes in total.
    let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
    let values_len = node.length() as usize * size;
    let values = read_buffer(
        buffers,
        values_len,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(FixedSizeBinaryArray::from_data(data_type, values, validity))
}

/// Advances `field_nodes` and `buffers` past a fixed-size binary array
/// without deserializing it: one node and its two buffers (validity, values)
/// are discarded.
pub fn skip_fixed_size_binary(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    field_nodes.pop_front().unwrap();

    // validity + values
    for _ in 0..2 {
        buffers.pop_front().unwrap();
    }
}
59 changes: 59 additions & 0 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::FixedSizeListArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;

use super::super::super::gen;
use super::super::deserialize::{read, skip, Node};
use super::super::read_basic::*;

/// Reads a [`FixedSizeListArray`] from the IPC stream.
///
/// Consumes this array's field node and validity buffer, then recursively
/// reads the child array via `read`, which consumes the child's own nodes
/// and buffers.
///
/// # Panics
/// Panics if `field_nodes` is empty (malformed or mismatched IPC message).
pub fn read_fixed_size_list<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    data_type: DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<FixedSizeListArray> {
    let node = field_nodes.pop_front().unwrap().0;

    let validity = read_validity(
        buffers,
        node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // Clone the child type so `data_type` stays available for `from_data`.
    let child_type = FixedSizeListArray::get_child_and_size(&data_type).0.clone();

    // Recurse into the child array.
    let values = read(
        field_nodes,
        child_type,
        buffers,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(FixedSizeListArray::from_data(data_type, values, validity))
}

/// Advances `field_nodes` and `buffers` past a fixed-size list array without
/// deserializing it: this array's node and validity buffer are discarded, and
/// the child array is skipped recursively.
pub fn skip_fixed_size_list(
    field_nodes: &mut VecDeque<Node>,
    data_type: &DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    field_nodes.pop_front().unwrap();

    // a fixed-size list has a single (validity) buffer of its own
    buffers.pop_front().unwrap();

    let (child, _) = FixedSizeListArray::get_child_and_size(data_type);
    skip(field_nodes, child, buffers)
}
Loading

0 comments on commit 79ce377

Please sign in to comment.