-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Utility trait for stats-based skipping logic #357
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #357 +/- ##
==========================================
+ Coverage 74.71% 76.37% +1.65%
==========================================
Files 43 45 +2
Lines 8361 9240 +879
Branches 8361 9240 +879
==========================================
+ Hits 6247 7057 +810
- Misses 1727 1786 +59
- Partials 387 397 +10 ☔ View full report in Codecov by Sentry. |
} | ||
|
||
pub(crate) fn compute_field_indices( | ||
fields: &[ColumnDescPtr], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have similar functionality in my PR, but I just need to extract the columns (I don't have access to a &[ColumnDescPtr]
). It may be a good idea to split out the expression_to_column
part so I could reuse your implementation once you merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, this is really cool. Had a few comments but overall looks great.
hope the unit tests aren't too much of pain to write :)
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false)); | ||
keep.then_some(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this a little easier to read:
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false)); | |
keep.then_some(index) | |
RowGroupFilter::apply(filter, row_group).and_then(|result| (!result).then_some(index)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure those are equivalent? The intent is to keep unless it's Some(false)
. So None
and Some(true)
should both produce the same result. Maybe this, but it's not shorter and it has a double-negative (= confusing and error-prone)
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false)); | |
keep.then_some(index) | |
let keep = !RowGroupFilter::apply(filter, row_group).is_some_and(|v| !v); | |
keep.then_some(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a nice place to use Option::is_none_or
, but that's not stable rust yet:
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false)); | |
keep.then_some(index) | |
RowGroupFilter::apply(filter, row_group).is_none_or(|v| v).then_some(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a nice place to use
Option::is_none_or
, but that's not stable rust yet:
You could use just a let chain
if let Some(false) = RowGroupFilter::apply(filter, row_group) {
Some(index)
} else {
None
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has shifted around quite a bit. I still use matches!(...)
macro, but the result is a plain bool now instead of Option<bool>
:
mpl<'a> RowGroupFilter<'a> {
/// Applies a filtering expression to a row group. Return value false means to skip it.
fn apply(filter: &Expression, row_group: &'a RowGroupMetaData) -> bool {
let field_indices = compute_field_indices(row_group.schema_descr().columns(), filter);
let result = Self {
row_group,
field_indices,
}
.apply_sql_where(filter);
!matches!(result, Some(false))
}
What do you think?
match op { | ||
Equal => skipping_eq(inverted), | ||
NotEqual => skipping_eq(!inverted), | ||
LessThan => self.partial_cmp_min_stat(&col, val, Ordering::Less, inverted), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we re-wrote all the partial_cmp_[min/max]_stat
calls like:
LessThan => partial_cmp_scalars(
&self.get_min_stat_value(&col, &val.data_type())?,
val,
Ordering::Less,
inverted,
),
We could remove those functions. It's a little more code at the call site though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was trying to reduce redundancy as much as possible. There's too much anyway that can't be removed.
.filter_map(|(index, row_group)| { | ||
// We can only skip a row group if the filter is false (true/null means keep) | ||
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false)); | ||
keep.then_some(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, my bad missing the None
case (which yours also misses btw @hntd187)
keep.then_some(index) | |
RowGroupFilter::apply(filter, row_group).unwrap_or(true).then_some(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ryan noted that Some(true) and None produced the same results so I assumed it would be okay basically there is only one success case Some(false), unless I misunderstood?
I just pushed lots of changes:
At this point, the PR is no longer WIP. We can decide whether wiring up the row group skipping should be done in this PR or as follow-up work. |
Expression::Struct(fields) => { | ||
for field in fields { | ||
recurse(field, columns); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We technically don't need this one, because the skipping logic ignores Struct
expressions... but I don't know that the restriction is fundamental, so somebody might choose to implement it some day?
This build/coverage failure doesn't look good?
|
Ah i'll look into the coverage token issue tomorrow! |
Must have been some weird timing race -- a retry succeeded immediately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are some impressive tests I have to say. I'm good with all of this, my preference though is that we wire it up in this PR otherwise it's just dead code until that PR lands and we know priorities change and such. But I don't have a strong enough opinion to hold up.
I started down that path, but it's a big enough change of its own (and needing tests of its own) that I ended up swinging the other way. All the actual parquet reader logic has been split into a separate PR #362, and this PR is now completely self-contained and tested. We can keep iterating on the other PR while this one merges. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, wow that's quite a test suite :)
left a few comments but I'm generally good with merging this as is and then wiring it up later.
Higher level comment. It's great that you've extracted out the stuff needed from the engine so that this PR doesn't need to reference arrow. We have a few other things (and will have more for say variant
) that fall into this category of "you might want this in your engine no matter what your data format is, so here's some utilities to help you, just implement these traits".
I'm wondering if we want to think about creating a module for that either as a sub-module of engine
(engine/utils
?), or as an engine_utils
standalone module. Probably not in this PR though.
Some(skip != inverted) | ||
}; | ||
match op { | ||
// Given `col == val`: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the comments here, will be useful when debugging :)
fn test_binary_lt_ge() { | ||
use BinaryOperator::*; | ||
|
||
const LO: Scalar = Scalar::Long(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use some other scalar types between say the eq_be tests and the lt_gt ones? Probably should be minimal change here but we can exercise the scalar stuff for more than just Long
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is only trying to verify the conversion regular predicate to data skipping predicate. The different scalar comparisons (by both matched and mismatched types) are already exercised exhaustively by test_binary_scalars
.
Is there a particular corner case you worry about, that would make the data skipping code misbehave based on the type of scalar involved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: Added a negative test, where the literal and column types mismatch.
//! An implementation of data skipping that leverages parquet stats from the file footer. | ||
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator}; | ||
use crate::schema::DataType; | ||
use parquet::schema::types::ColumnPath; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From #357 (review):
It's great that you've extracted out the stuff needed from the engine so that this PR doesn't need to reference arrow.
We do still have this one dependency on arrow-parquet ColumnPath. But we already knew we needed to define a similar struct in kernel, in order to support nested column paths. Once that struct exists, we can use it instead and push the dependency back into the concrete implementation that anyway has to know about arrow-parquet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(also added it as a code comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added #379
re
I know from past bad experience that this is NOT fun stuff to debug. Better to test it near-exhaustively up front and hopefully save some of that pain later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. thanks!
Parquet footer stats allow data skipping, very similar to Delta file stats. Except parquet isn't quite as convenient to work with and arrow-parquet doesn't even try to help (it can't, because arrow-compute expressions are opaque, so there's no way to traverse and rewrite them into stats-based skipping predicates). We implement row group skipping support by traversing the same push-down predicate that delta-kernel already uses to extract a for Delta file skipping predicate. But instead of rewriting the expression, we evaluate it bottom-up (no-copy, O(n) work where n is the number of nodes in the expression). This PR does not attempt to actually incorporate the new skipping logic into the default reader. That (plus testing the integration) should be a follow-up PR.
pub fn null_literal(data_type: DataType) -> Self { | ||
Self::Literal(Scalar::Null(data_type)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah found it - I can use this in the write PR :)
Parquet footer stats allow data skipping, very similar to Delta file stats. Except parquet isn't quite as convenient to work with and arrow-parquet doesn't even try to help (it can't, because arrow-compute expressions are opaque, so there's no way to traverse and rewrite them into stats-based skipping predicates).
We implement row group skipping support by traversing the same push-down predicate that delta-kernel already uses to extract a for Delta file skipping predicate. But instead of rewriting the expression, we evaluate it bottom-up (no-copy, O(n) work where n is the number of nodes in the expression).
This PR does not attempt to actually incorporate the new skipping logic into the default reader. That (plus testing the integration) should likely be a follow-up PR.