Skip to content

Commit

Permalink
Struct layout eval with sub-expression slicing and push down (#1893)
Browse files Browse the repository at this point in the history
Co-authored-by: Nicholas Gates <nick@nickgates.com>
  • Loading branch information
joseph-isaacs and gatesn authored Jan 11, 2025
1 parent e8228c0 commit cedcb24
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 28 deletions.
8 changes: 8 additions & 0 deletions vortex-expr/src/transform/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ use crate::{get_item, ident, pack, ExprRef, GetItem, Identity, Select, SelectFie
/// The results of each partition can then be recombined to reproduce the result of the original
/// expression.
///
/// ## Note
///
/// This function currently respects the validity of each field in the scope, but the not validity
/// of the scope itself. The fix would be for the returned `PartitionedExpr` to include a partition
/// expression for computing the validity, or to include that expression as part of the root.
///
/// See </~https://github.com/spiraldb/vortex/issues/1907>.
///
// TODO(ngates): document the behaviour of conflicting `Field::Index` and `Field::Name`.
pub fn partition(expr: ExprRef, scope_dtype: &StructDType) -> VortexResult<PartitionedExpr> {
StructFieldExpressionSplitter::split(expr, scope_dtype)
Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/chunked/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_scalar::Scalar;
use vortex_scan::RowMask;

use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::LayoutScanExt;
use crate::reader::LayoutReaderExt;
use crate::ExprEvaluator;

#[async_trait(?Send)]
Expand Down Expand Up @@ -128,7 +128,7 @@ mod test {
}

#[test]
fn test_chunked_scan() {
fn test_chunked_evaluator() {
block_on(async {
let (segments, layout) = chunked_layout();

Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vortex_error::VortexResult;
use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::reader::{LayoutReader, LayoutReaderExt};
use crate::segments::AsyncSegmentReader;
use crate::CHUNKED_LAYOUT_ID;

Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ impl ChunkedReader {
}

// Construct a lazy scan for each chunk of the layout.
let chunk_scans = (0..nchunks).map(|_| OnceLock::new()).collect();
let chunk_readers = (0..nchunks).map(|_| OnceLock::new()).collect();

Ok(Self {
layout,
ctx,
segments,
stats_table: Arc::new(OnceCell::new()),
chunk_readers: chunk_scans,
chunk_readers,
})
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_flatbuffers::{array as fba, FlatBuffer};
use vortex_scan::RowMask;

use crate::layouts::flat::reader::FlatReader;
use crate::reader::LayoutScanExt;
use crate::reader::LayoutReaderExt;
use crate::{ExprEvaluator, LayoutReader};

#[async_trait(?Send)]
Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vortex_error::VortexResult;

use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::flat::reader::FlatReader;
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::reader::{LayoutReader, LayoutReaderExt};
use crate::segments::AsyncSegmentReader;
use crate::{LayoutData, FLAT_LAYOUT_ID};

Expand Down
150 changes: 145 additions & 5 deletions vortex-layout/src/layouts/struct_/eval_expr.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,155 @@
use async_trait::async_trait;
use vortex_array::ArrayData;
use futures::future::try_join_all;
use itertools::Itertools;
use vortex_array::array::StructArray;
use vortex_array::validity::Validity;
use vortex_array::{ArrayData, IntoArrayData};
use vortex_error::VortexResult;
use vortex_expr::transform::partition::partition;
use vortex_expr::ExprRef;
use vortex_scan::RowMask;

use crate::layouts::struct_::reader::StructScan;
use crate::layouts::struct_::reader::StructReader;
use crate::ExprEvaluator;

#[async_trait(?Send)]
impl ExprEvaluator for StructScan {
async fn evaluate_expr(&self, _row_mask: RowMask, _expr: ExprRef) -> VortexResult<ArrayData> {
todo!()
impl ExprEvaluator for StructReader {
async fn evaluate_expr(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
// Partition the expression into expressions that can be evaluated over individual fields
let partitioned = partition(expr, self.struct_dtype())?;
let field_readers: Vec<_> = partitioned
.partitions
.iter()
.map(|partition| self.child(&partition.field))
.try_collect()?;

let arrays = try_join_all(
field_readers
.iter()
.zip_eq(partitioned.partitions.iter())
.map(|(reader, partition)| {
reader.evaluate_expr(row_mask.clone(), partition.expr.clone())
}),
)
.await?;

let row_count = row_mask.true_count();
debug_assert!(arrays.iter().all(|a| a.len() == row_count));

let root_scope = StructArray::try_new(
partitioned
.partitions
.iter()
.map(|p| p.name.clone())
.collect::<Vec<_>>()
.into(),
arrays,
row_count,
Validity::NonNullable,
)?
.into_array();

// Recombine the partitioned expressions into a single expression
partitioned.root.evaluate(&root_scope)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use futures::executor::block_on;
use vortex_array::array::StructArray;
use vortex_array::compute::FilterMask;
use vortex_array::{IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
use vortex_dtype::PType::I32;
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_expr::{get_item, gt, ident};
use vortex_scan::RowMask;

use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::layouts::struct_::writer::StructLayoutWriter;
use crate::segments::test::TestSegments;
use crate::strategies::LayoutWriterExt;
use crate::LayoutData;

/// Create a chunked layout with three chunks of primitive arrays.
fn struct_layout() -> (Arc<TestSegments>, LayoutData) {
let mut segments = TestSegments::default();

let layout = StructLayoutWriter::new(
DType::Struct(
StructDType::new(
vec!["a".into(), "b".into(), "c".into()].into(),
vec![I32.into(), I32.into(), I32.into()],
),
Nullability::NonNullable,
),
vec![
Box::new(FlatLayoutWriter::new(I32.into())),
Box::new(FlatLayoutWriter::new(I32.into())),
Box::new(FlatLayoutWriter::new(I32.into())),
],
)
.push_all(
&mut segments,
[StructArray::from_fields(
[
("a", buffer![7, 2, 3].into_array()),
("b", buffer![4, 5, 6].into_array()),
("c", buffer![4, 5, 6].into_array()),
]
.as_slice(),
)
.map(IntoArrayData::into_array)],
)
.unwrap();
(Arc::new(segments), layout)
}

#[test]
fn test_struct_layout() {
let (segments, layout) = struct_layout();

let reader = layout.reader(segments, Default::default()).unwrap();
let expr = gt(get_item("a", ident()), get_item("b", ident()));
let result =
block_on(reader.evaluate_expr(RowMask::new_valid_between(0, 3), expr)).unwrap();
assert_eq!(
vec![true, false, false],
result
.into_bool()
.unwrap()
.boolean_buffer()
.iter()
.collect::<Vec<_>>()
);
}

#[test]
fn test_struct_layout_row_mask() {
let (segments, layout) = struct_layout();

let reader = layout.reader(segments, Default::default()).unwrap();
let expr = gt(get_item("a", ident()), get_item("b", ident()));
let result = block_on(reader.evaluate_expr(
// Take rows 0 and 1, skip row 2, and anything after that
RowMask::new(FilterMask::from_iter([true, true, false]), 0),
expr,
))
.unwrap();

assert_eq!(result.len(), 2);

assert_eq!(
vec![true, false],
result
.into_bool()
.unwrap()
.boolean_buffer()
.iter()
.collect::<Vec<_>>()
);
}
}
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/struct_/eval_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use vortex_array::stats::{Stat, StatsSet};
use vortex_dtype::FieldPath;
use vortex_error::VortexResult;

use crate::layouts::struct_::reader::StructScan;
use crate::layouts::struct_::reader::StructReader;
use crate::StatsEvaluator;

#[async_trait(?Send)]
impl StatsEvaluator for StructScan {
impl StatsEvaluator for StructReader {
async fn evaluate_stats(
&self,
field_paths: &[FieldPath],
Expand Down
8 changes: 4 additions & 4 deletions vortex-layout/src/layouts/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ pub mod writer;
use std::collections::BTreeSet;
use std::sync::Arc;

use reader::StructScan;
use reader::StructReader;
use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::reader::{LayoutReader, LayoutReaderExt};
use crate::segments::AsyncSegmentReader;
use crate::COLUMNAR_LAYOUT_ID;

Expand All @@ -28,9 +28,9 @@ impl LayoutEncoding for StructLayout {
&self,
layout: LayoutData,
ctx: ContextRef,
_segments: Arc<dyn AsyncSegmentReader>,
segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(StructScan::try_new(layout, ctx)?.into_arc())
Ok(StructReader::try_new(layout, segments, ctx)?.into_arc())
}

fn register_splits(
Expand Down
75 changes: 67 additions & 8 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,86 @@
use std::sync::{Arc, OnceLock};

use vortex_array::aliases::hash_map::HashMap;
use vortex_array::ContextRef;
use vortex_error::{vortex_panic, VortexResult};
use vortex_dtype::{DType, Field, FieldName, StructDType};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::layouts::struct_::StructLayout;
use crate::{LayoutData, LayoutEncoding, LayoutReader};
use crate::segments::AsyncSegmentReader;
use crate::{LayoutData, LayoutEncoding, LayoutReader, LayoutReaderExt};

#[derive(Debug)]
pub struct StructScan {
#[derive(Clone)]
pub struct StructReader {
layout: LayoutData,
ctx: ContextRef,

segments: Arc<dyn AsyncSegmentReader>,

field_readers: Arc<[OnceLock<Arc<dyn LayoutReader>>]>,
field_lookup: HashMap<FieldName, usize>,
}

impl StructScan {
pub(super) fn try_new(layout: LayoutData, _ctx: ContextRef) -> VortexResult<Self> {
impl StructReader {
pub(super) fn try_new(
layout: LayoutData,
segments: Arc<dyn AsyncSegmentReader>,
ctx: ContextRef,
) -> VortexResult<Self> {
if layout.encoding().id() != StructLayout.id() {
vortex_panic!("Mismatched layout ID")
}

let dtype = layout.dtype();
let DType::Struct(struct_dt, _) = dtype else {
vortex_panic!("Mismatched dtype {} for struct layout", dtype);
};

let field_readers = struct_dt.names().iter().map(|_| OnceLock::new()).collect();

let field_lookup = struct_dt
.names()
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();

// This is where we need to do some complex things with the scan in order to split it into
// different scans for different fields.
Ok(Self { layout })
Ok(Self {
layout,
ctx,
segments,
field_readers,
field_lookup,
})
}

/// Return the [`StructDType`] of this layout.
pub(crate) fn struct_dtype(&self) -> &StructDType {
self.dtype()
.as_struct()
.vortex_expect("Struct layout must have a struct DType, verified at construction")
}

/// Return the child reader for the chunk.
pub(crate) fn child(&self, field: &Field) -> VortexResult<&Arc<dyn LayoutReader>> {
let idx = match field {
Field::Name(n) => *self
.field_lookup
.get(n)
.ok_or_else(|| vortex_err!("Field {} not found in struct layout", n))?,
Field::Index(idx) => *idx,
};
self.field_readers[idx].get_or_try_init(|| {
let child_layout = self
.layout
.child(idx, self.struct_dtype().field_dtype(idx)?)?;
child_layout.reader(self.segments.clone(), self.ctx.clone())
})
}
}

impl LayoutReader for StructScan {
impl LayoutReader for StructReader {
fn layout(&self) -> &LayoutData {
&self.layout
}
Expand Down
1 change: 1 addition & 0 deletions vortex-layout/src/layouts/struct_/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl LayoutWriter for StructLayoutWriter {
self.row_count += struct_array.len() as u64;

for i in 0..struct_array.nfields() {
// TODO(joe): handle struct validity
let column = chunk
.as_struct_array()
.vortex_expect("batch is a struct array")
Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl StatsEvaluator for Arc<dyn LayoutReader + 'static> {
}
}

pub trait LayoutScanExt: LayoutReader {
pub trait LayoutReaderExt: LayoutReader {
/// Box the layout scan.
fn into_arc(self) -> Arc<dyn LayoutReader>
where
Expand All @@ -78,4 +78,4 @@ pub trait LayoutScanExt: LayoutReader {
}
}

impl<L: LayoutReader> LayoutScanExt for L {}
impl<L: LayoutReader> LayoutReaderExt for L {}

0 comments on commit cedcb24

Please sign in to comment.