
[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399

Closed

Conversation

liancheng
Contributor

This PR adds a new method unhandledFilters to BaseRelation. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.
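
For illustration, here is a minimal sketch of how a data source might use the new method, assuming the Spark 1.6 data sources API; the relation name and filter choices below are hypothetical and not part of this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

// Hypothetical relation, sketched only to illustrate the new API.
class KeyValueRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(
    StructField("key", IntegerType) ::
    StructField("value", StringType) :: Nil)

  // New in this PR: report the filters this source can NOT evaluate itself.
  // Spark SQL then only adds a defensive Filter operator for these.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo("key", _)     => true  // evaluated inside buildScan
      case GreaterThan("key", _) => true  // evaluated inside buildScan
      case _                     => false // everything else is left to Spark SQL
    }

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Read the data, applying the handled filters while scanning (details omitted).
    ???
  }
}
```

With this in place, Spark SQL only wraps the scan in a defensive Filter operator for the predicates returned by unhandledFilters, instead of re-evaluating every pushed-down predicate.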

@SparkQA

SparkQA commented Nov 1, 2015

Test build #44768 has finished for PR 9399 at commit 16f3ca3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = {
Contributor

It is not obvious that we need both Seq[Expression] and Seq[Filter]. Can you add comments to explain what these are?

@SparkQA

SparkQA commented Nov 2, 2015

Test build #44776 has finished for PR 9399 at commit fec7d25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor

One more consideration for this improvement: we probably need to optimize filters by constant-folding expressions, since partition keys are effectively constant values during execution, so simply adding unhandledFilters may not work for partition-based data sources. I am therefore wondering whether we could leave unhandledFilters and handledFilters to the data source implementation itself, and provide utilities or a default implementation for the common operations within buildScan.

@yhuai
Contributor

yhuai commented Nov 2, 2015

@chenghao-intel Can you give an example showing unhandledFilters is insufficient? Also, regarding "So I am wondering if we can leave the unhandledFilters and handledFilters to data source implementation itself, we can provide the utilities or the default implementation for the common operations within the buildScan", can you explain it?

@chenghao-intel
Contributor

What I mean is that computing unhandledFilters may give us trouble if we plan to optimize cases where partition keys appear in the filter: the partition key is a constant value during execution for EACH PARTITION, so we may not be able to make such a filter fully resolvable at the planning stage, and at the very least the code becomes more complicated. Besides, I don't see much benefit in exposing unhandledFilters to DataSourceStrategy. So I am suggesting we leave the unhandled-filter handling to BaseRelation.buildScan, treat unhandledFilters as a private/protected method of BaseRelation, or provide some common helper functions to simplify the implementation for new data source developers.

Sorry if I missed something.

@liancheng
Contributor Author

@chenghao-intel Could you please give an example?

@chenghao-intel
Contributor

Oh, for example: say we have a table src (key, value) partitioned by (p1), and a query like "SELECT value FROM src WHERE key > p1".

Assume the candidate values of p1 are 10 and 100, and the range of key is (0, 50).
-- unhandledFilters = Array.empty
This probably fails for key > 10 (p1 = 10): we may not be able to filter records during the scan before materializing all of them, or else buildScan has to add an extra filter operation on the RDD[Row].
-- unhandledFilters = key > p1
We lose the optimization for the partition p1 = 100: the concrete filter there is key > 100, so we could simply return an empty RDD[Row], given that the range of key is (0, 50).

My point is that it will be confusing for new data source developers to decide how to define unhandledFilters, because the partition key is not treated like a normal attribute; it takes extra work at the planning stage to obtain the concrete value and generate different filters for different partition keys. What is unhandledFilters supposed to return in that case?

On the other hand, I am not sure it is really necessary to expose unhandledFilters at all. It would be a new API that data source developers have to be aware of for optimization purposes, yet we already pass the filters down via def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] and its variants. Splitting the filter expressions into two parts that are executed by different operators (DataSourceStrategy and the data source implementation) seems to make things more complicated; even if the splitting would still happen inside the data source implementation, it is probably not wise to expose it externally.

@chenghao-intel
Contributor

Sorry for challenging this, but it is an API change, which will be difficult to revert once released, so we'd better think it through further, including the partition-key cases.

@liancheng
Contributor Author

@chenghao-intel Thanks for the comment. That's a good point and I didn't consider this situation when writing this PR. However, fortunately we don't even try to push down predicates that reference any partition columns (see here). In general, when implementing a data source, developers shouldn't worry about partitioning. A HadoopFsRelation data source is only responsible for returning data within a single partition, the query planner does the rest including partition pruning. So I think this is fine?
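
To make that planner-side behavior concrete, here is a rough sketch of the idea (the helper name is hypothetical, not the actual DataSourceStrategy code): predicates that touch any partition column stay on the Spark SQL side, so unhandledFilters and buildScan only ever see data-column predicates.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Hypothetical helper, for illustration only.
def splitByPartitionColumns(
    allFilters: Seq[Expression],
    partitionColumns: AttributeSet): (Seq[Expression], Seq[Expression]) = {
  // Any predicate referencing a partition column is kept by Spark SQL
  // (partition pruning / defensive Filter); only the remaining predicates
  // are candidates for push-down into the data source.
  allFilters.partition(f => f.references.exists(partitionColumns.contains))
}
```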

@liancheng liancheng force-pushed the spark-10978.unhandled-filters branch from fec7d25 to b658aaa Compare November 2, 2015 13:03
@SparkQA

SparkQA commented Nov 2, 2015

Test build #44811 has finished for PR 9399 at commit b658aaa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 2, 2015

Test build #44814 has finished for PR 9399 at commit 7c17dd1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor

OK, actually I was planning to optimize expressions that involve partition keys, which would rely on ConstantFolding, since a partition key is a constant value at runtime.

I know that, for a data source developer, a HadoopFsRelation only returns the data of a single partition as an RDD. That was exactly my question: how should unhandledFilters be defined, and will a filter involving a partition key always be part of unhandledFilters?

@yhuai
Contributor

yhuai commented Nov 3, 2015

unhandledFilters will not see filters using partitioning columns.

@chenghao-intel
Contributor

OK, thanks for the explanation.

// 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, used by
// `CatalystScan`.
// 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a
// subset of) Catalyst filter expressions and can be handled by `relation`.
Contributor

So, Seq[Expression] is used for data sources that understand Catalyst expressions, and Seq[Filter] is used for data sources that only understand the Filter API? If so, can we make it clear that 2 and 3 will not be used together?

Contributor Author

The first Seq[Expression] argument is only used to handle CatalystScan, which is kept only for experimental purposes; no built-in concrete data source implements CatalystScan now. The second Seq[Filter] argument is used to handle all other relation traits that support filter push-down, e.g. PrunedFilteredScan and HadoopFsRelation. Added comments to explain this.
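
For context, the two push-down paths correspond roughly to the following trait signatures in the data sources API (reproduced here for illustration; check the actual Spark sources for the authoritative definitions):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

// Experimental path: receives raw Catalyst expressions (the Seq[Expression] argument).
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

// Stable path: receives the public, simplified Filter API (the Seq[Filter] argument).
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
```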


def testPushDown(sqlString: String, expectedCount: Int): Unit = {
def testPushDown(
Contributor

Let's also add checks to make sure the Filter operator added by Spark SQL only contains unhandled predicates, unconvertible predicates, and predicates involving partition columns.

Contributor Author

SimpleFilteredScan doesn't support partitioning. Updated SimpleTextRelation to make it support column pruning and filter push-down, and implemented unhandledFilters there to add these tests.
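
A rough sketch of the kind of plan check being discussed, assuming the 1.6-era physical Filter operator (org.apache.spark.sql.execution.Filter); the helper name is hypothetical and not the actual FilteredScanSuite code:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.Filter // physical filter operator in 1.6-era Spark

// Hypothetical test helper, for illustration only.
def assertResidualFilter(df: DataFrame, expectResidualFilter: Boolean): Unit = {
  // Predicates the data source reported as handled should not reappear in a
  // defensive Filter operator in the physical plan.
  val residualFilters = df.queryExecution.executedPlan.collect { case f: Filter => f }
  if (expectResidualFilter) {
    assert(residualFilters.nonEmpty,
      "expected a Filter operator for unhandled, unconvertible, or partition-column predicates")
  } else {
    assert(residualFilters.isEmpty,
      "expected no Filter operator because the data source handles all pushed-down filters")
  }
}
```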

@yhuai
Contributor

yhuai commented Nov 3, 2015

Overall LGTM. Once we update the FilteredScanSuite, we are good to go.

@liancheng liancheng force-pushed the spark-10978.unhandled-filters branch from ddac7ac to 326ea24 Compare November 3, 2015 14:52
@liancheng
Contributor Author

retest this please

@yhuai
Contributor

yhuai commented Nov 3, 2015

test this please

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44928 has finished for PR 9399 at commit 326ea24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44927 has finished for PR 9399 at commit ddac7ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Nov 3, 2015

I will merge it once it passes jenkins. Let's have a test to make sure those handled filters will not show up in the Filter operator.

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44933 has finished for PR 9399 at commit 92dfc55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Nov 3, 2015

Thanks! Merging!

@asfgit asfgit closed this in ebf8b0b Nov 3, 2015
@yhuai
Contributor

yhuai commented Nov 3, 2015

Let's also have some test cases where a column is used in handled filters as well as in unhandled/unconvertible filters.

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44934 has finished for PR 9399 at commit 92dfc55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng liancheng deleted the spark-10978.unhandled-filters branch November 4, 2015 00:54
liancheng added a commit to liancheng/spark that referenced this pull request Nov 4, 2015
asfgit pushed a commit that referenced this pull request Nov 6, 2015
This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes #9468 from liancheng/spark-10978.follow-up.

(cherry picked from commit c048929)
Signed-off-by: Yin Huai <yhuai@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 6, 2015
This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes #9468 from liancheng/spark-10978.follow-up.
JoshRosen added a commit to databricks/spark-redshift that referenced this pull request Nov 24, 2015
Spark 1.6 extends `BaseRelation` with a new API which allows data sources to tell Spark which filters they handle, allowing Spark to eliminate its own defensive filtering for filters that are handled by the data source (see apache/spark#9399 for more details).

This patch implements this new API in `spark-redshift` and adds tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #128 from JoshRosen/support-filter-skipping-in-spark-1.6.