-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Allow nested is_in()
in when()/then()
for full-streaming
#20052
fix: Allow nested is_in()
in when()/then()
for full-streaming
#20052
Conversation
is_in()
in when()/then()
for streaming sinkis_in()
in when()/then()
for full-streaming
pub(crate) fn groups_sensitive(&self) -> bool { | ||
|
||
/// Checks whether this expression is elementwise. This only checks the top level expression. | ||
pub(crate) fn is_elementwise_top_level(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_elementwise_top_level()
replaces groups_sensitive()
. It is also changed to consider Explode
and Filter
as non-elementwise
} | ||
/// Checks if the top-level expression node is elementwise. If this is the case, then `stack` will | ||
/// be extended further with any nested expression nodes. | ||
pub fn is_elementwise(stack: &mut Vec<Node>, ae: &AExpr, expr_arena: &Arena<AExpr>) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Base is_elementwise()
function - recursion is done through a nodes stack
which allows the caller to inspect the exprs.
} | ||
if matches!(expr_arena.get(rhs), AExpr::Literal { .. }) { | ||
stack.push_node(input.first().unwrap().node()); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the fix for the when(A.is_in(B))
issue - we don't traverse into RHS literals of is_in()
during recursive checking. We had this already in predicate pushdown but it has been moved here.
.. | ||
} => { | ||
assert!(options.is_elementwise()); | ||
opts.strict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rule was moved from new-streaming below
if cfg!(debug_assertions) { | ||
for v in acc_predicates.values() { | ||
let ae = expr_arena.get(v.node()); | ||
assert!(permits_filter_pushdown( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by - predicates that hit here should already satisfy these requirements - otherwise they should have been caught at the Filter
node
options.collect_groups, | ||
ApplyOptions::ElementWise | ApplyOptions::ApplyList | ||
), | ||
Context::Aggregation => matches!(options.collect_groups, ApplyOptions::ElementWise), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function was never called with Context::Aggregation
.contains(FunctionFlags::CHANGES_LENGTH | FunctionFlags::RETURNS_SCALAR) | ||
matches!( | ||
self.collect_groups, | ||
ApplyOptions::ElementWise | ApplyOptions::ApplyList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand this, but we apparently consider ApplyList
as elementwise according to the test suite
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should see when we hit this. I believe only in a few python udf cases. I think we would never hit the streaming engine with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is hit by map_elements()
/ map_batches(.., agg_list=True)
. This also hits the streaming engines because FunctionOptions::is_elementwise()
gets used by all of the engines during physical plan creation / IR lowering (indirectly through fn is_elementwise(_rec)
).
I don't think ApplyList
is strictly elementwise, but both the in-memory and existing streaming engine currently expect it to be identified as such, so I've made it the default here. For the new-streaming engine I added an override below.
8adc9dd
to
55ec6b6
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #20052 +/- ##
==========================================
- Coverage 79.52% 79.52% -0.01%
==========================================
Files 1563 1563
Lines 217194 217173 -21
Branches 2464 2464
==========================================
- Hits 172729 172701 -28
- Misses 43905 43912 +7
Partials 560 560 ☔ View full report in Codecov by Sentry. |
a57466f
to
66c30d4
Compare
Alias(_, _) | BinaryExpr { .. } | Column(_) | Ternary { .. } | Cast { .. } => true, | ||
|
||
Agg { .. } | ||
| Explode(_) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filter, gather, sort and explode don't seem elementwise. They are group sensitive but not elementwise. Maybe this is an unlucky name?
.contains(FunctionFlags::CHANGES_LENGTH | FunctionFlags::RETURNS_SCALAR) | ||
matches!( | ||
self.collect_groups, | ||
ApplyOptions::ElementWise | ApplyOptions::ApplyList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should see when we hit this. I believe only in a few python udf cases. I think we would never hit the streaming engine with this.
66c30d4
to
5c45bf1
Compare
Fixes #15767
This PR also does some refactoring to consolidate and improve the way we identify expressions as elementwise.
Replacing
streamable
withis_elementwise
A lot of places where we currently use the
streamable
terminology actually needed to use an even stricterelementwise
requirement (e.g. filter / slice pushdown.streamable
expressions include all elementwise expressions and more. For example,filter()
/explode()
expressions are streamable, but they are not elementwise.Where this matters is if the result of a streamable expression needs to be projected next to other result columns, which is almost always the case. The existing in-memory engine and evaluators cannot do this properly as it projects the result columns independently within every chunk, which could lead to height mismatches. The only place that properly supports this is the new-streaming engine, where the
ZipNode
performs the projection properly.