diff --git a/vortex-expr/src/transform/partition.rs b/vortex-expr/src/transform/partition.rs index 0eb681f82f..d5c96c1a44 100644 --- a/vortex-expr/src/transform/partition.rs +++ b/vortex-expr/src/transform/partition.rs @@ -13,6 +13,14 @@ use crate::{get_item, ident, pack, ExprRef, GetItem, Identity, Select, SelectFie /// The results of each partition can then be recombined to reproduce the result of the original /// expression. /// +/// ## Note +/// +/// This function currently respects the validity of each field in the scope, but the not validity +/// of the scope itself. The fix would be for the returned `PartitionedExpr` to include a partition +/// expression for computing the validity, or to include that expression as part of the root. +/// +/// See . +/// // TODO(ngates): document the behaviour of conflicting `Field::Index` and `Field::Name`. pub fn partition(expr: ExprRef, scope_dtype: &StructDType) -> VortexResult { StructFieldExpressionSplitter::split(expr, scope_dtype) diff --git a/vortex-layout/src/layouts/chunked/eval_expr.rs b/vortex-layout/src/layouts/chunked/eval_expr.rs index 1eb2bc31ac..79739e9da0 100644 --- a/vortex-layout/src/layouts/chunked/eval_expr.rs +++ b/vortex-layout/src/layouts/chunked/eval_expr.rs @@ -10,7 +10,7 @@ use vortex_scalar::Scalar; use vortex_scan::RowMask; use crate::layouts::chunked::reader::ChunkedReader; -use crate::reader::LayoutScanExt; +use crate::reader::LayoutReaderExt; use crate::ExprEvaluator; #[async_trait(?Send)] @@ -128,7 +128,7 @@ mod test { } #[test] - fn test_chunked_scan() { + fn test_chunked_evaluator() { block_on(async { let (segments, layout) = chunked_layout(); diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index 99c5b98b1a..87a0c77078 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -13,7 +13,7 @@ use vortex_error::VortexResult; use crate::data::LayoutData; use crate::encoding::{LayoutEncoding, LayoutId}; use crate::layouts::chunked::reader::ChunkedReader; -use crate::reader::{LayoutReader, LayoutScanExt}; +use crate::reader::{LayoutReader, LayoutReaderExt}; use crate::segments::AsyncSegmentReader; use crate::CHUNKED_LAYOUT_ID; diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index 00596d792e..13c5d0a0e1 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -42,14 +42,14 @@ impl ChunkedReader { } // Construct a lazy scan for each chunk of the layout. - let chunk_scans = (0..nchunks).map(|_| OnceLock::new()).collect(); + let chunk_readers = (0..nchunks).map(|_| OnceLock::new()).collect(); Ok(Self { layout, ctx, segments, stats_table: Arc::new(OnceCell::new()), - chunk_readers: chunk_scans, + chunk_readers, }) } diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index de389b94a8..f403deabdb 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -10,7 +10,7 @@ use vortex_flatbuffers::{array as fba, FlatBuffer}; use vortex_scan::RowMask; use crate::layouts::flat::reader::FlatReader; -use crate::reader::LayoutScanExt; +use crate::reader::LayoutReaderExt; use crate::{ExprEvaluator, LayoutReader}; #[async_trait(?Send)] diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index 3ecb5366d4..6b340a0d9e 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -12,7 +12,7 @@ use vortex_error::VortexResult; use crate::encoding::{LayoutEncoding, LayoutId}; use crate::layouts::flat::reader::FlatReader; -use crate::reader::{LayoutReader, LayoutScanExt}; +use crate::reader::{LayoutReader, LayoutReaderExt}; use crate::segments::AsyncSegmentReader; use crate::{LayoutData, FLAT_LAYOUT_ID}; diff --git a/vortex-layout/src/layouts/struct_/eval_expr.rs b/vortex-layout/src/layouts/struct_/eval_expr.rs index 55dc6d219f..48691efd30 100644 --- a/vortex-layout/src/layouts/struct_/eval_expr.rs +++ b/vortex-layout/src/layouts/struct_/eval_expr.rs @@ -1,15 +1,155 @@ use async_trait::async_trait; -use vortex_array::ArrayData; +use futures::future::try_join_all; +use itertools::Itertools; +use vortex_array::array::StructArray; +use vortex_array::validity::Validity; +use vortex_array::{ArrayData, IntoArrayData}; use vortex_error::VortexResult; +use vortex_expr::transform::partition::partition; use vortex_expr::ExprRef; use vortex_scan::RowMask; -use crate::layouts::struct_::reader::StructScan; +use crate::layouts::struct_::reader::StructReader; use crate::ExprEvaluator; #[async_trait(?Send)] -impl ExprEvaluator for StructScan { - async fn evaluate_expr(&self, _row_mask: RowMask, _expr: ExprRef) -> VortexResult { - todo!() +impl ExprEvaluator for StructReader { + async fn evaluate_expr(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult { + // Partition the expression into expressions that can be evaluated over individual fields + let partitioned = partition(expr, self.struct_dtype())?; + let field_readers: Vec<_> = partitioned + .partitions + .iter() + .map(|partition| self.child(&partition.field)) + .try_collect()?; + + let arrays = try_join_all( + field_readers + .iter() + .zip_eq(partitioned.partitions.iter()) + .map(|(reader, partition)| { + reader.evaluate_expr(row_mask.clone(), partition.expr.clone()) + }), + ) + .await?; + + let row_count = row_mask.true_count(); + debug_assert!(arrays.iter().all(|a| a.len() == row_count)); + + let root_scope = StructArray::try_new( + partitioned + .partitions + .iter() + .map(|p| p.name.clone()) + .collect::>() + .into(), + arrays, + row_count, + Validity::NonNullable, + )? + .into_array(); + + // Recombine the partitioned expressions into a single expression + partitioned.root.evaluate(&root_scope) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::executor::block_on; + use vortex_array::array::StructArray; + use vortex_array::compute::FilterMask; + use vortex_array::{IntoArrayData, IntoArrayVariant}; + use vortex_buffer::buffer; + use vortex_dtype::PType::I32; + use vortex_dtype::{DType, Nullability, StructDType}; + use vortex_expr::{get_item, gt, ident}; + use vortex_scan::RowMask; + + use crate::layouts::flat::writer::FlatLayoutWriter; + use crate::layouts::struct_::writer::StructLayoutWriter; + use crate::segments::test::TestSegments; + use crate::strategies::LayoutWriterExt; + use crate::LayoutData; + + /// Create a chunked layout with three chunks of primitive arrays. + fn struct_layout() -> (Arc, LayoutData) { + let mut segments = TestSegments::default(); + + let layout = StructLayoutWriter::new( + DType::Struct( + StructDType::new( + vec!["a".into(), "b".into(), "c".into()].into(), + vec![I32.into(), I32.into(), I32.into()], + ), + Nullability::NonNullable, + ), + vec![ + Box::new(FlatLayoutWriter::new(I32.into())), + Box::new(FlatLayoutWriter::new(I32.into())), + Box::new(FlatLayoutWriter::new(I32.into())), + ], + ) + .push_all( + &mut segments, + [StructArray::from_fields( + [ + ("a", buffer![7, 2, 3].into_array()), + ("b", buffer![4, 5, 6].into_array()), + ("c", buffer![4, 5, 6].into_array()), + ] + .as_slice(), + ) + .map(IntoArrayData::into_array)], + ) + .unwrap(); + (Arc::new(segments), layout) + } + + #[test] + fn test_struct_layout() { + let (segments, layout) = struct_layout(); + + let reader = layout.reader(segments, Default::default()).unwrap(); + let expr = gt(get_item("a", ident()), get_item("b", ident())); + let result = + block_on(reader.evaluate_expr(RowMask::new_valid_between(0, 3), expr)).unwrap(); + assert_eq!( + vec![true, false, false], + result + .into_bool() + .unwrap() + .boolean_buffer() + .iter() + .collect::>() + ); + } + + #[test] + fn test_struct_layout_row_mask() { + let (segments, layout) = struct_layout(); + + let reader = layout.reader(segments, Default::default()).unwrap(); + let expr = gt(get_item("a", ident()), get_item("b", ident())); + let result = block_on(reader.evaluate_expr( + // Take rows 0 and 1, skip row 2, and anything after that + RowMask::new(FilterMask::from_iter([true, true, false]), 0), + expr, + )) + .unwrap(); + + assert_eq!(result.len(), 2); + + assert_eq!( + vec![true, false], + result + .into_bool() + .unwrap() + .boolean_buffer() + .iter() + .collect::>() + ); } } diff --git a/vortex-layout/src/layouts/struct_/eval_stats.rs b/vortex-layout/src/layouts/struct_/eval_stats.rs index d62452f36e..3add6a78c6 100644 --- a/vortex-layout/src/layouts/struct_/eval_stats.rs +++ b/vortex-layout/src/layouts/struct_/eval_stats.rs @@ -3,11 +3,11 @@ use vortex_array::stats::{Stat, StatsSet}; use vortex_dtype::FieldPath; use vortex_error::VortexResult; -use crate::layouts::struct_::reader::StructScan; +use crate::layouts::struct_::reader::StructReader; use crate::StatsEvaluator; #[async_trait(?Send)] -impl StatsEvaluator for StructScan { +impl StatsEvaluator for StructReader { async fn evaluate_stats( &self, field_paths: &[FieldPath], diff --git a/vortex-layout/src/layouts/struct_/mod.rs b/vortex-layout/src/layouts/struct_/mod.rs index a339c4b531..b2acbe501d 100644 --- a/vortex-layout/src/layouts/struct_/mod.rs +++ b/vortex-layout/src/layouts/struct_/mod.rs @@ -6,13 +6,13 @@ pub mod writer; use std::collections::BTreeSet; use std::sync::Arc; -use reader::StructScan; +use reader::StructReader; use vortex_array::ContextRef; use vortex_error::VortexResult; use crate::data::LayoutData; use crate::encoding::{LayoutEncoding, LayoutId}; -use crate::reader::{LayoutReader, LayoutScanExt}; +use crate::reader::{LayoutReader, LayoutReaderExt}; use crate::segments::AsyncSegmentReader; use crate::COLUMNAR_LAYOUT_ID; @@ -28,9 +28,9 @@ impl LayoutEncoding for StructLayout { &self, layout: LayoutData, ctx: ContextRef, - _segments: Arc, + segments: Arc, ) -> VortexResult> { - Ok(StructScan::try_new(layout, ctx)?.into_arc()) + Ok(StructReader::try_new(layout, segments, ctx)?.into_arc()) } fn register_splits( diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 3e33e87c74..2af0964016 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -1,27 +1,86 @@ +use std::sync::{Arc, OnceLock}; + +use vortex_array::aliases::hash_map::HashMap; use vortex_array::ContextRef; -use vortex_error::{vortex_panic, VortexResult}; +use vortex_dtype::{DType, Field, FieldName, StructDType}; +use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; use crate::layouts::struct_::StructLayout; -use crate::{LayoutData, LayoutEncoding, LayoutReader}; +use crate::segments::AsyncSegmentReader; +use crate::{LayoutData, LayoutEncoding, LayoutReader, LayoutReaderExt}; -#[derive(Debug)] -pub struct StructScan { +#[derive(Clone)] +pub struct StructReader { layout: LayoutData, + ctx: ContextRef, + + segments: Arc, + + field_readers: Arc<[OnceLock>]>, + field_lookup: HashMap, } -impl StructScan { - pub(super) fn try_new(layout: LayoutData, _ctx: ContextRef) -> VortexResult { +impl StructReader { + pub(super) fn try_new( + layout: LayoutData, + segments: Arc, + ctx: ContextRef, + ) -> VortexResult { if layout.encoding().id() != StructLayout.id() { vortex_panic!("Mismatched layout ID") } + let dtype = layout.dtype(); + let DType::Struct(struct_dt, _) = dtype else { + vortex_panic!("Mismatched dtype {} for struct layout", dtype); + }; + + let field_readers = struct_dt.names().iter().map(|_| OnceLock::new()).collect(); + + let field_lookup = struct_dt + .names() + .iter() + .enumerate() + .map(|(i, name)| (name.clone(), i)) + .collect(); + // This is where we need to do some complex things with the scan in order to split it into // different scans for different fields. - Ok(Self { layout }) + Ok(Self { + layout, + ctx, + segments, + field_readers, + field_lookup, + }) + } + + /// Return the [`StructDType`] of this layout. + pub(crate) fn struct_dtype(&self) -> &StructDType { + self.dtype() + .as_struct() + .vortex_expect("Struct layout must have a struct DType, verified at construction") + } + + /// Return the child reader for the chunk. + pub(crate) fn child(&self, field: &Field) -> VortexResult<&Arc> { + let idx = match field { + Field::Name(n) => *self + .field_lookup + .get(n) + .ok_or_else(|| vortex_err!("Field {} not found in struct layout", n))?, + Field::Index(idx) => *idx, + }; + self.field_readers[idx].get_or_try_init(|| { + let child_layout = self + .layout + .child(idx, self.struct_dtype().field_dtype(idx)?)?; + child_layout.reader(self.segments.clone(), self.ctx.clone()) + }) } } -impl LayoutReader for StructScan { +impl LayoutReader for StructReader { fn layout(&self) -> &LayoutData { &self.layout } diff --git a/vortex-layout/src/layouts/struct_/writer.rs b/vortex-layout/src/layouts/struct_/writer.rs index 2cc063430c..dda5a3cf77 100644 --- a/vortex-layout/src/layouts/struct_/writer.rs +++ b/vortex-layout/src/layouts/struct_/writer.rs @@ -63,6 +63,7 @@ impl LayoutWriter for StructLayoutWriter { self.row_count += struct_array.len() as u64; for i in 0..struct_array.nfields() { + // TODO(joe): handle struct validity let column = chunk .as_struct_array() .vortex_expect("batch is a struct array") diff --git a/vortex-layout/src/reader.rs b/vortex-layout/src/reader.rs index 76278cef9c..11f5e0d034 100644 --- a/vortex-layout/src/reader.rs +++ b/vortex-layout/src/reader.rs @@ -63,7 +63,7 @@ impl StatsEvaluator for Arc { } } -pub trait LayoutScanExt: LayoutReader { +pub trait LayoutReaderExt: LayoutReader { /// Box the layout scan. fn into_arc(self) -> Arc where @@ -78,4 +78,4 @@ pub trait LayoutScanExt: LayoutReader { } } -impl LayoutScanExt for L {} +impl LayoutReaderExt for L {}