Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added .arced/.boxe (#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 2, 2022
1 parent 2fcc522 commit 06f8f36
Show file tree
Hide file tree
Showing 46 changed files with 265 additions and 215 deletions.
2 changes: 1 addition & 1 deletion examples/csv_write_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn main() -> Result<()> {
Some(5),
Some(6),
]);
let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);
let columns = Chunk::new(vec![array.arced()]);

parallel_write("example.csv", [columns.clone(), columns])
}
2 changes: 1 addition & 1 deletion examples/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Resu

fn main() -> Result<()> {
// let's assume that we have an array:
let array = Arc::new(PrimitiveArray::<i32>::from([Some(1), None, Some(123)])) as Arc<dyn Array>;
let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).arced();

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
Expand Down
2 changes: 1 addition & 1 deletion examples/ipc_file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main() -> Result<()> {
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

let batch = Chunk::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;
let batch = Chunk::try_new(vec![a.arced(), b.arced()])?;

// write it
write_batches(file_path, schema, &[batch])?;
Expand Down
2 changes: 1 addition & 1 deletion examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn main() -> Result<()> {
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);
let columns = Chunk::new(vec![array.arced()]);

write_batch("test.parquet", schema, columns)
}
5 changes: 1 addition & 4 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ fn create_batch(size: usize) -> Result<Chunk> {
})
.collect();

Chunk::try_new(vec![
Arc::new(c1) as Arc<dyn Array>,
Arc::new(c2) as Arc<dyn Array>,
])
Chunk::try_new(vec![c1.arced(), c2.arced()])
}

fn main() -> Result<()> {
Expand Down
10 changes: 10 additions & 0 deletions src/array/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ impl<O: Offset> BinaryArray<O> {
DataType::Binary
}
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// unsafe constructors
Expand Down
10 changes: 10 additions & 0 deletions src/array/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ impl BooleanArray {
let bitmap = Bitmap::new_zeroed(length);
Self::new(data_type, bitmap.clone(), Some(bitmap))
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// must use
Expand Down
10 changes: 10 additions & 0 deletions src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ impl<K: DictionaryKey> DictionaryArray<K> {
new_scalar(self.values.as_ref(), index)
}
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

impl<K: DictionaryKey> DictionaryArray<K> {
Expand Down
10 changes: 10 additions & 0 deletions src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ impl FixedSizeBinaryArray {
Some(Bitmap::new_zeroed(length)),
)
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// must use
Expand Down
10 changes: 10 additions & 0 deletions src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ impl FixedSizeListArray {
.into();
Self::new(data_type, values, Some(Bitmap::new_zeroed(length)))
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// must use
Expand Down
10 changes: 10 additions & 0 deletions src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ impl<O: Offset> ListArray<O> {
Some(Bitmap::new_zeroed(length)),
)
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// unsafe construtors
Expand Down
10 changes: 10 additions & 0 deletions src/array/map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ impl MapArray {
let field = new_empty_array(Self::get_field(&data_type).data_type().clone()).into();
Self::new(data_type, Buffer::from(vec![0i32]), field, None)
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

impl MapArray {
Expand Down
10 changes: 10 additions & 0 deletions src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ impl NullArray {
pub fn new_null(data_type: DataType, length: usize) -> Self {
Self::new(data_type, length)
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

impl NullArray {
Expand Down
10 changes: 10 additions & 0 deletions src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,16 @@ impl<T: NativeType> PrimitiveArray<T> {
MutablePrimitiveArray::<T>::from_trusted_len_iter_unchecked(iter).into()
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}

/// Alias for `Self::try_new(..).unwrap()`.
/// # Panics
/// This function errors iff:
Expand Down
14 changes: 12 additions & 2 deletions src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ mod iterator;
/// use std::sync::Arc;
/// use arrow2::array::*;
/// use arrow2::datatypes::*;
/// let boolean = Arc::new(BooleanArray::from_slice(&[false, false, true, true])) as Arc<dyn Array>;
/// let int = Arc::new(Int32Array::from_slice(&[42, 28, 19, 31])) as Arc<dyn Array>;
/// let boolean = BooleanArray::from_slice(&[false, false, true, true]).arced();
/// let int = Int32Array::from_slice(&[42, 28, 19, 31]).arced();
///
/// let fields = vec![
/// Field::new("b", DataType::Boolean, false),
Expand Down Expand Up @@ -223,6 +223,16 @@ impl StructArray {
arr.validity = validity;
arr
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

// Accessors
Expand Down
10 changes: 10 additions & 0 deletions src/array/union/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ impl UnionArray {
panic!("Union struct must be created with the corresponding Union DataType")
}
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}
}

impl UnionArray {
Expand Down
10 changes: 10 additions & 0 deletions src/array/utf8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ impl<O: Offset> Utf8Array<O> {
}
}

/// Boxes self into a [`Box<dyn Array>`].
pub fn boxed(self) -> Box<dyn Array> {
Box::new(self)
}

/// Boxes self into a [`std::sync::Arc<dyn Array>`].
pub fn arced(self) -> std::sync::Arc<dyn Array> {
std::sync::Arc::new(self)
}

/// Clones this [`Utf8Array`] and assigns it a new validity
/// # Panic
/// This function panics iff `validity.len() != self.len()`.
Expand Down
5 changes: 1 addition & 4 deletions src/doc/lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ fn main() -> Result<()> {
]);

// declare chunk
let chunk = Chunk::new(vec![
Arc::new(a) as Arc<dyn Array>,
Arc::new(b) as Arc<dyn Array>,
]);
let chunk = Chunk::new(vec![a.arced(), b.arced()]);

// write to parquet (probably the fastest implementation of writing to parquet out there)

Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
//! // Setup the data
//! let x_data = Int32Array::from_slice([-1i32, 1]);
//! let y_data = Int32Array::from_slice([1i32, -1]);
//! let chunk = Chunk::try_new(
//! vec![Arc::new(x_data) as Arc<dyn Array>, Arc::new(y_data)]
//! )?;
//! let chunk = Chunk::try_new(vec![x_data.arced(), y_data.arced()])?;
//!
//! // Write the messages and finalize the stream
//! for _ in 0..5 {
Expand Down
20 changes: 10 additions & 10 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn read<R: Read + Seek>(
let data_type = field.data_type.clone();

match data_type.to_physical_type() {
Null => read_null(field_nodes, data_type).map(|x| Arc::new(x) as Arc<dyn Array>),
Null => read_null(field_nodes, data_type).map(|x| x.arced()),
Boolean => read_boolean(
field_nodes,
data_type,
Expand All @@ -42,7 +42,7 @@ pub fn read<R: Read + Seek>(
is_little_endian,
compression,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
read_primitive::<$T, _>(
field_nodes,
Expand All @@ -53,7 +53,7 @@ pub fn read<R: Read + Seek>(
is_little_endian,
compression,
)
.map(|x| Arc::new(x) as Arc<dyn Array>)
.map(|x| x.arced())
}),
Binary => {
let array = read_binary::<i32, _>(
Expand Down Expand Up @@ -127,7 +127,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
LargeList => read_list::<i64, _>(
field_nodes,
data_type,
Expand All @@ -140,7 +140,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
FixedSizeList => read_fixed_size_list(
field_nodes,
data_type,
Expand All @@ -153,7 +153,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
Struct => read_struct(
field_nodes,
data_type,
Expand All @@ -166,7 +166,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
read_dictionary::<$T, _>(
Expand All @@ -179,7 +179,7 @@ pub fn read<R: Read + Seek>(
compression,
is_little_endian,
)
.map(|x| Arc::new(x) as Arc<dyn Array>)
.map(|x| x.arced())
})
}
Union => read_union(
Expand All @@ -194,7 +194,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
Map => read_map(
field_nodes,
data_type,
Expand All @@ -207,7 +207,7 @@ pub fn read<R: Read + Seek>(
compression,
version,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
.map(|x| x.arced()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);
/// // Write chunks to file
/// for i in 0..3 {
/// let values = Int32Array::from(&[Some(i), None]);
/// let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
/// let chunk = Chunk::new(vec![values.arced()]);
/// sink.feed(chunk.into()).await?;
/// }
/// sink.close().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::error::{Error, Result};
///
/// for i in 0..3 {
/// let values = Int32Array::from(&[Some(i), None]);
/// let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
/// let chunk = Chunk::new(vec![values.arced()]);
/// sink.feed(chunk.into()).await?;
/// }
/// sink.close().await?;
Expand Down
Loading

0 comments on commit 06f8f36

Please sign in to comment.