
[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399

Closed
@@ -43,7 +43,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
pruneFilterProject(
@@ -266,47 +267,81 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
relation,
projects,
filterPredicates,
(requestedColumns, pushedFilters) => {
scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray)
(requestedColumns, _, pushedFilters) => {
scanBuilder(requestedColumns, pushedFilters.toArray)
})
}

// Based on Catalyst expressions.
// Based on Catalyst expressions. The `scanBuilder` function accepts three arguments:
//
// 1. A `Seq[Attribute]`, containing all required column attributes. Used to handle relation
// traits that support column pruning (e.g. `PrunedScan` and `PrunedFilteredScan`).
//
// 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, only used for
// `CatalystScan`.
//
// 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a
// subset of) Catalyst filter expressions and can be handled by `relation`. Used to handle
// relation traits (`CatalystScan` excluded) that support filter push-down (e.g.
// `PrunedFilteredScan` and `HadoopFsRelation`).
//
// Note that 2 and 3 shouldn't be used together.
protected def pruneFilterProjectRaw(
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = {
Contributor: It is not obvious that we need both `Seq[Expression]` and `Seq[Filter]`. Can you add comments to explain what these are?

val projectSet = AttributeSet(projects.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

val pushedFilters = filterPredicates.map { _ transform {
val candidatePredicates = filterPredicates.map { _ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}

val (unhandledPredicates, pushedFilters) =
selectFilters(relation.relation, candidatePredicates)

// A set of column attributes that are only referenced by pushed down filters. We can eliminate
// them from requested columns.
val handledSet = {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
AttributeSet(handledPredicates.flatMap(_.references)) --
(projectSet ++ unhandledSet).map(relation.attributeMap)
}
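The set arithmetic in `handledSet` can be sketched with plain column-name strings in place of Catalyst attributes (the column names and query below are hypothetical, not from the PR):

```scala
// Toy model of the handledSet computation: a column may be dropped from the
// scan only when nothing except fully pushed-down filters references it.
object HandledSetSketch {
  def handledSet(
      projectRefs: Set[String],      // columns the projection needs
      handledPredRefs: Set[String],  // columns referenced by pushed-down predicates
      unhandledPredRefs: Set[String] // columns Spark must re-evaluate filters on
  ): Set[String] =
    handledPredRefs -- (projectRefs ++ unhandledPredRefs)

  // SELECT a FROM t WHERE b = 1 AND udf(c), with `b = 1` pushed down:
  // handledSet(Set("a"), Set("b"), Set("c")) yields Set("b"), so `b`
  // need not be read back from the data source.
}
```

Note that a column which is both filtered on and projected (e.g. `SELECT b ... WHERE b = 1`) is never eliminated, since the projection still needs it.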
Contributor: Let's add comments?

// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val requestedColumns =
projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
.map(relation.attributeMap) // Match original case of attributes.
val requestedColumns = projects
// Safe due to if above.
.asInstanceOf[Seq[Attribute]]
// Match original case of attributes.
.map(relation.attributeMap)
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)

val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, pushedFilters),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
Contributor: Here, it is not really necessary to pass in candidatePredicates, because a data source may reject some filters. We can just pass in the equivalent forms of pushedFilters, right?

Contributor: I mean pushedFilters contains all filters, in the Filter form, that can be handled by the data source, so why not change candidatePredicates to the Catalyst filters that can be handled by the data source?

Contributor: I think I understand what's going on. Actually, pushedFilters also contains those filters that cannot be handled by a data source, and candidatePredicates contains filters that cannot be converted to the public Filter interface.

Contributor (author): Yeah, right, it's a little bit tricky. Adding a comment to explain this.

relation.relation)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, pushedFilters),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation)
execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
@@ -334,11 +369,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}

/**
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s,
* and convert them.
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def selectFilters(filters: Seq[Expression]) = {
def translate(predicate: Expression): Option[Filter] = predicate match {
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
predicate match {
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
@@ -387,16 +423,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
Some(sources.IsNotNull(a.name))

case expressions.And(left, right) =>
(translate(left) ++ translate(right)).reduceOption(sources.And)
(translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)

case expressions.Or(left, right) =>
for {
leftFilter <- translate(left)
rightFilter <- translate(right)
leftFilter <- translateFilter(left)
rightFilter <- translateFilter(right)
} yield sources.Or(leftFilter, rightFilter)

case expressions.Not(child) =>
translate(child).map(sources.Not)
translateFilter(child).map(sources.Not)

case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(a.name, v.toString))
@@ -409,7 +445,52 @@

case _ => None
}
}
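The `And`/`Or`/`Not` cases above are asymmetric: `And` keeps whichever side translates, while `Or` and `Not` require every child to translate. A minimal sketch with toy expression and filter ASTs (all type names below are hypothetical stand-ins, not Catalyst or Spark classes):

```scala
// Toy ASTs standing in for Catalyst expressions and data source Filters.
sealed trait Expr
case class Eq(col: String, v: Int) extends Expr
case class Udf(col: String) extends Expr // stands for an untranslatable predicate
case class AndE(l: Expr, r: Expr) extends Expr
case class OrE(l: Expr, r: Expr) extends Expr
case class NotE(c: Expr) extends Expr

sealed trait SourceFilter
case class EqF(col: String, v: Int) extends SourceFilter
case class AndF(l: SourceFilter, r: SourceFilter) extends SourceFilter
case class OrF(l: SourceFilter, r: SourceFilter) extends SourceFilter
case class NotF(c: SourceFilter) extends SourceFilter

object TranslateSketch {
  def translate(e: Expr): Option[SourceFilter] = e match {
    case Eq(c, v) => Some(EqF(c, v))
    // And keeps whichever side translates: `a = 1 AND udf(b)` yields
    // just the filter for `a = 1`.
    case AndE(l, r) => (translate(l).toSeq ++ translate(r)).reduceOption(AndF)
    // Or requires both sides, since pushing only one disjunct would drop rows.
    case OrE(l, r) =>
      for (lf <- translate(l); rf <- translate(r)) yield OrF(lf, rf)
    case NotE(c) => translate(c).map(NotF)
    case _ => None
  }
}
```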

/**
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s
* and can be handled by `relation`.
*
* @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
* predicate [[Expression]]s that are either not convertible or cannot be handled by
* `relation`. The second element contains all converted data source [[Filter]]s that can
* be handled by `relation`.
*/
protected[sql] def selectFilters(
relation: BaseRelation,
predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {

// For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
// called `predicate`s, while all data source filters of type `sources.Filter` are simply called
// `filter`s.

val translated: Seq[(Expression, Filter)] =
for {
predicate <- predicates
filter <- translateFilter(predicate)
} yield predicate -> filter

// A map from original Catalyst expressions to corresponding translated data source filters.
val translatedMap: Map[Expression, Filter] = translated.toMap

// Catalyst predicate expressions that cannot be translated to data source filters.
val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)

// Data source filters that cannot be handled by `relation`
val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet

val (unhandled, handled) = translated.partition {
case (predicate, filter) =>
unhandledFilters.contains(filter)
}

// Catalyst predicate expressions that can be translated to data source filters, but cannot be
// handled by `relation`.
val (unhandledPredicates, _) = unhandled.unzip

// Translated data source filters that can be handled by `relation`
val (_, handledFilters) = handled.unzip

filters.flatMap(translate)
(unrecognizedPredicates ++ unhandledPredicates, handledFilters)
}
}
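The bookkeeping in `selectFilters` can be sketched independently of Spark by modeling predicates and filters as plain strings and the relation's `unhandledFilters` as a function (all names below are hypothetical):

```scala
// Simplified sketch of selectFilters: split predicates into those Spark must
// still evaluate (untranslatable, or declined by the relation) and the data
// source filters the relation promises to handle.
object SelectFiltersSketch {
  def selectFilters(
      predicates: Seq[String],
      translate: String => Option[String],
      unhandledFilters: Seq[String] => Set[String]
  ): (Seq[String], Seq[String]) = {
    val translated = for {
      p <- predicates
      f <- translate(p)
    } yield p -> f
    // Predicates with no Filter counterpart stay in Spark.
    val unrecognized = predicates.filterNot(translated.toMap.contains)
    // Filters the relation declines stay in Spark as well.
    val declined = unhandledFilters(translated.map(_._2))
    val (unhandled, handled) = translated.partition { case (_, f) => declined(f) }
    (unrecognized ++ unhandled.map(_._1), handled.map(_._2))
  }
}
```

For instance, given predicates `a = 1`, `b > 2`, and `udf(c)`, a translator that only knows the first two, and a relation that only handles equality filters, the first element of the result carries `udf(c)` and `b > 2` while the second carries just the translated `a = 1`.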
@@ -233,6 +233,15 @@ abstract class BaseRelation {
* @since 1.4.0
*/
def needConversion: Boolean = true

/**
* Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
* cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
* data source relation.
*
* @since 1.6.0
*/
def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
}
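To illustrate the contract, here is a hedged sketch with toy stand-ins for `BaseRelation` and `Filter` (only the method shape is taken from the diff; the relation and filter classes are illustrative): a source that can fully evaluate equality predicates returns just the filters it cannot handle.

```scala
// Toy stand-ins; the real types live in org.apache.spark.sql.sources.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter

abstract class BaseRelation {
  // Default: decline everything, so Spark re-applies every filter. Always
  // safe, merely redundant work for sources that did filter at scan time.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
}

// A key-value source that evaluates equality filters exactly keeps only the
// rest for Spark to re-check after the scan.
class KeyValueRelation extends BaseRelation {
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])
}
```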

/**
@@ -59,7 +59,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}.flatten
assert(analyzedPredicate.nonEmpty)

val selectedFilters = DataSourceStrategy.selectFilters(analyzedPredicate)
val selectedFilters = analyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
assert(selectedFilters.nonEmpty)

selectedFilters.foreach { pred =>