Utility trait for stats-based skipping logic #357

Merged
merged 12 commits into from
Oct 3, 2024

Conversation

Collaborator

@scovich scovich commented Sep 25, 2024

Parquet footer stats allow data skipping, very similar to Delta file stats. Except parquet isn't quite as convenient to work with and arrow-parquet doesn't even try to help (it can't, because arrow-compute expressions are opaque, so there's no way to traverse and rewrite them into stats-based skipping predicates).

We implement row group skipping support by traversing the same push-down predicate that delta-kernel already uses to extract a Delta file skipping predicate. But instead of rewriting the expression, we evaluate it bottom-up (no-copy, O(n) work where n is the number of nodes in the expression).
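
To make the bottom-up evaluation concrete, here is a minimal sketch under stated assumptions. `Pred`, `Stats`, and `eval` are hypothetical names invented for illustration (they are not the kernel's `Expression` or the trait this PR adds), and only integer min/max stats are modeled. Each node is visited once and returns a three-valued result, where `Some(false)` is the only outcome that permits skipping.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified predicate type (not the kernel's `Expression`).
enum Pred {
    /// `column < literal`
    Lt(&'static str, i64),
    /// `column > literal`
    Gt(&'static str, i64),
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
}

/// Per-column (min, max) stats, as a row group footer might provide them.
type Stats = HashMap<&'static str, (i64, i64)>;

/// Returns `Some(false)` only when the stats prove that no row can match,
/// `Some(true)` when some row might match, and `None` when stats are missing.
fn eval(pred: &Pred, stats: &Stats) -> Option<bool> {
    match pred {
        // Some row may satisfy `col < v` only if the smallest value does.
        Pred::Lt(col, v) => stats.get(col).map(|(min, _)| min < v),
        // Some row may satisfy `col > v` only if the largest value does.
        Pred::Gt(col, v) => stats.get(col).map(|(_, max)| max > v),
        // SQL-style AND: false dominates, unknown otherwise propagates.
        Pred::And(a, b) => match (eval(a, stats), eval(b, stats)) {
            (Some(false), _) | (_, Some(false)) => Some(false),
            (Some(true), Some(true)) => Some(true),
            _ => None,
        },
        // SQL-style OR: true dominates, unknown otherwise propagates.
        Pred::Or(a, b) => match (eval(a, stats), eval(b, stats)) {
            (Some(true), _) | (_, Some(true)) => Some(true),
            (Some(false), Some(false)) => Some(false),
            _ => None,
        },
    }
}
```

With stats `x in [10, 20]` and no stats for `y`, the predicate `x < 5 AND y > 0` evaluates to `Some(false)` (the row group can be skipped), while `x < 5 OR y > 0` evaluates to `None` (the row group must be kept).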

This PR does not attempt to actually incorporate the new skipping logic into the default reader. That (plus testing the integration) should likely be a follow-up PR.

@scovich scovich added the merge hold Don't allow the PR to merge label Sep 25, 2024

codecov bot commented Sep 25, 2024

Codecov Report

Attention: Patch coverage is 87.59865% with 110 lines in your changes missing coverage. Please review.

Project coverage is 76.37%. Comparing base (da206ed) to head (efeb248).
Report is 4 commits behind head on main.

Files with missing lines                             Patch %   Lines
kernel/src/engine/parquet_stats_skipping/tests.rs    87.08%    84 Missing and 2 partials ⚠️
kernel/src/engine/parquet_stats_skipping.rs          89.94%    11 Missing and 6 partials ⚠️
kernel/src/expressions/scalars.rs                    91.89%    3 Missing ⚠️
kernel/src/scan/mod.rs                               40.00%    3 Missing ⚠️
kernel/src/schema.rs                                 80.00%    1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #357      +/-   ##
==========================================
+ Coverage   74.71%   76.37%   +1.65%     
==========================================
  Files          43       45       +2     
  Lines        8361     9240     +879     
  Branches     8361     9240     +879     
==========================================
+ Hits         6247     7057     +810     
- Misses       1727     1786      +59     
- Partials      387      397      +10     


}

pub(crate) fn compute_field_indices(
fields: &[ColumnDescPtr],
Collaborator

I have similar functionality in my PR, but I just need to extract the columns (I don't have access to a &[ColumnDescPtr]). It may be a good idea to split out the expression_to_column part so I could reuse your implementation once you merge.

Collaborator

@nicklan nicklan left a comment

nice, this is really cool. Had a few comments but overall looks great.

hope the unit tests aren't too much of a pain to write :)

kernel/src/engine/arrow_footer_skipping.rs (outdated)
Comment on lines 22 to 23
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false));
keep.then_some(index)
Collaborator

I find this a little easier to read:

Suggested change
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false));
keep.then_some(index)
RowGroupFilter::apply(filter, row_group).and_then(|result| (!result).then_some(index))

Collaborator Author

@scovich scovich Sep 26, 2024

I'm not sure those are equivalent? The intent is to keep unless it's Some(false). So None and Some(true) should both produce the same result. Maybe this, but it's not shorter and it has a double-negative (= confusing and error-prone)

Suggested change
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false));
keep.then_some(index)
let keep = !RowGroupFilter::apply(filter, row_group).is_some_and(|v| !v);
keep.then_some(index)

Collaborator Author

This would be a nice place to use Option::is_none_or, but that's not stable rust yet:

Suggested change
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false));
keep.then_some(index)
RowGroupFilter::apply(filter, row_group).is_none_or(|v| v).then_some(index)

Collaborator

This would be a nice place to use Option::is_none_or, but that's not stable rust yet:

You could use just a let chain

if let Some(false) = RowGroupFilter::apply(filter, row_group) {
  Some(index)
} else {
  None
}

Collaborator Author

This code has shifted around quite a bit. I still use matches!(...) macro, but the result is a plain bool now instead of Option<bool>:

impl<'a> RowGroupFilter<'a> {
    /// Applies a filtering expression to a row group. Return value false means to skip it.
    fn apply(filter: &Expression, row_group: &'a RowGroupMetaData) -> bool {
        let field_indices = compute_field_indices(row_group.schema_descr().columns(), filter);
        let result = Self {
            row_group,
            field_indices,
        }
        .apply_sql_where(filter);
        // Skip only when the filter is provably false; true and null (None) both keep.
        !matches!(result, Some(false))
    }
}

What do you think?

kernel/src/engine/arrow_footer_skipping.rs (outdated)
match op {
Equal => skipping_eq(inverted),
NotEqual => skipping_eq(!inverted),
LessThan => self.partial_cmp_min_stat(&col, val, Ordering::Less, inverted),
Collaborator

if we re-wrote all the partial_cmp_[min/max]_stat calls like:

 LessThan => partial_cmp_scalars(
                &self.get_min_stat_value(&col, &val.data_type())?,
                val,
                Ordering::Less,
                inverted,
            ),

We could remove those functions. It's a little more code at the call site though.

Collaborator Author

Yeah, I was trying to reduce redundancy as much as possible. There's too much anyway that can't be removed.
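
For readers following this thread, a helper with the shape of `partial_cmp_scalars` might look roughly like the sketch below. This is an assumption-laden illustration, not the code from the PR: the `Scalar` enum here is a two-variant stand-in, and the signature is inferred only from the call shown in the suggestion above. The key idea is that a comparison between incomparable values yields `None` (unknown) rather than a definite answer.

```rust
use std::cmp::Ordering;

/// Hypothetical two-variant stand-in for the kernel's `Scalar`.
#[derive(Debug, PartialEq)]
enum Scalar {
    Long(i64),
    String(String),
}

impl PartialOrd for Scalar {
    /// Values of different types are incomparable, so the result is `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match (self, other) {
            (Scalar::Long(a), Scalar::Long(b)) => a.partial_cmp(b),
            (Scalar::String(a), Scalar::String(b)) => a.partial_cmp(b),
            _ => None,
        }
    }
}

/// Returns whether `a` compares to `b` as `ord`, flipped when `inverted` is
/// set; returns `None` when the two scalars cannot be compared.
fn partial_cmp_scalars(a: &Scalar, b: &Scalar, ord: Ordering, inverted: bool) -> Option<bool> {
    let matched = a.partial_cmp(b)? == ord;
    Some(matched != inverted)
}
```

For `col < 5` with a min stat of 10, `partial_cmp_scalars(&Scalar::Long(10), &Scalar::Long(5), Ordering::Less, false)` gives `Some(false)`. A `Long` stat compared against a `String` literal gives `None`, which downstream logic would treat as "keep".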

.filter_map(|(index, row_group)| {
// We can only skip a row group if the filter is false (true/null means keep)
let keep = !matches!(RowGroupFilter::apply(filter, row_group), Some(false));
keep.then_some(index)
Collaborator

Right, my bad missing the None case (which yours also misses btw @hntd187)

Suggested change
keep.then_some(index)
RowGroupFilter::apply(filter, row_group).unwrap_or(true).then_some(index)

Collaborator

Ryan noted that Some(true) and None produce the same result, so I assumed it would be okay. Basically, there is only one success case, Some(false), unless I misunderstood?
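
Since the variants proposed in this thread are easy to mix up, here is a small side-by-side sketch. The function names are made up for illustration, and each variant is reduced to a plain `bool` ("keep this row group?") so they can be compared on all three possible inputs. The intent stated in the thread is: keep unless the result is `Some(false)`.

```rust
/// `None` = unknown, `Some(true)` = may match, `Some(false)` = cannot match.
fn keep_matches(result: Option<bool>) -> bool {
    !matches!(result, Some(false))
}

/// The `unwrap_or(true)` suggestion: unknown defaults to keep.
fn keep_unwrap_or(result: Option<bool>) -> bool {
    result.unwrap_or(true)
}

/// The double-negative `is_some_and` variant.
fn keep_is_some_and(result: Option<bool>) -> bool {
    !result.is_some_and(|v| !v)
}

/// The `and_then` suggestion, reduced to a bool for comparison. It yields a
/// value only for `Some(false)`, which is the opposite of the intent.
fn keep_and_then(result: Option<bool>) -> bool {
    result.and_then(|r| (!r).then_some(())).is_some()
}
```

The first three functions agree on every input (keep for `None` and `Some(true)`, skip for `Some(false)`), while the `and_then` form keeps only the `Some(false)` case. The `if let Some(false)` version has the same inversion.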

@scovich
Collaborator Author

scovich commented Sep 27, 2024

I just pushed lots of changes:

  • Added complete set of tests for the data skipping logic
  • Doc comments everywhere
  • Significant changes and additions to the data skipping logic itself (partly to fix bugs the tests uncovered)
  • Split the code into two files to match the structure that emerged.

At this point, the PR is no longer WIP. We can decide whether wiring up the row group skipping should be done in this PR or as follow-up work.

@scovich scovich changed the title [WIP] Implement parquet row group skipping in the default reader Implement parquet row group skipping in the default client Sep 27, 2024
@scovich scovich removed the merge hold Don't allow the PR to merge label Sep 27, 2024
Comment on lines 154 to 158
Expression::Struct(fields) => {
for field in fields {
recurse(field, columns);
}
}
Collaborator Author

We technically don't need this one, because the skipping logic ignores Struct expressions... but I don't know that the restriction is fundamental, so somebody might choose to implement it some day?
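
As a rough illustration of the recursion being discussed, the sketch below collects every column referenced by an expression, including columns nested inside struct expressions. `Expr` and `referenced_columns` are hypothetical and much simpler than the kernel's `Expression`; the point is only that visiting `Struct` children costs little and keeps the column set complete if struct support is added later.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified expression type (not the kernel's `Expression`).
enum Expr {
    Column(String),
    Literal(i64),
    Binary(Box<Expr>, Box<Expr>),
    Struct(Vec<Expr>),
}

/// Collects the name of every column referenced anywhere in `expr`.
fn referenced_columns(expr: &Expr) -> BTreeSet<&str> {
    fn recurse<'a>(expr: &'a Expr, columns: &mut BTreeSet<&'a str>) {
        match expr {
            Expr::Column(name) => {
                columns.insert(name.as_str());
            }
            Expr::Literal(_) => {}
            Expr::Binary(left, right) => {
                recurse(left, columns);
                recurse(right, columns);
            }
            // Visited for completeness, even if the skipping logic ignores structs.
            Expr::Struct(fields) => {
                for field in fields {
                    recurse(field, columns);
                }
            }
        }
    }
    let mut columns = BTreeSet::new();
    recurse(expr, &mut columns);
    columns
}
```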

@scovich
Collaborator Author

scovich commented Sep 27, 2024

This build/coverage failure doesn't look good?

The provided token has expired. Request signature expired at: 2024-09-27T05:46:37+00:00

@zachschuermann
Collaborator

Ah i'll look into the coverage token issue tomorrow!

@scovich
Collaborator Author

scovich commented Sep 27, 2024

Ah i'll look into the coverage token issue tomorrow!

Must have been some weird timing race -- a retry succeeded immediately.

Collaborator

@hntd187 hntd187 left a comment

Those are some impressive tests I have to say. I'm good with all of this, my preference though is that we wire it up in this PR otherwise it's just dead code until that PR lands and we know priorities change and such. But I don't have a strong enough opinion to hold up.

@zachschuermann zachschuermann self-requested a review September 27, 2024 20:55
@scovich
Collaborator Author

scovich commented Sep 27, 2024

my preference though is that we wire it up in this PR otherwise it's just dead code until that PR lands and we know priorities change and such. But I don't have a strong enough opinion to hold up.

I started down that path, but it's a big enough change of its own (and needing tests of its own) that I ended up swinging the other way. All the actual parquet reader logic has been split into a separate PR #362, and this PR is now completely self-contained and tested. We can keep iterating on the other PR while this one merges.

@scovich scovich changed the title Implement parquet row group skipping in the default client Utility trait for stats-based skipping logic Sep 27, 2024
Collaborator

@nicklan nicklan left a comment

yeah, wow that's quite a test suite :)

left a few comments but I'm generally good with merging this as is and then wiring it up later.

Higher level comment. It's great that you've extracted out the stuff needed from the engine so that this PR doesn't need to reference arrow. We have a few other things (and will have more for say variant) that fall into this category of "you might want this in your engine no matter what your data format is, so here's some utilities to help you, just implement these traits".

I'm wondering if we want to think about creating a module for that either as a sub-module of engine (engine/utils?), or as an engine_utils standalone module. Probably not in this PR though.
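
The "just implement these traits" pattern described here could be sketched as follows. Everything in this block is an assumption made for illustration: `StatsSource`, `may_equal`, and `InMemoryStats` are invented names, and only integer stats and an equality check are modeled. The engine supplies the stats lookups, and a provided method supplies format-independent skipping logic.

```rust
/// Hypothetical trait: the engine implements the two lookups, and the
/// provided method contains the data-format-independent logic.
trait StatsSource {
    fn min_stat(&self, column: &str) -> Option<i64>;
    fn max_stat(&self, column: &str) -> Option<i64>;

    /// Evaluates `column == value` against the stats. `Some(false)` means no
    /// row can match; `None` means a required stat is missing.
    fn may_equal(&self, column: &str, value: i64) -> Option<bool> {
        let (min, max) = (self.min_stat(column)?, self.max_stat(column)?);
        Some(min <= value && value <= max)
    }
}

/// Toy engine-side implementation backed by a fixed list of (name, min, max).
struct InMemoryStats(Vec<(&'static str, i64, i64)>);

impl StatsSource for InMemoryStats {
    fn min_stat(&self, column: &str) -> Option<i64> {
        self.0.iter().find(|(name, _, _)| *name == column).map(|(_, min, _)| *min)
    }

    fn max_stat(&self, column: &str) -> Option<i64> {
        self.0.iter().find(|(name, _, _)| *name == column).map(|(_, _, max)| *max)
    }
}
```

An engine backed by a different file format would only need its own `min_stat` and `max_stat`; the skipping decision in `may_equal` would be shared unchanged.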

kernel/src/engine/parquet_stats_skipping.rs (outdated)
Some(skip != inverted)
};
match op {
// Given `col == val`:
Collaborator

thanks for the comments here, will be useful when debugging :)

kernel/src/engine/parquet_stats_skipping.rs (outdated)
fn test_binary_lt_ge() {
use BinaryOperator::*;

const LO: Scalar = Scalar::Long(1);
Collaborator

Can we use some other scalar types between, say, the eq_ne tests and the lt_ge ones? It should be a minimal change here, but it would exercise the scalar code for more than just Long.

Collaborator Author

@scovich scovich Oct 1, 2024

This test is only trying to verify the conversion of a regular predicate to a data skipping predicate. The different scalar comparisons (for both matched and mismatched types) are already exercised exhaustively by test_binary_scalars.

Is there a particular corner case you worry about, that would make the data skipping code misbehave based on the type of scalar involved?

Collaborator Author

Update: Added a negative test, where the literal and column types mismatch.

//! An implementation of data skipping that leverages parquet stats from the file footer.
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
use crate::schema::DataType;
use parquet::schema::types::ColumnPath;
Collaborator Author

@scovich scovich Oct 1, 2024

From #357 (review):

It's great that you've extracted out the stuff needed from the engine so that this PR doesn't need to reference arrow.

We do still have this one dependency on arrow-parquet ColumnPath. But we already knew we needed to define a similar struct in kernel, in order to support nested column paths. Once that struct exists, we can use it instead and push the dependency back into the concrete implementation that anyway has to know about arrow-parquet.

Collaborator Author

(also added it as a code comment)

Collaborator

added #379

@scovich
Collaborator Author

scovich commented Oct 1, 2024

re

Those are some impressive tests I have to say.
and
wow that's quite a test suite :)

I know from past bad experience that this is NOT fun stuff to debug. Better to test it near-exhaustively up front and hopefully save some of that pain later.

@scovich scovich requested a review from nicklan October 1, 2024 16:46
Collaborator

@nicklan nicklan left a comment

lgtm. thanks!

@scovich scovich merged commit 092ee67 into delta-io:main Oct 3, 2024
12 checks passed
OussamaSaoudi-db pushed a commit to OussamaSaoudi-db/delta-kernel-rs that referenced this pull request Oct 7, 2024
Comment on lines +232 to +234
pub fn null_literal(data_type: DataType) -> Self {
Self::Literal(Scalar::Null(data_type))
}
Collaborator

ah found it - I can use this in the write PR :)

scovich added a commit that referenced this pull request Oct 9, 2024
…362)

Previous PR #357
implemented the logic of stats-based skipping for a parquet reader, but
in abstract form that doesn't actually depend on parquet footers. With
that in place, we can now wire up the kernel default parquet readers to
use row group skipping.

Also fixes
#380.
@scovich scovich deleted the row-group-skipping branch November 8, 2024 21:00