Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Growable union (#902)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Mar 10, 2022
1 parent 31c8ec6 commit 4b893b7
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 6 deletions.
13 changes: 11 additions & 2 deletions src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::datatypes::*;

mod binary;
pub use binary::GrowableBinary;
mod union;
pub use union::GrowableUnion;
mod boolean;
pub use boolean::GrowableBoolean;
mod fixed_binary;
Expand All @@ -28,7 +30,7 @@ pub use dictionary::GrowableDictionary;
mod utils;

/// Describes a struct that can be extended from slices of other pre-existing [`Array`]s.
/// This is used in operations where a new array is built out of other arrays such
/// This is used in operations where a new array is built out of other arrays, such
/// as filter and concatenation.
pub trait Growable<'a> {
/// Extends this [`Growable`] with elements from the bounded [`Array`] at index `index` from
Expand Down Expand Up @@ -110,7 +112,14 @@ pub fn make_growable<'a>(
use_validity,
capacity
),
Union | Map => todo!(),
Union => {
let arrays = arrays
.iter()
.map(|array| array.as_any().downcast_ref().unwrap())
.collect::<Vec<_>>();
Box::new(union::GrowableUnion::new(arrays, capacity))
}
Map => todo!(),
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
let arrays = arrays
Expand Down
111 changes: 111 additions & 0 deletions src/array/growable/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::sync::Arc;

use crate::array::{Array, UnionArray};

use super::{make_growable, Growable};

/// Concrete [`Growable`] for the [`UnionArray`].
pub struct GrowableUnion<'a> {
    /// The arrays this growable is bound to; `extend` selects one of them by index.
    arrays: Vec<&'a UnionArray>,
    /// Type id of every slot pushed so far.
    types: Vec<i8>,
    /// Offset of every slot pushed so far. `Some` only when the first bound array
    /// carries offsets (the dense case).
    offsets: Option<Vec<i32>>,
    /// One growable per child field of the union.
    fields: Vec<Box<dyn Growable<'a> + 'a>>,
    /// Number of slots pushed so far into each child by the dense branch of `extend`.
    /// This is the offset that the next slot of that child must receive.
    field_lens: Vec<i32>,
}

impl<'a> GrowableUnion<'a> {
    /// Creates a new [`GrowableUnion`] bound to `arrays` with a pre-allocated `capacity`.
    /// # Panics
    /// Panics iff
    /// * `arrays` is empty.
    /// * any of the arrays has a different `data_type` than the first one.
    pub fn new(arrays: Vec<&'a UnionArray>, capacity: usize) -> Self {
        let first = arrays[0].data_type();
        assert!(arrays.iter().all(|x| x.data_type() == first));

        let has_offsets = arrays[0].offsets().is_some();

        // one child growable per field, each bound to that field of every input array
        let fields = (0..arrays[0].fields().len())
            .map(|i| {
                make_growable(
                    &arrays
                        .iter()
                        .map(|x| x.fields()[i].as_ref())
                        .collect::<Vec<_>>(),
                    false,
                    capacity,
                )
            })
            .collect::<Vec<Box<dyn Growable>>>();
        let field_lens = vec![0; fields.len()];

        Self {
            arrays,
            fields,
            field_lens,
            offsets: if has_offsets {
                Some(Vec::with_capacity(capacity))
            } else {
                None
            },
            types: Vec::with_capacity(capacity),
        }
    }

    /// Moves the accumulated types, offsets and children out of `self` and builds a
    /// [`UnionArray`] from them, leaving the internal buffers empty.
    fn to(&mut self) -> UnionArray {
        let types = std::mem::take(&mut self.types);
        let fields = std::mem::take(&mut self.fields);
        let offsets = std::mem::take(&mut self.offsets);
        // the children were moved out, so their recorded lengths no longer apply
        self.field_lens.clear();
        let fields = fields.into_iter().map(|mut x| x.as_arc()).collect();

        UnionArray::new(
            self.arrays[0].data_type().clone(),
            types.into(),
            fields,
            offsets.map(|x| x.into()),
        )
    }
}

impl<'a> Growable<'a> for GrowableUnion<'a> {
    /// Appends the slots `start..start + len` of the bound array at `index`.
    fn extend(&mut self, index: usize, start: usize, len: usize) {
        let array = self.arrays[index];

        let types = &array.types()[start..start + len];
        self.types.extend(types);
        if let Some(x) = self.offsets.as_mut() {
            // `self.offsets` is `Some` only when the first array has offsets; every
            // bound array shares its data type, so the offsets are expected here.
            let offsets = &array.offsets().unwrap()[start..start + len];

            // in a dense union, each slot has its own offset. We extend the fields accordingly.
            for (&type_, &offset) in types.iter().zip(offsets.iter()) {
                // NOTE(review): assumes the type id is the position of the child in
                // `fields` — TODO confirm for unions declared with custom type ids.
                let child = type_ as usize;
                // The new slot lands at the current end of the grown child. The source
                // offset only locates the value inside the source child and must not
                // be stored: it would point at the wrong (or a non-existing) slot.
                x.push(self.field_lens[child]);
                self.fields[child].extend(index, offset as usize, 1);
                self.field_lens[child] += 1;
            }
        } else {
            // in a sparse union, every field has the same length => extend all fields equally
            self.fields
                .iter_mut()
                .for_each(|field| field.extend(index, start, len))
        }
    }

    /// Does nothing: this growable keeps no validity of its own.
    fn extend_validity(&mut self, _additional: usize) {}

    fn as_arc(&mut self) -> Arc<dyn Array> {
        Arc::new(self.to())
    }

    fn as_box(&mut self) -> Box<dyn Array> {
        Box::new(self.to())
    }
}

impl<'a> From<GrowableUnion<'a>> for UnionArray {
fn from(val: GrowableUnion<'a>) -> Self {
let fields = val.fields.into_iter().map(|mut x| x.as_arc()).collect();

UnionArray::new(
val.arrays[0].data_type().clone(),
val.types.into(),
fields,
val.offsets.map(|x| x.into()),
)
}
}
4 changes: 2 additions & 2 deletions src/array/union/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ impl UnionArray {
}

/// The optional offsets.
pub fn offsets(&self) -> &Option<Buffer<i32>> {
&self.offsets
pub fn offsets(&self) -> Option<&Buffer<i32>> {
self.offsets.as_ref()
}

/// The fields.
Expand Down
1 change: 1 addition & 0 deletions tests/it/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod list;
mod null;
mod primitive;
mod struct_;
mod union;
mod utf8;

/*
Expand Down
72 changes: 72 additions & 0 deletions tests/it/array/growable/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::sync::Arc;

use arrow2::{
array::{
growable::{Growable, GrowableUnion},
*,
},
datatypes::*,
error::Result,
};

/// Growing a sparse union from a single slot of `union` must equal slicing that slot.
#[test]
fn sparse() -> Result<()> {
    let data_type = DataType::Union(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ],
        None,
        UnionMode::Sparse,
    );
    let children = vec![
        Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc<dyn Array>,
        Arc::new(Utf8Array::<i32>::from(&[Some("a"), Some("b"), Some("c")])) as Arc<dyn Array>,
    ];
    let union = UnionArray::from_data(data_type, vec![0, 0, 1].into(), children, None);

    // `1..2` only yields a slice length of 1
    for length in 1..2 {
        let last_start = union.len() - length;
        for index in 0..=last_start {
            let mut growable = GrowableUnion::new(vec![&union], 10);
            growable.extend(0, index, length);

            let result: UnionArray = growable.into();
            let expected = union.slice(index, length);

            assert_eq!(expected, result);
        }
    }

    Ok(())
}

/// Growing a dense union from a single slot of `union` must equal slicing that slot.
#[test]
fn dense() -> Result<()> {
    let data_type = DataType::Union(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ],
        None,
        UnionMode::Dense,
    );
    let children = vec![
        Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc<dyn Array>,
        Arc::new(Utf8Array::<i32>::from(&[Some("c")])) as Arc<dyn Array>,
    ];
    let slot_offsets = Some(vec![0, 1, 0].into());

    let union = UnionArray::from_data(data_type, vec![0, 0, 1].into(), children, slot_offsets);

    // `1..2` only yields a slice length of 1
    for length in 1..2 {
        let last_start = union.len() - length;
        for index in 0..=last_start {
            let mut growable = GrowableUnion::new(vec![&union], 10);
            growable.extend(0, index, length);

            let result: UnionArray = growable.into();
            let expected = union.slice(index, length);

            assert_eq!(expected, result);
        }
    }

    Ok(())
}
25 changes: 23 additions & 2 deletions tests/it/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use arrow2::{
};

#[test]
fn debug() -> Result<()> {
fn sparse_debug() -> Result<()> {
let fields = vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, true),
];
let data_type = DataType::Union(fields, None, UnionMode::Sparse);
let types = Buffer::from_slice([0, 0, 1]);
let types = vec![0, 0, 1].into();
let fields = vec![
Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from(&[Some("a"), Some("b"), Some("c")])) as Arc<dyn Array>,
Expand All @@ -28,6 +28,27 @@ fn debug() -> Result<()> {
Ok(())
}

/// Checks the `Debug` rendering of a three-slot dense union.
#[test]
fn dense_debug() -> Result<()> {
    let data_type = DataType::Union(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ],
        None,
        UnionMode::Dense,
    );
    let children = vec![
        Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc<dyn Array>,
        Arc::new(Utf8Array::<i32>::from(&[Some("c")])) as Arc<dyn Array>,
    ];
    let slot_offsets = Some(vec![0, 1, 0].into());

    let union = UnionArray::from_data(data_type, vec![0, 0, 1].into(), children, slot_offsets);

    let rendered = format!("{:?}", union);
    assert_eq!(rendered, "UnionArray[1, None, c]");

    Ok(())
}

#[test]
fn slice() -> Result<()> {
let fields = vec![
Expand Down

0 comments on commit 4b893b7

Please sign in to comment.