From 79ce37784bfbc86a72f559f41e544b987c72e93d Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 11 Aug 2021 08:10:09 +0100 Subject: [PATCH] Added support for projection pushdown on IPC files (#264) --- arrow-flight/src/utils.rs | 3 +- examples/ipc_file_read.rs | 2 +- .../src/bin/arrow-file-to-stream.rs | 2 +- .../src/bin/arrow-json-integration-test.rs | 4 +- .../integration_test.rs | 1 + src/io/ipc/read/array/binary.rs | 67 ++ src/io/ipc/read/array/boolean.rs | 49 ++ src/io/ipc/read/array/dictionary.rs | 42 + src/io/ipc/read/array/fixed_size_binary.rs | 55 ++ src/io/ipc/read/array/fixed_size_list.rs | 59 ++ src/io/ipc/read/array/list.rs | 76 ++ src/io/ipc/read/array/mod.rs | 20 + src/io/ipc/read/array/null.rs | 13 + src/io/ipc/read/array/primitive.rs | 55 ++ src/io/ipc/read/array/struct_.rs | 67 ++ src/io/ipc/read/array/utf8.rs | 67 ++ src/io/ipc/read/common.rs | 145 +++- src/io/ipc/read/deserialize.rs | 717 ++---------------- src/io/ipc/read/mod.rs | 2 + src/io/ipc/read/read_basic.rs | 255 +++++++ src/io/ipc/read/reader.rs | 74 +- src/io/ipc/read/stream.rs | 1 + src/io/ipc/write/writer.rs | 4 +- 23 files changed, 1104 insertions(+), 676 deletions(-) create mode 100644 src/io/ipc/read/array/binary.rs create mode 100644 src/io/ipc/read/array/boolean.rs create mode 100644 src/io/ipc/read/array/dictionary.rs create mode 100644 src/io/ipc/read/array/fixed_size_binary.rs create mode 100644 src/io/ipc/read/array/fixed_size_list.rs create mode 100644 src/io/ipc/read/array/list.rs create mode 100644 src/io/ipc/read/array/mod.rs create mode 100644 src/io/ipc/read/array/null.rs create mode 100644 src/io/ipc/read/array/primitive.rs create mode 100644 src/io/ipc/read/array/struct_.rs create mode 100644 src/io/ipc/read/array/utf8.rs create mode 100644 src/io/ipc/read/read_basic.rs diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 94b66bd56e6..9ea425983eb 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -164,7 +164,8 @@ pub fn flight_data_to_arrow_batch( .map(|batch| { read_record_batch( batch, - schema, + schema.clone(), + None, is_little_endian, &dictionaries_by_field, &mut reader, diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index fd1210223de..6cd3df9a617 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result> { let metadata = read_file_metadata(&mut file)?; // Simplest way: use the reader, an iterator over batches. 
- let reader = FileReader::new(&mut file, metadata); + let reader = FileReader::new(&mut file, metadata, None); reader.collect() } diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs index a0a5a681baa..9b1ae23a0b5 100644 --- a/integration-testing/src/bin/arrow-file-to-stream.rs +++ b/integration-testing/src/bin/arrow-file-to-stream.rs @@ -27,7 +27,7 @@ fn main() -> Result<()> { let filename = &args[1]; let mut f = File::open(filename)?; let metadata = read::read_file_metadata(&mut f)?; - let mut reader = read::FileReader::new(&mut f, metadata); + let mut reader = read::FileReader::new(&mut f, metadata, None); let schema = reader.schema(); let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?; diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index ba2a158c8b8..2c378cb938a 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(&mut arrow_file, metadata); + let reader = read::FileReader::new(&mut arrow_file, metadata, None); let mut fields: Vec = vec![]; for f in reader.schema().fields() { @@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { // open Arrow file let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(&mut arrow_file, metadata); + let reader = read::FileReader::new(&mut arrow_file, metadata, None); let arrow_schema = reader.schema().as_ref().to_owned(); // compare schemas diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 118564cf8a8..dc80de6ba86 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -292,6 +292,7 @@ async fn record_batch_from_message( let arrow_batch_result = ipc::read::read_record_batch( ipc_batch, schema_ref, + None, true, &dictionaries_by_field, &mut reader, diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs new file mode 100644 index 00000000000..0e1ed3cd193 --- /dev/null +++ b/src/io/ipc/read/array/binary.rs @@ -0,0 +1,67 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::io::{Read, Seek}; + +use crate::array::{BinaryArray, Offset}; +use crate::buffer::Buffer; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; +use crate::types::NativeType; + +use super::super::super::gen; +use super::super::deserialize::Node; +use super::super::read_basic::*; + +pub fn read_binary( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> +where + Vec: TryInto + TryInto<::Bytes>, +{ + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let offsets: Buffer = read_buffer( + buffers, + 1 + field_node.length() as usize, + reader, + 
block_offset, + is_little_endian, + compression, + ) + // Older versions of the IPC format sometimes do not report an offset + .or_else(|_| Result::Ok(Buffer::::from(&[O::default()])))?; + + let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); + let values = read_buffer( + buffers, + last_offset, + reader, + block_offset, + is_little_endian, + compression, + )?; + + Ok(BinaryArray::::from_data(offsets, values, validity)) +} + +pub fn skip_binary(field_nodes: &mut VecDeque, buffers: &mut VecDeque<&gen::Schema::Buffer>) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); +} diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs new file mode 100644 index 00000000000..866b11e9022 --- /dev/null +++ b/src/io/ipc/read/array/boolean.rs @@ -0,0 +1,49 @@ +use std::collections::VecDeque; +use std::io::{Read, Seek}; + +use crate::array::BooleanArray; +use crate::error::Result; + +use super::super::super::gen; +use super::super::deserialize::Node; +use super::super::read_basic::*; + +pub fn read_boolean( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, +) -> Result { + let field_node = field_nodes.pop_front().unwrap().0; + + let length = field_node.length() as usize; + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + None, + )?; + + let values = read_bitmap( + buffers, + length, + reader, + block_offset, + is_little_endian, + None, + )?; + Ok(BooleanArray::from_data(values, validity)) +} + +pub fn skip_boolean( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); +} diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs new file mode 100644 index 00000000000..3cf5cc435f5 --- /dev/null +++ b/src/io/ipc/read/array/dictionary.rs @@ -0,0 +1,42 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::io::{Read, Seek}; + +use crate::array::{DictionaryArray, DictionaryKey}; +use crate::error::Result; + +use super::super::super::gen; +use super::super::deserialize::Node; +use super::{read_primitive, skip_primitive}; + +pub fn read_dictionary( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, +) -> Result> +where + Vec: TryInto, +{ + let values = field_nodes.front().unwrap().1.as_ref().unwrap(); + + let keys = read_primitive( + field_nodes, + T::DATA_TYPE, + buffers, + reader, + block_offset, + is_little_endian, + None, + )?; + + Ok(DictionaryArray::::from_data(keys, values.clone())) +} + +pub fn skip_dictionary( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + skip_primitive(field_nodes, buffers) +} diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs new file mode 100644 index 00000000000..ebd4872bb41 --- /dev/null +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -0,0 +1,55 @@ +use std::collections::VecDeque; +use std::io::{Read, Seek}; + +use crate::array::FixedSizeBinaryArray; +use crate::datatypes::DataType; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; + +use super::super::super::gen; +use 
super::super::deserialize::Node; +use super::super::read_basic::*; + +pub fn read_fixed_size_binary( + field_nodes: &mut VecDeque, + data_type: DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result { + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let length = + field_node.length() as usize * (*FixedSizeBinaryArray::get_size(&data_type) as usize); + let values = read_buffer( + buffers, + length, + reader, + block_offset, + is_little_endian, + compression, + )?; + + Ok(FixedSizeBinaryArray::from_data(data_type, values, validity)) +} + +pub fn skip_fixed_size_binary( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); +} diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs new file mode 100644 index 00000000000..a7416cd9ca3 --- /dev/null +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -0,0 +1,59 @@ +use std::collections::VecDeque; +use std::io::{Read, Seek}; + +use crate::array::FixedSizeListArray; +use crate::datatypes::DataType; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; + +use super::super::super::gen; +use super::super::deserialize::{read, skip, Node}; +use super::super::read_basic::*; + +pub fn read_fixed_size_list( + field_nodes: &mut VecDeque, + data_type: DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result { + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let (value_data_type, _) = FixedSizeListArray::get_child_and_size(&data_type); + + let values = read( + field_nodes, + value_data_type.clone(), + buffers, + reader, + block_offset, + is_little_endian, + compression, + )?; + Ok(FixedSizeListArray::from_data(data_type, values, validity)) +} + +pub fn skip_fixed_size_list( + field_nodes: &mut VecDeque, + data_type: &DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + + let (data_type, _) = FixedSizeListArray::get_child_and_size(data_type); + + skip(field_nodes, data_type, buffers) +} diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs new file mode 100644 index 00000000000..a876576fa27 --- /dev/null +++ b/src/io/ipc/read/array/list.rs @@ -0,0 +1,76 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::io::{Read, Seek}; + +use crate::array::{ListArray, Offset}; +use crate::buffer::Buffer; +use crate::datatypes::DataType; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; + +use super::super::super::gen; +use super::super::deserialize::{read, skip, Node}; +use super::super::read_basic::*; + +pub fn read_list( + field_nodes: &mut VecDeque, + data_type: DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> +where + Vec: TryInto, +{ + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = 
read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let offsets = read_buffer::( + buffers, + 1 + field_node.length() as usize, + reader, + block_offset, + is_little_endian, + compression, + ) + // Older versions of the IPC format sometimes do not report an offset + .or_else(|_| Result::Ok(Buffer::::from(&[O::default()])))?; + + let value_data_type = ListArray::::get_child_type(&data_type).clone(); + + let values = read( + field_nodes, + value_data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + )?; + Ok(ListArray::from_data(data_type, offsets, values, validity)) +} + +pub fn skip_list( + field_nodes: &mut VecDeque, + data_type: &DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); + + let data_type = ListArray::::get_child_type(data_type); + + skip(field_nodes, data_type, buffers) +} diff --git a/src/io/ipc/read/array/mod.rs b/src/io/ipc/read/array/mod.rs new file mode 100644 index 00000000000..458c62123fb --- /dev/null +++ b/src/io/ipc/read/array/mod.rs @@ -0,0 +1,20 @@ +mod primitive; +pub use primitive::*; +mod boolean; +pub use boolean::*; +mod utf8; +pub use utf8::*; +mod binary; +pub use binary::*; +mod fixed_size_binary; +pub use fixed_size_binary::*; +mod list; +pub use list::*; +mod fixed_size_list; +pub use fixed_size_list::*; +mod struct_; +pub use struct_::*; +mod null; +pub use null::*; +mod dictionary; +pub use dictionary::*; diff --git a/src/io/ipc/read/array/null.rs b/src/io/ipc/read/array/null.rs new file mode 100644 index 00000000000..7d937cc6494 --- /dev/null +++ b/src/io/ipc/read/array/null.rs @@ -0,0 +1,13 @@ +use std::collections::VecDeque; + +use crate::array::NullArray; + +use super::super::deserialize::Node; + +pub fn read_null(field_nodes: &mut VecDeque) -> NullArray { + NullArray::from_data(field_nodes.pop_front().unwrap().0.length() as usize) +} + +pub fn skip_null(field_nodes: &mut VecDeque) { + let _ = field_nodes.pop_front(); +} diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs new file mode 100644 index 00000000000..8839b06e7a9 --- /dev/null +++ b/src/io/ipc/read/array/primitive.rs @@ -0,0 +1,55 @@ +use std::io::{Read, Seek}; +use std::{collections::VecDeque, convert::TryInto}; + +use crate::datatypes::DataType; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; +use crate::{array::PrimitiveArray, types::NativeType}; + +use super::super::super::gen; +use super::super::deserialize::Node; +use super::super::read_basic::*; + +pub fn read_primitive( + field_nodes: &mut VecDeque, + data_type: DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> +where + Vec: TryInto, +{ + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let values = read_buffer( + buffers, + field_node.length() as usize, + reader, + block_offset, + is_little_endian, + compression, + )?; + Ok(PrimitiveArray::::from_data(data_type, values, validity)) +} + +pub fn skip_primitive( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = 
buffers.pop_front().unwrap(); +} diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs new file mode 100644 index 00000000000..95274459731 --- /dev/null +++ b/src/io/ipc/read/array/struct_.rs @@ -0,0 +1,67 @@ +use std::collections::VecDeque; +use std::io::{Read, Seek}; + +use crate::array::StructArray; +use crate::datatypes::DataType; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; + +use super::super::super::gen; +use super::super::deserialize::{read, skip, Node}; +use super::super::read_basic::*; + +pub fn read_struct( + field_nodes: &mut VecDeque, + data_type: DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result { + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let fields = StructArray::get_fields(&data_type); + + let values = fields + .iter() + .map(|field| { + read( + field_nodes, + field.data_type().clone(), + buffers, + reader, + block_offset, + is_little_endian, + compression, + ) + }) + .collect::>>()?; + + Ok(StructArray::from_data(fields.to_vec(), values, validity)) +} + +pub fn skip_struct( + field_nodes: &mut VecDeque, + data_type: &DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + + let fields = StructArray::get_fields(data_type); + + fields + .iter() + .for_each(|field| skip(field_nodes, field.data_type(), buffers)) +} diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs new file mode 100644 index 00000000000..17ffed8da86 --- /dev/null +++ b/src/io/ipc/read/array/utf8.rs @@ -0,0 +1,67 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::io::{Read, Seek}; + +use crate::array::{Offset, Utf8Array}; +use crate::buffer::Buffer; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; +use crate::types::NativeType; + +use super::super::super::gen; +use super::super::deserialize::Node; +use super::super::read_basic::*; + +pub fn read_utf8( + field_nodes: &mut VecDeque, + buffers: &mut VecDeque<&gen::Schema::Buffer>, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> +where + Vec: TryInto + TryInto<::Bytes>, +{ + let field_node = field_nodes.pop_front().unwrap().0; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + )?; + + let offsets: Buffer = read_buffer( + buffers, + 1 + field_node.length() as usize, + reader, + block_offset, + is_little_endian, + compression, + ) + // Older versions of the IPC format sometimes do not report an offset + .or_else(|_| Result::Ok(Buffer::::from(&[O::default()])))?; + + let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); + let values = read_buffer( + buffers, + last_offset, + reader, + block_offset, + is_little_endian, + compression, + )?; + + Ok(Utf8Array::::from_data(offsets, values, validity)) +} + +pub fn skip_utf8(field_nodes: &mut VecDeque, buffers: &mut VecDeque<&gen::Schema::Buffer>) { + let _ = field_nodes.pop_front().unwrap(); + + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); + let _ = buffers.pop_front().unwrap(); +} diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index e7d4a3c0c30..0cc9fc468ea 
100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -25,14 +25,75 @@ use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; use super::super::gen; -use super::deserialize::read; +use super::deserialize::{read, skip}; type ArrayRef = Arc; +#[derive(Debug, Eq, PartialEq, Hash)] +enum ProjectionResult { + Selected(A), + NotSelected(A), +} + +/// An iterator adapter that will return `Some(x)` or `None` +/// # Panics +/// The iterator panics iff the `projection` is not strictly increasing. +struct ProjectionIter<'a, A, I: Iterator> { + projection: &'a [usize], + iter: I, + current_count: usize, + current_projection: usize, +} + +impl<'a, A, I: Iterator> ProjectionIter<'a, A, I> { + /// # Panics + /// iff `projection` is empty + pub fn new(projection: &'a [usize], iter: I) -> Self { + Self { + projection: &projection[1..], + iter, + current_count: 0, + current_projection: projection[0], + } + } +} + +impl<'a, A, I: Iterator> Iterator for ProjectionIter<'a, A, I> { + type Item = ProjectionResult; + + fn next(&mut self) -> Option { + if let Some(item) = self.iter.next() { + let result = if self.current_count == self.current_projection { + if !self.projection.is_empty() { + assert!(self.projection[0] > self.current_projection); + self.current_projection = self.projection[0]; + self.projection = &self.projection[1..]; + } else { + self.current_projection = 0 // a value that most likely already passed + }; + Some(ProjectionResult::Selected(item)) + } else { + Some(ProjectionResult::NotSelected(item)) + }; + self.current_count += 1; + result + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} + /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema` +/// # Panic +/// Panics iff the projection is not in increasing order (e.g. 
`[1, 0]` nor `[0, 1, 1]` are valid) pub fn read_record_batch( batch: gen::Message::RecordBatch, schema: Arc, + projection: Option<(&[usize], Arc)>, is_little_endian: bool, dictionaries: &[Option], reader: &mut R, @@ -54,23 +115,49 @@ pub fn read_record_batch( .zip(dictionaries) .collect::>(); - let arrays = schema - .fields() - .iter() - .map(|field| { - read( - &mut field_nodes, - field.data_type().clone(), - &mut buffers, - reader, - block_offset, - is_little_endian, - batch.compression(), - ) - }) - .collect::>>()?; + let (schema, columns) = if let Some(projection) = projection { + let projected_schema = projection.1.clone(); + + let projection = ProjectionIter::new(projection.0, schema.fields().iter()); - RecordBatch::try_new(schema.clone(), arrays) + let arrays = projection + .map(|maybe_field| match maybe_field { + ProjectionResult::Selected(field) => Some(read( + &mut field_nodes, + field.data_type().clone(), + &mut buffers, + reader, + block_offset, + is_little_endian, + batch.compression(), + )), + ProjectionResult::NotSelected(field) => { + skip(&mut field_nodes, field.data_type(), &mut buffers); + None + } + }) + .flatten() + .collect::>>()?; + (projected_schema, arrays) + } else { + let arrays = schema + .fields() + .iter() + .map(|field| { + read( + &mut field_nodes, + field.data_type().clone(), + &mut buffers, + reader, + block_offset, + is_little_endian, + batch.compression(), + ) + }) + .collect::>>()?; + (schema.clone(), arrays) + }; + RecordBatch::try_new(schema, columns) } /// Read the dictionary from the buffer and provided metadata, @@ -109,6 +196,7 @@ pub fn read_dictionary( let record_batch = read_record_batch( batch.data().unwrap(), schema, + None, is_little_endian, dictionaries_by_field, reader, @@ -135,3 +223,26 @@ pub fn read_dictionary( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn project_iter() { + let iter = 1..6; + let iter = ProjectionIter::new(&[0, 2, 4], iter); + let result: Vec<_> = iter.collect(); + use ProjectionResult::*; + assert_eq!( + result, + vec![ + Selected(1), + NotSelected(2), + Selected(3), + NotSelected(4), + Selected(5) + ] + ) + } +} diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 8393a668c95..f244024bb0a 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -1,650 +1,23 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - //! Arrow IPC File and Stream Readers //! //! The `FileReader` and `StreamReader` have similar interfaces, //! 
however the `FileReader` expects a reader that supports `Seek`ing -use std::{collections::VecDeque, convert::TryInto}; +use std::collections::VecDeque; use std::{ - io::{Read, Seek, SeekFrom}, + io::{Read, Seek}, sync::Arc, }; -use crate::buffer::Buffer; use crate::datatypes::{DataType, IntervalUnit}; -use crate::endianess::is_native_little_endian; -use crate::error::{ArrowError, Result}; -use crate::io::ipc::gen::Message::{BodyCompression, CompressionType}; -use crate::{ - array::*, - bitmap::Bitmap, - buffer::MutableBuffer, - types::{days_ms, NativeType}, -}; +use crate::error::Result; +use crate::io::ipc::gen::Message::BodyCompression; +use crate::{array::*, types::days_ms}; -use super::super::compression; use super::super::gen; +use super::array::*; -type Node<'a> = (&'a gen::Message::FieldNode, &'a Option>); - -fn read_big_endian( - reader: &mut R, - bytes: usize, - buffer: &mut MutableBuffer, - is_little_endian: bool, -) -> Result<()> { - // slow case where we must reverse bits - let mut slice = vec![0u8; bytes]; - reader.read_exact(&mut slice)?; - - if !is_little_endian { - let chunks = slice.chunks_exact(std::mem::size_of::()); - buffer - .as_mut_slice() - .iter_mut() - .zip(chunks) - .try_for_each(|(slot, chunk)| { - let a: T::Bytes = match chunk.try_into() { - Ok(a) => a, - Err(_) => unreachable!(), - }; - *slot = T::from_be_bytes(a); - Result::Ok(()) - }) - .unwrap(); - } - Ok(()) -} - -fn read_uncompressed_buffer( - reader: &mut R, - buffer_length: usize, - bytes: usize, - length: usize, - is_little_endian: bool, -) -> Result> { - if bytes > buffer_length { - return Err(ArrowError::Ipc( - format!("The slots of the array times the physical size must \ - be smaller or equal to the length of the IPC buffer. \ - However, this array reports {} slots, which, for physical type \"{}\", corresponds to {} bytes, \ - which is larger than the buffer length {}", - length, - std::any::type_name::(), - bytes, - buffer_length, - ), - )); - } - - // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read - // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 - let mut buffer = MutableBuffer::::from_len_zeroed(length); - - if is_native_little_endian() == is_little_endian { - // fast case where we can just copy the contents as is - unsafe { - // transmute T to bytes. - let slice = std::slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, bytes); - reader.read_exact(slice)?; - } - } else { - read_big_endian(reader, bytes, &mut buffer, is_little_endian)?; - } - Ok(buffer) -} - -fn read_compressed_buffer( - reader: &mut R, - buffer_length: usize, - length: usize, - is_little_endian: bool, - compression: BodyCompression, -) -> Result> { - if is_little_endian != is_native_little_endian() { - return Err(ArrowError::NotYetImplemented( - "Reading compressed and big endian IPC".to_string(), - )); - } - - // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read - // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 - let mut buffer = MutableBuffer::::from_len_zeroed(length); - - // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; buffer_length]; - reader.read_exact(&mut slice)?; - - match compression.codec() { - CompressionType::LZ4_FRAME => { - // fast case where we can just copy the contents as is - unsafe { - // transmute T to bytes. 
- let out_slice = std::slice::from_raw_parts_mut( - buffer.as_mut_ptr() as *mut u8, - length * std::mem::size_of::(), - ); - compression::decompress_lz4(&slice[8..], out_slice)? - } - Ok(buffer) - } - CompressionType::ZSTD => { - // fast case where we can just copy the contents as is - unsafe { - // transmute T to bytes. - let out_slice = std::slice::from_raw_parts_mut( - buffer.as_mut_ptr() as *mut u8, - length * std::mem::size_of::(), - ); - compression::decompress_zstd(&slice[8..], out_slice)? - } - Ok(buffer) - } - _ => Err(ArrowError::NotYetImplemented( - "Non LZ4 compressed IPC".to_string(), - )), - } -} - -fn read_buffer( - buf: &mut VecDeque<&gen::Schema::Buffer>, - length: usize, // in slots - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> { - let buf = buf.pop_front().unwrap(); - - reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?; - - let buffer_length = buf.length() as usize; - - let bytes = length * std::mem::size_of::(); - - if let Some(compression) = compression { - Ok( - read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)? - .into(), - ) - } else { - Ok( - read_uncompressed_buffer(reader, buffer_length, bytes, length, is_little_endian)? - .into(), - ) - } -} - -fn read_uncompressed_bitmap( - length: usize, - bytes: usize, - reader: &mut R, -) -> Result> { - // something is wrong if we can't `length` - assert!(length <= bytes * 8); - // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read - // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 - let mut buffer = MutableBuffer::::from_len_zeroed(bytes); - reader.read_exact(buffer.as_mut_slice())?; - - Ok(buffer) -} - -fn read_compressed_bitmap( - length: usize, - bytes: usize, - compression: BodyCompression, - reader: &mut R, -) -> Result> { - let mut buffer = MutableBuffer::::from_len_zeroed((length + 7) / 8); - match compression.codec() { - CompressionType::LZ4_FRAME => { - // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; bytes]; - reader.read_exact(&mut slice)?; - - compression::decompress_lz4(&slice[8..], &mut buffer)?; - - Ok(buffer) - } - CompressionType::ZSTD => { - // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; bytes]; - reader.read_exact(&mut slice)?; - - compression::decompress_zstd(&slice[8..], &mut buffer)?; - - Ok(buffer) - } - _ => Err(ArrowError::NotYetImplemented( - "Non LZ4 compressed IPC".to_string(), - )), - } -} - -fn read_bitmap( - buf: &mut VecDeque<&gen::Schema::Buffer>, - length: usize, - reader: &mut R, - block_offset: u64, - _: bool, - compression: Option, -) -> Result { - let buf = buf.pop_front().unwrap(); - - reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?; - - let bytes = buf.length() as usize; - - let buffer = if let Some(compression) = compression { - read_compressed_bitmap(length, bytes, compression, reader) - } else { - read_uncompressed_bitmap(length, bytes, reader) - }?; - - Ok(Bitmap::from_bytes(buffer.into(), length)) -} - -fn read_validity( - buffers: &mut VecDeque<&gen::Schema::Buffer>, - field_node: &gen::Message::FieldNode, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> { - Ok(if field_node.null_count() > 0 { - Some(read_bitmap( - buffers, - field_node.length() as usize, - reader, - 
block_offset, - is_little_endian, - compression, - )?) - } else { - let _ = buffers.pop_front().unwrap(); - None - }) -} - -fn read_primitive( - field_nodes: &mut VecDeque, - data_type: DataType, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> -where - Vec: TryInto, -{ - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let values = read_buffer( - buffers, - field_node.length() as usize, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let array = PrimitiveArray::::from_data(data_type, values, validity); - - Ok(array) -} - -fn read_boolean( - field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, -) -> Result> { - let field_node = field_nodes.pop_front().unwrap().0; - - let length = field_node.length() as usize; - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - None, - )?; - - let values = read_bitmap( - buffers, - length, - reader, - block_offset, - is_little_endian, - None, - )?; - - let array = BooleanArray::from_data(values, validity); - Ok(Arc::new(array)) -} - -fn read_utf8( - field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> -where - Vec: TryInto + TryInto<::Bytes>, -{ - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let offsets: Buffer = read_buffer( - buffers, - 1 + field_node.length() as usize, - reader, - block_offset, - is_little_endian, - compression, - ) - // Older versions of the IPC format sometimes do not report an offset - .or_else(|_| Result::Ok(MutableBuffer::::from(&[O::default()]).into()))?; - - let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); - let values = read_buffer( - buffers, - last_offset, - reader, - block_offset, - is_little_endian, - compression, - )?; - - Ok(Utf8Array::::from_data(offsets, values, validity)) -} - -fn read_binary( - field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> -where - Vec: TryInto + TryInto<::Bytes>, -{ - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let offsets: Buffer = read_buffer( - buffers, - 1 + field_node.length() as usize, - reader, - block_offset, - is_little_endian, - compression, - ) - // Older versions of the IPC format sometimes do not report an offset - .or_else(|_| Result::Ok(MutableBuffer::::from(&[O::default()]).into()))?; - - let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); - let values = read_buffer( - buffers, - last_offset, - reader, - block_offset, - is_little_endian, - compression, - )?; - - Ok(BinaryArray::::from_data(offsets, values, validity)) -} - -fn read_fixed_size_binary( - field_nodes: &mut VecDeque, - data_type: DataType, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) 
-> Result { - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let length = - field_node.length() as usize * (*FixedSizeBinaryArray::get_size(&data_type) as usize); - let values = read_buffer( - buffers, - length, - reader, - block_offset, - is_little_endian, - compression, - )?; - - Ok(FixedSizeBinaryArray::from_data(data_type, values, validity)) -} - -fn read_null(field_nodes: &mut VecDeque) -> NullArray { - NullArray::from_data(field_nodes.pop_front().unwrap().0.length() as usize) -} - -fn read_list( - field_nodes: &mut VecDeque, - data_type: DataType, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> -where - Vec: TryInto, -{ - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let offsets = read_buffer::( - buffers, - 1 + field_node.length() as usize, - reader, - block_offset, - is_little_endian, - compression, - ) - // Older versions of the IPC format sometimes do not report an offset - .or_else(|_| Result::Ok(MutableBuffer::::from(&[O::default()]).into()))?; - - let value_data_type = ListArray::::get_child_type(&data_type).clone(); - - let values = read( - field_nodes, - value_data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - )?; - Ok(Arc::new(ListArray::from_data( - data_type, offsets, values, validity, - ))) -} - -fn read_fixed_size_list( - field_nodes: &mut VecDeque, - data_type: DataType, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> { - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let (value_data_type, _) = FixedSizeListArray::get_child_and_size(&data_type); - - let values = read( - field_nodes, - value_data_type.clone(), - buffers, - reader, - block_offset, - is_little_endian, - compression, - )?; - Ok(Arc::new(FixedSizeListArray::from_data( - data_type, values, validity, - ))) -} - -fn read_struct( - field_nodes: &mut VecDeque, - data_type: DataType, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, - compression: Option, -) -> Result> { - let field_node = field_nodes.pop_front().unwrap().0; - - let validity = read_validity( - buffers, - field_node, - reader, - block_offset, - is_little_endian, - compression, - )?; - - let fields = StructArray::get_fields(&data_type); - - let values = fields - .iter() - .map(|field| { - read( - field_nodes, - field.data_type().clone(), - buffers, - reader, - block_offset, - is_little_endian, - compression, - ) - }) - .collect::>>()?; - - Ok(Arc::new(StructArray::from_data( - fields.to_vec(), - values, - validity, - ))) -} - -/// Reads the correct number of buffers based on list type and null_count, and creates a -/// list array ref -pub fn read_dictionary( - field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&gen::Schema::Buffer>, - reader: &mut R, - block_offset: u64, - is_little_endian: bool, -) -> Result> -where - Vec: TryInto, -{ - let values = field_nodes.front().unwrap().1.as_ref().unwrap(); - - let keys = read_primitive( - field_nodes, - 
T::DATA_TYPE, - buffers, - reader, - block_offset, - is_little_endian, - None, - )?; - - Ok(Arc::new(DictionaryArray::::from_data( - keys, - values.clone(), - ))) -} +pub type Node<'a> = (&'a gen::Message::FieldNode, &'a Option>); pub fn read( field_nodes: &mut VecDeque, @@ -662,6 +35,7 @@ pub fn read( } DataType::Boolean => { read_boolean(field_nodes, buffers, reader, block_offset, is_little_endian) + .map(|x| Arc::new(x) as Arc) } DataType::Int8 => read_primitive::( field_nodes, @@ -855,7 +229,8 @@ pub fn read( block_offset, is_little_endian, compression, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::LargeList(_) => read_list::( field_nodes, data_type, @@ -864,7 +239,8 @@ pub fn read( block_offset, is_little_endian, compression, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::FixedSizeList(_, _) => read_fixed_size_list( field_nodes, data_type, @@ -873,7 +249,8 @@ pub fn read( block_offset, is_little_endian, compression, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::Struct(_) => read_struct( field_nodes, data_type, @@ -882,7 +259,8 @@ pub fn read( block_offset, is_little_endian, compression, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::Dictionary(ref key_type, _) => match key_type.as_ref() { DataType::Int8 => read_dictionary::( field_nodes, @@ -890,58 +268,105 @@ pub fn read( reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::Int16 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::Int32 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::Int64 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::UInt8 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::UInt16 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::UInt32 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), DataType::UInt64 => read_dictionary::( field_nodes, buffers, reader, block_offset, is_little_endian, - ), + ) + .map(|x| Arc::new(x) as Arc), _ => unreachable!(), }, DataType::Union(_) => unimplemented!(), } } + +pub fn skip( + field_nodes: &mut VecDeque, + data_type: &DataType, + buffers: &mut VecDeque<&gen::Schema::Buffer>, +) { + match data_type { + DataType::Null => skip_null(field_nodes), + DataType::Boolean => skip_boolean(field_nodes, buffers), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Date32 + | DataType::Time32(_) + | DataType::Interval(_) + | DataType::Int64 + | DataType::Date64 + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) + | DataType::Decimal(_, _) + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float16 + | DataType::Float64 => skip_primitive(field_nodes, buffers), + DataType::LargeBinary | DataType::Binary => skip_binary(field_nodes, buffers), + DataType::LargeUtf8 | DataType::Utf8 => skip_utf8(field_nodes, buffers), + DataType::FixedSizeBinary(_) => skip_fixed_size_binary(field_nodes, buffers), + DataType::List(_) => skip_list::(field_nodes, data_type, buffers), + DataType::LargeList(_) => skip_list::(field_nodes, data_type, buffers), + 
DataType::FixedSizeList(_, _) => skip_fixed_size_list(field_nodes, data_type, buffers), + DataType::Struct(_) => skip_struct(field_nodes, data_type, buffers), + DataType::Dictionary(_, _) => skip_dictionary(field_nodes, buffers), + DataType::Union(_) => unimplemented!(), + } +} diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 79b3722470b..8ed127e2698 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +mod array; mod common; mod deserialize; +mod read_basic; mod reader; mod stream; diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs new file mode 100644 index 00000000000..421bf0d3656 --- /dev/null +++ b/src/io/ipc/read/read_basic.rs @@ -0,0 +1,255 @@ +use std::io::{Read, Seek, SeekFrom}; +use std::{collections::VecDeque, convert::TryInto}; + +use crate::buffer::Buffer; +use crate::endianess::is_native_little_endian; +use crate::error::{ArrowError, Result}; +use crate::io::ipc::gen::Message::{BodyCompression, CompressionType}; +use crate::{bitmap::Bitmap, buffer::MutableBuffer, types::NativeType}; + +use super::super::compression; +use super::super::gen; + +fn read_big_endian( + reader: &mut R, + bytes: usize, + buffer: &mut MutableBuffer, + is_little_endian: bool, +) -> Result<()> { + // slow case where we must reverse bits + let mut slice = vec![0u8; bytes]; + reader.read_exact(&mut slice)?; + + if !is_little_endian { + let chunks = slice.chunks_exact(std::mem::size_of::()); + buffer + .as_mut_slice() + .iter_mut() + .zip(chunks) + .try_for_each(|(slot, chunk)| { + let a: T::Bytes = match chunk.try_into() { + Ok(a) => a, + Err(_) => unreachable!(), + }; + *slot = T::from_be_bytes(a); + Result::Ok(()) + }) + .unwrap(); + } + Ok(()) +} + +fn read_uncompressed_buffer( + reader: &mut R, + buffer_length: usize, + bytes: usize, + length: usize, + is_little_endian: bool, +) -> Result> { + if bytes > buffer_length { + return Err(ArrowError::Ipc( + format!("The slots of the array times the physical size must \ + be smaller or equal to the length of the IPC buffer. \ + However, this array reports {} slots, which, for physical type \"{}\", corresponds to {} bytes, \ + which is larger than the buffer length {}", + length, + std::any::type_name::(), + bytes, + buffer_length, + ), + )); + } + + // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read + // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 + let mut buffer = MutableBuffer::::from_len_zeroed(length); + + if is_native_little_endian() == is_little_endian { + // fast case where we can just copy the contents as is + unsafe { + // transmute T to bytes. 
+ let slice = std::slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, bytes); + reader.read_exact(slice)?; + } + } else { + read_big_endian(reader, bytes, &mut buffer, is_little_endian)?; + } + Ok(buffer) +} + +fn read_compressed_buffer( + reader: &mut R, + buffer_length: usize, + length: usize, + is_little_endian: bool, + compression: BodyCompression, +) -> Result> { + if is_little_endian != is_native_little_endian() { + return Err(ArrowError::NotYetImplemented( + "Reading compressed and big endian IPC".to_string(), + )); + } + + // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read + // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 + let mut buffer = MutableBuffer::::from_len_zeroed(length); + + // decompress first + // todo: move this allocation to an external buffer for re-use + let mut slice = vec![0u8; buffer_length]; + reader.read_exact(&mut slice)?; + + match compression.codec() { + CompressionType::LZ4_FRAME => { + // fast case where we can just copy the contents as is + unsafe { + // transmute T to bytes. + let out_slice = std::slice::from_raw_parts_mut( + buffer.as_mut_ptr() as *mut u8, + length * std::mem::size_of::(), + ); + compression::decompress_lz4(&slice[8..], out_slice)? + } + Ok(buffer) + } + CompressionType::ZSTD => { + // fast case where we can just copy the contents as is + unsafe { + // transmute T to bytes. + let out_slice = std::slice::from_raw_parts_mut( + buffer.as_mut_ptr() as *mut u8, + length * std::mem::size_of::(), + ); + compression::decompress_zstd(&slice[8..], out_slice)? + } + Ok(buffer) + } + _ => Err(ArrowError::NotYetImplemented( + "Non LZ4 compressed IPC".to_string(), + )), + } +} + +pub fn read_buffer( + buf: &mut VecDeque<&gen::Schema::Buffer>, + length: usize, // in slots + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> { + let buf = buf.pop_front().unwrap(); + + reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?; + + let buffer_length = buf.length() as usize; + + let bytes = length * std::mem::size_of::(); + + if let Some(compression) = compression { + Ok( + read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)? + .into(), + ) + } else { + Ok( + read_uncompressed_buffer(reader, buffer_length, bytes, length, is_little_endian)? 
+ .into(), + ) + } +} + +fn read_uncompressed_bitmap( + length: usize, + bytes: usize, + reader: &mut R, +) -> Result> { + // something is wrong if we can't `length` + assert!(length <= bytes * 8); + // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read + // see also /~https://github.com/MaikKlein/ash/issues/354#issue-781730580 + let mut buffer = MutableBuffer::::from_len_zeroed(bytes); + reader.read_exact(buffer.as_mut_slice())?; + + Ok(buffer) +} + +fn read_compressed_bitmap( + length: usize, + bytes: usize, + compression: BodyCompression, + reader: &mut R, +) -> Result> { + let mut buffer = MutableBuffer::::from_len_zeroed((length + 7) / 8); + match compression.codec() { + CompressionType::LZ4_FRAME => { + // decompress first + // todo: move this allocation to an external buffer for re-use + let mut slice = vec![0u8; bytes]; + reader.read_exact(&mut slice)?; + + compression::decompress_lz4(&slice[8..], &mut buffer)?; + + Ok(buffer) + } + CompressionType::ZSTD => { + // decompress first + // todo: move this allocation to an external buffer for re-use + let mut slice = vec![0u8; bytes]; + reader.read_exact(&mut slice)?; + + compression::decompress_zstd(&slice[8..], &mut buffer)?; + + Ok(buffer) + } + _ => Err(ArrowError::NotYetImplemented( + "Non LZ4 compressed IPC".to_string(), + )), + } +} + +pub fn read_bitmap( + buf: &mut VecDeque<&gen::Schema::Buffer>, + length: usize, + reader: &mut R, + block_offset: u64, + _: bool, + compression: Option, +) -> Result { + let buf = buf.pop_front().unwrap(); + + reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?; + + let bytes = buf.length() as usize; + + let buffer = if let Some(compression) = compression { + read_compressed_bitmap(length, bytes, compression, reader) + } else { + read_uncompressed_bitmap(length, bytes, reader) + }?; + + Ok(Bitmap::from_bytes(buffer.into(), length)) +} + +pub fn read_validity( + buffers: &mut VecDeque<&gen::Schema::Buffer>, + field_node: &gen::Message::FieldNode, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, +) -> Result> { + Ok(if field_node.null_count() > 0 { + Some(read_bitmap( + buffers, + field_node.length() as usize, + reader, + block_offset, + is_little_endian, + compression, + )?) + } else { + let _ = buffers.pop_front().unwrap(); + None + }) +} diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 648c332a39c..e70498b26f3 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -66,6 +66,7 @@ pub struct FileReader<'a, R: Read + Seek> { reader: &'a mut R, metadata: FileMetadata, current_block: usize, + projection: Option<(Vec, Arc)>, } /// Read the IPC file's metadata @@ -173,6 +174,7 @@ pub fn read_file_metadata(reader: &mut R) -> Result( reader: &mut R, metadata: &FileMetadata, + projection: Option<(&[usize], Arc)>, block: usize, ) -> Result> { let block = metadata.blocks[block]; @@ -212,6 +214,7 @@ pub fn read_batch( read_record_batch( batch, metadata.schema.clone(), + projection, metadata.is_little_endian, &metadata.dictionaries_by_field, reader, @@ -228,18 +231,43 @@ pub fn read_batch( } impl<'a, R: Read + Seek> FileReader<'a, R> { - /// Creates a new reader - pub fn new(reader: &'a mut R, metadata: FileMetadata) -> Self { + /// Creates a new [`FileReader`]. Use `projection` to only take certain columns. + /// # Panic + /// Panics iff the projection is not in increasing order (e.g. 
`[1, 0]` nor `[0, 1, 1]` are valid) + pub fn new(reader: &'a mut R, metadata: FileMetadata, projection: Option>) -> Self { + if let Some(projection) = projection.as_ref() { + let _ = projection.iter().fold(0, |mut acc, v| { + assert!( + *v > acc, + "The projection on IPC must be ordered and non-overlapping" + ); + acc = *v; + acc + }); + } + let projection = projection.map(|projection| { + let fields = metadata.schema().fields(); + let fields = projection.iter().map(|x| fields[*x].clone()).collect(); + let schema = Arc::new(Schema { + fields, + metadata: metadata.schema().metadata().clone(), + }); + (projection, schema) + }); Self { reader, metadata, + projection, current_block: 0, } } /// Return the schema of the file pub fn schema(&self) -> &Arc { - &self.metadata.schema + self.projection + .as_ref() + .map(|x| &x.1) + .unwrap_or(&self.metadata.schema) } } @@ -251,7 +279,15 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> { if self.current_block < self.metadata.total_blocks { let block = self.current_block; self.current_block += 1; - read_batch(&mut self.reader, &self.metadata, block).transpose() + read_batch( + &mut self.reader, + &self.metadata, + self.projection + .as_ref() + .map(|x| (x.0.as_ref(), x.1.clone())), + block, + ) + .transpose() } else { None } @@ -260,7 +296,7 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> { impl<'a, R: Read + Seek> RecordBatchReader for FileReader<'a, R> { fn schema(&self) -> &Schema { - &self.metadata.schema + self.schema().as_ref() } } @@ -281,7 +317,7 @@ mod tests { ))?; let metadata = read_file_metadata(&mut file)?; - let reader = FileReader::new(&mut file, metadata); + let reader = FileReader::new(&mut file, metadata, None); // read expected JSON output let (schema, batches) = read_gzip_json(version, file_name); @@ -388,4 +424,30 @@ mod tests { fn read_generated_200_compression_zstd() -> Result<()> { test_file("2.0.0-compression", "generated_zstd") } + + fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> { + let testdata = crate::util::test_util::arrow_test_data(); + let mut file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, file_name + ))?; + + let metadata = read_file_metadata(&mut file)?; + let mut reader = FileReader::new(&mut file, metadata, Some(vec![column])); + + assert_eq!(reader.schema().fields().len(), 1); + + reader.try_for_each(|rhs| { + assert_eq!(rhs?.num_columns(), 1); + Result::Ok(()) + })?; + Ok(()) + } + + #[test] + fn read_projected() -> Result<()> { + test_projection("1.0.0-littleendian", "generated_primitive", 1)?; + test_projection("1.0.0-littleendian", "generated_dictionary", 2)?; + test_projection("1.0.0-littleendian", "generated_nested", 1) + } } diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index fe40bf22335..01d10f54dfe 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -131,6 +131,7 @@ pub fn read_next( read_record_batch( batch, metadata.schema.clone(), + None, metadata.is_little_endian, dictionaries_by_field, &mut reader, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 002fe03c3a3..1f108dd94d8 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -190,7 +190,7 @@ mod tests { let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); - let reader = FileReader::new(&mut reader, metadata); + let reader = FileReader::new(&mut reader, metadata, None); // read expected JSON output let (expected_schema, 
expected_batches) = (batch.schema().clone(), vec![batch]); @@ -221,7 +221,7 @@ mod tests { let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); - let reader = FileReader::new(&mut reader, metadata); + let reader = FileReader::new(&mut reader, metadata, None); // read expected JSON output let (expected_schema, expected_batches) = read_gzip_json(version, file_name);
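
A minimal usage sketch of the projection pushdown API added by this patch, assuming the crate is consumed as `arrow2`. The file path and column indices below are illustrative; `read_file_metadata` and the new `projection` argument of `FileReader::new` are taken from the diff above (see `test_projection` in reader.rs).

    use std::fs::File;

    use arrow2::error::Result;
    use arrow2::io::ipc::read::{read_file_metadata, FileReader};

    fn main() -> Result<()> {
        // Hypothetical path to an Arrow IPC file.
        let mut file = File::open("data.arrow")?;
        let metadata = read_file_metadata(&mut file)?;

        // Read only columns 1 and 2. The projection must be in strictly
        // increasing order; `FileReader::new` panics otherwise.
        let reader = FileReader::new(&mut file, metadata, Some(vec![1, 2]));

        // The reader reports the projected schema, not the file's schema.
        assert_eq!(reader.schema().fields().len(), 2);

        // Each batch contains only the selected columns; the others are
        // skipped rather than deserialized.
        for batch in reader {
            assert_eq!(batch?.num_columns(), 2);
        }
        Ok(())
    }

Passing `None` preserves the previous behavior of reading every column, which is why the existing call sites in this patch are updated to pass `None` as the third argument.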