[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399
Conversation
Test build #44768 has finished for PR 9399 at commit
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = {
It is not obvious that we need both `Seq[Expression]` and `Seq[Filter]`. Can you add comments to explain what these are?
Test build #44776 has finished for PR 9399 at commit
One more consideration for this improvement: we probably need to optimize the filters by folding the expressions, since the partition keys are actually constant values during execution, simply adding the
@chenghao-intel Can you give an example showing
Actually I am saying that it will probably give us some trouble in getting the

Sorry if I missed something.
@chenghao-intel Could you please give an example?
Oh, for example: let's say we have the table src (key, value) partition (p1)

And we assume the p1 candidates are 10, 100, and the

I mean it will be confusing to new data source developers, how to define the

On the other hand, I am not sure if it's really necessary to expose the
Sorry, I am challenging this as it's about the API, which will probably be difficult to change back once it's released, and we'd better think further by considering the partition key cases.
@chenghao-intel Thanks for the comment. That's a good point, and I didn't consider this situation when writing this PR. However, fortunately we don't even try to push down predicates that reference any partition columns (see here). In general, when implementing a data source, developers shouldn't worry about partitioning. A
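To make this concrete, below is a minimal, self-contained sketch of the idea (this is not Spark's actual DataSourceStrategy code; the `SimpleFilter` type and `splitFilters` helper are hypothetical): predicates referencing a partition column are kept on the Spark side and are never offered to the data source for push-down.

// Hypothetical illustration only: Spark's real logic operates on Catalyst expressions;
// this just mirrors the idea with simple standalone types.
object PartitionFilterSplitSketch {
  case class SimpleFilter(column: String, op: String, value: Any)

  // Split predicates into those Spark keeps (they mention a partition column)
  // and those that may be offered to the data source for push-down.
  def splitFilters(
      filters: Seq[SimpleFilter],
      partitionColumns: Set[String]): (Seq[SimpleFilter], Seq[SimpleFilter]) =
    filters.partition(f => partitionColumns.contains(f.column))

  def main(args: Array[String]): Unit = {
    // Mirrors the `src (key, value) partition (p1)` example above.
    val filters = Seq(SimpleFilter("p1", "=", 10), SimpleFilter("key", ">", 5))
    val (keptBySpark, pushable) = splitFilters(filters, Set("p1"))
    println(s"kept by Spark: $keptBySpark")        // the p1 predicate
    println(s"offered to data source: $pushable")  // the key predicate
  }
}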
Force-pushed from fec7d25 to b658aaa
Test build #44811 has finished for PR 9399 at commit
Test build #44814 has finished for PR 9399 at commit
Ok, actually I was planning to optimize the expression with the partition key, which will introduce the

I know, for DataSource developers, the
Ok, thanks for the explanation.
// 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, used by
//    `CatalystScan`.
// 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a
//    subset of) Catalyst filter expressions and can be handled by `relation`.
So, `Seq[Expression]` is used for data sources that understand Catalyst expressions, and `Seq[Filter]` is used for data sources that only understand the `Filter` API? If so, can we make it clear that 2 and 3 will not be used together?
The first `Seq[Expression]` argument is only used to handle `CatalystScan`, which is only kept for experimental purposes; no built-in concrete data source implements `CatalystScan` now. The second `Seq[Filter]` argument is used to handle all other relation traits that support filter push-down, e.g. `PrunedFilteredScan` and `HadoopFsRelation`. Added comments to explain this.
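For reference, a rough sketch of the two entry points being distinguished here (signatures paraphrased from `org.apache.spark.sql.sources` as of Spark 1.6, with Sketch-suffixed names to make clear these are not the real definitions):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

// Consumes the Seq[Expression] argument: raw Catalyst predicates (experimental path).
trait CatalystScanSketch {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

// Consumes the Seq[Filter] argument: the stable, public Filter API that
// PrunedFilteredScan and HadoopFsRelation build on.
trait PrunedFilteredScanSketch {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}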
def testPushDown(sqlString: String, expectedCount: Int): Unit = {
def testPushDown(
Let's also add checks to make sure the `Filter` operator added by Spark SQL only contains unhandled predicates, unconvertible predicates, and predicates involving partition columns.
`SimpleFilteredScan` doesn't support partitioning. Updated `SimpleTextRelation` to make it support column pruning and filter push-down, and implemented `unhandledFilters` there to add these tests.
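A rough sketch of the kind of check discussed here (the helper name and assertion are illustrative, and it assumes Spark 1.6, where the physical filter operator is `org.apache.spark.sql.execution.Filter`; the real tests live in the suites touched by this PR): when every pushed predicate is handled by the source, no defensive Filter operator should remain above the scan.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{Filter => PhysicalFilter}

// Illustrative helper: fail if the executed plan still contains a Spark-side Filter
// even though the data source claimed to handle all pushed predicates.
def assertAllFiltersPushedDown(df: DataFrame): Unit = {
  val sparkSideFilters = df.queryExecution.executedPlan.collect {
    case f: PhysicalFilter => f
  }
  assert(sparkSideFilters.isEmpty,
    s"Expected no Spark-side Filter operator, but found: ${sparkSideFilters.mkString(", ")}")
}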
Overall LGTM. Once we update the
Force-pushed from ddac7ac to 326ea24
retest this please
test this please
Test build #44928 has finished for PR 9399 at commit
Test build #44927 has finished for PR 9399 at commit
I will merge it once it passes Jenkins. Let's have a test to make sure those handled filters will not show up in the Filter operator.
Test build #44933 has finished for PR 9399 at commit
Thanks! Merging!
Let's also have some test cases that have a column that is used in handled filters as well as in unhandled/unconvertible filters.
Test build #44934 has finished for PR 9399 at commit
This PR adds test cases that test various column pruning and filter push-down cases. Author: Cheng Lian <lian@databricks.com> Closes #9468 from liancheng/spark-10978.follow-up. (cherry picked from commit c048929) Signed-off-by: Yin Huai <yhuai@databricks.com>
This PR adds test cases that test various column pruning and filter push-down cases. Author: Cheng Lian <lian@databricks.com> Closes #9468 from liancheng/spark-10978.follow-up.
Spark 1.6 extends `BaseRelation` with a new API which allows data sources to tell Spark which filters they handle, allowing Spark to eliminate its own defensive filtering for filters that are handled by the data source (see apache/spark#9399 for more details). This patch implements this new API in `spark-redshift` and adds tests. Author: Josh Rosen <joshrosen@databricks.com> Closes #128 from JoshRosen/support-filter-skipping-in-spark-1.6.
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.
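For context, a minimal sketch of what the new hook looks like from a data source author's perspective (the `ExampleRelation` and its notion of "handled" predicates are made up for illustration; only the `unhandledFilters` signature comes from the API described above):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

// Hypothetical relation that can only evaluate EqualTo / GreaterThan on `key`.
class ExampleRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(
    StructField("key", IntegerType) :: StructField("value", StringType) :: Nil)

  private def handled(f: Filter): Boolean = f match {
    case EqualTo("key", _) | GreaterThan("key", _) => true
    case _ => false
  }

  // The new hook: report the filters this source cannot evaluate, so Spark SQL
  // only keeps a defensive Filter operator for those.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(handled)

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // A real implementation would apply filters.filter(handled) while reading the data.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}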