Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Made all panics in IPC read errors (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jan 1, 2022
1 parent 0311e22 commit b617331
Show file tree
Hide file tree
Showing 22 changed files with 457 additions and 208 deletions.
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ impl ArrowError {
pub fn from_external_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
Self::External("".to_string(), Box::new(error))
}

pub(crate) fn oos<A: Into<String>>(msg: A) -> Self {
Self::OutOfSpec(msg.into())
}

pub(crate) fn nyi<A: Into<String>>(msg: A) -> Self {
Self::NotYetImplemented(msg.into())
}
}

impl From<::std::io::Error> for ArrowError {
Expand Down
2 changes: 1 addition & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedDa
pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) {
Ok(schemas)
schemas
} else {
Err(ArrowError::OutOfSpec(
"Unable to get head as schema".to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()>
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf).unwrap();

use crate::error::ArrowError;
let mut encoder = lz4::EncoderBuilder::new()
.build(output_buf)
.map_err(ArrowError::from)?;
encoder.write_all(input_buf)?;
encoder.finish().1.map_err(|e| e.into())
}
Expand Down
33 changes: 26 additions & 7 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_format::ipc;
use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -20,7 +20,12 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -57,10 +62,24 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
))
}

pub fn skip_binary(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&ipc::Schema::Buffer>) {
let _ = field_nodes.pop_front().unwrap();
pub fn skip_binary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for binary. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
Ok(())
}
26 changes: 20 additions & 6 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::BooleanArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -19,7 +19,12 @@ pub fn read_boolean<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let length = field_node.length() as usize;
let validity = read_validity(
Expand All @@ -45,9 +50,18 @@ pub fn read_boolean<R: Read + Seek>(
pub fn skip_boolean(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for boolean. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
Ok(())
}
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ where
pub fn skip_dictionary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
) -> Result<()> {
skip_primitive(field_nodes, buffers)
}
26 changes: 20 additions & 6 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::FixedSizeBinaryArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -19,7 +19,12 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand All @@ -46,9 +51,18 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
pub fn skip_fixed_size_binary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for fixed-size binary. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
Ok(())
}
21 changes: 16 additions & 5 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::FixedSizeListArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -25,7 +25,12 @@ pub fn read_fixed_size_list<R: Read + Seek>(
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -57,10 +62,16 @@ pub fn skip_fixed_size_list(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for fixed-size list. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;

let (field, _) = FixedSizeListArray::get_child_and_size(data_type);

Expand Down
23 changes: 17 additions & 6 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow_format::ipc;
use crate::array::{ListArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -30,7 +30,12 @@ pub fn read_list<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -73,11 +78,17 @@ pub fn skip_list<O: Offset>(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the field for list. The file or stream is corrupted.")
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;

let data_type = ListArray::<O>::get_child_type(data_type);

Expand Down
23 changes: 17 additions & 6 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_format::ipc;
use crate::array::MapArray;
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -26,7 +26,12 @@ pub fn read_map<R: Read + Seek>(
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -69,11 +74,17 @@ pub fn skip_map(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the field for map. The file or stream is corrupted.")
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;

let data_type = MapArray::get_field(data_type).data_type();

Expand Down
28 changes: 21 additions & 7 deletions src/io/ipc/read/array/null.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
use std::collections::VecDeque;

use crate::{array::NullArray, datatypes::DataType};
use crate::{
array::NullArray,
datatypes::DataType,
error::{ArrowError, Result},
};

use super::super::deserialize::Node;

pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> NullArray {
NullArray::from_data(
pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

Ok(NullArray::from_data(
data_type,
field_nodes.pop_front().unwrap().length() as usize,
)
field_node.length() as usize,
))
}

pub fn skip_null(field_nodes: &mut VecDeque<Node>) {
let _ = field_nodes.pop_front();
pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the field for null. The file or stream is corrupted.")
})?;
Ok(())
}
Loading

0 comments on commit b617331

Please sign in to comment.