feat: evaluate full skipping predicate via evaluator
roeap committed Jan 28, 2024
1 parent 1ee82cf commit dc5119e
Showing 8 changed files with 127 additions and 75 deletions.
3 changes: 2 additions & 1 deletion kernel/Cargo.toml
@@ -10,7 +10,6 @@ version.workspace = true

[dependencies]
arrow-array = { version = "^49.0" }
arrow-arith = { version = "^49.0" }
arrow-select = { version = "^49.0" }
bytes = "1.4"
chrono = { version = "0.4" }
@@ -33,6 +32,7 @@ z85 = "3.0.5"
visibility = "0.1.0"

# Used in default client
arrow-arith = { version = "^49.0", optional = true }
arrow-json = { version = "^49.0", optional = true }
arrow-ord = { version = "^49.0", optional = true }
arrow-schema = { version = "^49.0", optional = true }
@@ -49,6 +49,7 @@ tokio = { version = "1", optional = true, features = ["rt-multi-thread"] }
[features]
default = ["default-client"]
default-client = [
"arrow-arith",
"arrow-json",
"arrow-ord",
"arrow-schema",
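Because arrow-arith is now an optional dependency that is only enabled through the default-client feature, code that reaches into it has to sit behind the corresponding feature gate. A minimal sketch of such a gate (module and function names are illustrative, not part of this commit):

#[cfg(feature = "default-client")]
mod arith_support {
    use arrow_arith::boolean::is_null;
    use arrow_array::{BooleanArray, Int32Array};

    // Compiled only when the optional arrow-arith dependency is pulled in
    // via the default-client feature.
    pub fn null_mask(values: &Int32Array) -> BooleanArray {
        is_null(values).expect("is_null on a primitive array should not fail")
    }
}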
11 changes: 5 additions & 6 deletions kernel/src/actions/mod.rs
@@ -521,6 +521,7 @@ fn struct_array_to_map(arr: &StructArray) -> DeltaResult<HashMap<String, Option<
mod tests {
use std::sync::Arc;

use arrow_array::ArrayRef;
use object_store::local::LocalFileSystem;

use super::*;
@@ -534,13 +535,12 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
]
.into();
]));
let output_schema = Arc::new(log_schema().clone());
handler.parse_json(json_strings, output_schema).unwrap()
}
@@ -597,15 +597,14 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
]
.into();
]));
let output_schema = Arc::new(log_schema().clone());
let batch = handler.parse_json(json_strings, output_schema).unwrap();

25 changes: 17 additions & 8 deletions kernel/src/client/expression.rs
@@ -13,6 +13,7 @@ use arrow_array::{
};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use arrow_select::nullif::nullif;

use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
@@ -215,6 +216,13 @@ fn evaluate_expression(
.transpose()?
.ok_or(Error::Generic("empty expression".to_string()))
}
(NullIf { expr, if_expr }, _) => {
let expr_arr = evaluate_expression(expr.as_ref(), batch, None)?;
let if_expr_arr =
evaluate_expression(if_expr.as_ref(), batch, Some(&DataType::BOOLEAN))?;
let if_expr_arr = downcast_to_bool(&if_expr_arr)?;
Ok(nullif(&expr_arr, if_expr_arr)?)
}
}
}

@@ -245,14 +253,15 @@ pub struct DefaultExpressionEvaluator {

impl ExpressionEvaluator for DefaultExpressionEvaluator {
fn evaluate(&self, batch: &RecordBatch) -> DeltaResult<ArrayRef> {
let input_schema: ArrowSchema = self.input_schema.as_ref().try_into()?;
if batch.schema().as_ref() != &input_schema {
return Err(Error::Generic(format!(
"input schema does not match batch schema: {:?} != {:?}",
input_schema,
batch.schema()
)));
};
let _input_schema: ArrowSchema = self.input_schema.as_ref().try_into()?;
// TODO: make sure we have matching schemas for validation
// if batch.schema().as_ref() != &input_schema {
// return Err(Error::Generic(format!(
// "input schema does not match batch schema: {:?} != {:?}",
// input_schema,
// batch.schema()
// )));
// };
evaluate_expression(&self.expression, batch, Some(&self.output_type))
}
}
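The new NullIf arm above delegates to arrow's nullif kernel from arrow-select. A small standalone sketch of what that kernel does (the arrays here are illustrative): entries of the first argument are replaced with null wherever the boolean array is true.

use arrow_array::{Array, BooleanArray, Int32Array};
use arrow_select::nullif::nullif;

fn main() {
    let values = Int32Array::from(vec![Some(1), Some(2), None]);
    let condition = BooleanArray::from(vec![true, false, false]);
    // Index 0 is nulled out by the condition; index 2 was already null.
    let result = nullif(&values, &condition).unwrap();
    assert_eq!(result.null_count(), 2);
}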
11 changes: 5 additions & 6 deletions kernel/src/client/json.rs
@@ -5,7 +5,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::task::{ready, Poll};

use arrow_array::{new_null_array, Array, RecordBatch, StringArray};
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StringArray};
use arrow_json::{reader::Decoder, ReaderBuilder};
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use arrow_select::concat::concat_batches;
@@ -105,7 +105,7 @@ fn get_reader(data: &[u8]) -> BufReader<Cursor<&[u8]>> {
impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn parse_json(
&self,
json_strings: StringArray,
json_strings: ArrayRef,
output_schema: SchemaRef,
) -> DeltaResult<RecordBatch> {
let json_strings = json_strings
@@ -117,7 +117,6 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
json_strings
))
})?;

let output_schema: ArrowSchemaRef = Arc::new(output_schema.as_ref().try_into()?);
let mut decoder = ReaderBuilder::new(output_schema.clone())
.with_batch_size(self.batch_size)
@@ -276,6 +275,7 @@ impl FileOpener for JsonOpener {
mod tests {
use std::path::PathBuf;

use arrow_array::ArrayRef;
use arrow_schema::Schema as ArrowSchema;
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
@@ -288,13 +288,12 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
]
.into();
]));
let output_schema = Arc::new(log_schema().clone());

let batch = handler.parse_json(json_strings, output_schema).unwrap();
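parse_json builds an arrow-json decoder from the requested output schema (see ReaderBuilder::new(output_schema.clone()) above). A rough, self-contained sketch of that machinery, using an illustrative schema and payload rather than the Delta log schema:

use std::sync::Arc;
use arrow_array::{Array, RecordBatch, StringArray};
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
    let json_strings = StringArray::from(vec![r#"{"a":1}"#, r#"{"a":2}"#]);
    for i in 0..json_strings.len() {
        // Each element holds one complete JSON document.
        decoder.decode(json_strings.value(i).as_bytes()).unwrap();
    }
    let batch: RecordBatch = decoder.flush().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 2);
}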
29 changes: 29 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -101,6 +101,13 @@ pub enum Expression {
/// The expressions.
exprs: Vec<Expression>,
},
// TODO how to model required functions?
NullIf {
/// The expression to evaluate.
expr: Box<Expression>,
/// The expression to compare against.
if_expr: Box<Expression>,
},
// TODO: support more expressions, such as IS IN, LIKE, etc.
}

@@ -141,6 +148,7 @@ impl Display for Expression {
)
}
},
Self::NullIf { expr, if_expr } => write!(f, "NULLIF({}, {})", expr, if_expr),
}
}
}
@@ -169,6 +177,10 @@ impl Expression {
Self::Literal(value.into())
}

pub fn struct_expr(exprs: impl IntoIterator<Item = Self>) -> Self {
Self::Struct(exprs.into_iter().collect())
}

pub fn unary(op: UnaryOperator, expr: impl Into<Expression>) -> Self {
Self::UnaryOperation {
op,
@@ -250,6 +262,19 @@ impl Expression {
Self::variadic(VariadicOperator::Or, std::iter::once(self).chain(other))
}

/// Create a new expression `self IS NULL`
pub fn is_null(self) -> Self {
Self::unary(UnaryOperator::IsNull, self)
}

/// Create a new expression `NULLIF(self, other)`
pub fn null_if(self, other: Self) -> Self {
Self::NullIf {
expr: Box::new(self),
if_expr: Box::new(other),
}
}

fn walk(&self) -> impl Iterator<Item = &Self> + '_ {
let mut stack = vec![self];
std::iter::from_fn(move || {
@@ -272,6 +297,10 @@ impl Expression {
stack.extend(exprs.iter());
}
},
Self::NullIf { expr, if_expr } => {
stack.push(expr);
stack.push(if_expr);
}
}
Some(expr)
})
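The struct_expr, is_null, and null_if helpers added here exist so that the data-skipping filter can assemble its wrapper expression declaratively. A minimal sketch of how they compose, written as if inside this crate (the column name is illustrative):

use crate::expressions::Expression;

/// Mirrors FILTER_EXPR in kernel/src/scan/data_skipping.rs: NULLIF(p, p)
/// maps true to null, and IS NULL then maps null to true, so the result
/// holds only true (= keep) and false (= skip).
fn keep_expr() -> Expression {
    Expression::column("predicate")
        .null_if(Expression::column("predicate"))
        .is_null()
}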
4 changes: 2 additions & 2 deletions kernel/src/lib.rs
@@ -39,7 +39,7 @@
use std::ops::Range;
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_array::{ArrayRef, RecordBatch};
use bytes::Bytes;
use url::Url;

@@ -148,7 +148,7 @@ pub trait JsonHandler {
/// Parse the given json strings and return the fields requested by output schema as columns in a [`RecordBatch`].
fn parse_json(
&self,
json_strings: StringArray,
json_strings: ArrayRef,
output_schema: SchemaRef,
) -> DeltaResult<RecordBatch>;

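With the trait taking a type-erased ArrayRef instead of a concrete StringArray, an engine can hand over whatever arrow array it holds, and the default handler downcasts internally (see kernel/src/client/json.rs above). An illustrative caller-side sketch, assuming JsonHandler, SchemaRef, and DeltaResult are in scope from this crate:

use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StringArray};

fn parse_one_line(
    handler: &dyn JsonHandler,
    output_schema: SchemaRef,
) -> DeltaResult<RecordBatch> {
    // The concrete array type is erased at the trait boundary.
    let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
    ]));
    handler.parse_json(json_strings, output_schema)
}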
95 changes: 55 additions & 40 deletions kernel/src/scan/data_skipping.rs
@@ -1,10 +1,8 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow_arith::boolean::is_null;
use arrow_array::{Array, BooleanArray, RecordBatch, StringArray, StructArray};
use arrow_array::{Array, BooleanArray, RecordBatch, StructArray};
use arrow_select::filter::filter_record_batch;
use arrow_select::nullif::nullif;
use tracing::debug;

use crate::error::{DeltaResult, Error};
@@ -105,7 +103,9 @@ fn as_data_skipping_predicate(expr: &Expr) -> Option<Expr> {

pub(crate) struct DataSkippingFilter {
stats_schema: SchemaRef,
evaluator: Arc<dyn ExpressionEvaluator>,
select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
skipping_evaluator: Arc<dyn ExpressionEvaluator>,
filter_evaluator: Arc<dyn ExpressionEvaluator>,
json_handler: Arc<dyn JsonHandler>,
}

@@ -120,6 +120,17 @@ impl DataSkippingFilter {
table_schema: &SchemaRef,
predicate: &Option<Expr>,
) -> Option<Self> {
lazy_static::lazy_static!(
static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![
StructField::new("predicate", DataType::BOOLEAN, true),
]).into();
static ref FILTER_EXPR: Expr = Expr::is_null(Expr::null_if(
Expr::column("predicate"),
Expr::column("predicate"),
));
static ref STATS_EXPR: Expr = Expr::column("add.stats");
);

let predicate = match predicate {
Some(predicate) => predicate,
None => return None,
@@ -146,64 +157,68 @@ impl DataSkippingFilter {
StructField::new("maxValues", StructType::new(data_fields), true),
]));

let evaluator = table_client.get_expression_handler().get_evaluator(
// Skipping happens in several steps:
//
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
// value of true means the stats say we must keep the file, and null means we could not
// determine whether the file is safe to skip, because its stats were missing/null.
//
// 2. The nullif(skip, skip) converts true (= keep) to null, producing a result
// that contains only false (= skip) and null (= keep) values.
//
// 3. The is_null converts null to true, producing a result that contains only true (=
// keep) and false (= skip) values.
//
// 4. The filter discards every file whose selection vector entry is false.
let skipping_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
Expr::struct_expr([as_data_skipping_predicate(predicate)?]),
PREDICATE_SCHEMA.clone(),
);

let filter_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
as_data_skipping_predicate(predicate)?,
FILTER_EXPR.clone(),
DataType::BOOLEAN,
);

let select_stats_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
STATS_EXPR.clone(),
DataType::STRING,
);

Some(Self {
stats_schema,
evaluator,
select_stats_evaluator,
skipping_evaluator,
filter_evaluator,
json_handler: table_client.get_json_handler(),
})
}

pub(crate) fn apply(&self, actions: &RecordBatch) -> DeltaResult<RecordBatch> {
let adds = actions
.column_by_name("add")
.ok_or(Error::MissingColumn("Column 'add' not found.".into()))?
let stats = self.select_stats_evaluator.evaluate(actions)?;
let parsed_stats = self
.json_handler
.parse_json(stats, self.stats_schema.clone())?;

let skipping_predicate = self.skipping_evaluator.evaluate(&parsed_stats)?;
let skipping_predicate = skipping_predicate
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StructArray'.".into(),
))?;
let stats = adds
.column_by_name("stats")
.ok_or(Error::MissingColumn("Column 'stats' not found.".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StringArray'.".into(),
))?;

let parsed_stats = self
.json_handler
.parse_json(stats.clone(), self.stats_schema.clone())?;

// Skipping happens in several steps:
//
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
// value of true means the stats say we must keep the file, and null means we could not
// determine whether the file is safe to skip, because its stats were missing/null.
//
// 2. The nullif(skip, skip) converts true (= keep) to null, producing a result
// that contains only false (= skip) and null (= keep) values.
//
// 3. The is_null converts null to true, producing a result that contains only true (=
// keep) and false (= skip) values.
//
// 4. The filter discards every file whose selection vector entry is false.
let skipping_vector = self.evaluator.evaluate(&parsed_stats)?;
))?
.into();
let skipping_vector = self.filter_evaluator.evaluate(&skipping_predicate)?;
let skipping_vector = skipping_vector
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'BooleanArray'.".into(),
))?;

let skipping_vector = &is_null(&nullif(skipping_vector, skipping_vector)?)?;

let before_count = actions.num_rows();
let after = filter_record_batch(actions, skipping_vector)?;
debug!(
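Taken together, apply() now runs entirely through evaluators: select_stats_evaluator projects the raw add.stats JSON strings out of the actions batch, parse_json turns them into a stats batch, skipping_evaluator evaluates the data-skipping predicate wrapped in a single "predicate" struct field, and filter_evaluator applies IS NULL(NULLIF(predicate, predicate)) to produce the selection vector that filter_record_batch consumes. A standalone sketch of that final keep/skip mapping, built from the same arrow kernels the old code called directly:

use arrow_arith::boolean::is_null;
use arrow_array::BooleanArray;
use arrow_select::nullif::nullif;

fn main() {
    // Per-file predicate results: true = stats say keep, false = provably
    // safe to skip, null = stats missing or inconclusive.
    let predicate = BooleanArray::from(vec![Some(true), Some(false), None]);
    // NULLIF(p, p) turns true into null; IS NULL then turns null into true,
    // so only the provably skippable files remain false.
    let keep = is_null(&nullif(&predicate, &predicate).unwrap()).unwrap();
    assert_eq!(keep, BooleanArray::from(vec![true, false, true]));
}

Routing every step through the evaluator and JSON handler traits presumably also lets an engine swap in its own implementations while keeping identical skipping semantics.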