-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399
Changes from all commits
c5f1123
e70dc76
7b5f884
16f3ca3
b658aaa
7c17dd1
569e966
326ea24
92dfc55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { | |
l, | ||
projects, | ||
filters, | ||
(a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil | ||
(requestedColumns, allPredicates, _) => | ||
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil | ||
|
||
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) => | ||
pruneFilterProject( | ||
|
@@ -266,47 +267,81 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { | |
relation, | ||
projects, | ||
filterPredicates, | ||
(requestedColumns, pushedFilters) => { | ||
scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray) | ||
(requestedColumns, _, pushedFilters) => { | ||
scanBuilder(requestedColumns, pushedFilters.toArray) | ||
}) | ||
} | ||
|
||
// Based on Catalyst expressions. | ||
// Based on Catalyst expressions. The `scanBuilder` function accepts three arguments: | ||
// | ||
// 1. A `Seq[Attribute]`, containing all required column attributes. Used to handle relation | ||
// traits that support column pruning (e.g. `PrunedScan` and `PrunedFilteredScan`). | ||
// | ||
// 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, only used for | ||
// `CatalystScan`. | ||
// | ||
// 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a | ||
// subset of) Catalyst filter expressions and can be handled by `relation`. Used to handle | ||
// relation traits (`CatalystScan` excluded) that support filter push-down (e.g. | ||
// `PrunedFilteredScan` and `HadoopFsRelation`). | ||
// | ||
// Note that 2 and 3 shouldn't be used together. | ||
protected def pruneFilterProjectRaw( | ||
relation: LogicalRelation, | ||
projects: Seq[NamedExpression], | ||
filterPredicates: Seq[Expression], | ||
scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = { | ||
relation: LogicalRelation, | ||
projects: Seq[NamedExpression], | ||
filterPredicates: Seq[Expression], | ||
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = { | ||
|
||
val projectSet = AttributeSet(projects.flatMap(_.references)) | ||
val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) | ||
val filterCondition = filterPredicates.reduceLeftOption(expressions.And) | ||
|
||
val pushedFilters = filterPredicates.map { _ transform { | ||
val candidatePredicates = filterPredicates.map { _ transform { | ||
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. | ||
}} | ||
|
||
val (unhandledPredicates, pushedFilters) = | ||
selectFilters(relation.relation, candidatePredicates) | ||
|
||
// A set of column attributes that are only referenced by pushed down filters. We can eliminate | ||
// them from requested columns. | ||
val handledSet = { | ||
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains) | ||
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references)) | ||
AttributeSet(handledPredicates.flatMap(_.references)) -- | ||
(projectSet ++ unhandledSet).map(relation.attributeMap) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add comments? |
||
|
||
// Combines all Catalyst filter `Expression`s that are either not convertible to data source | ||
// `Filter`s or cannot be handled by `relation`. | ||
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) | ||
|
||
if (projects.map(_.toAttribute) == projects && | ||
projectSet.size == projects.size && | ||
filterSet.subsetOf(projectSet)) { | ||
// When it is possible to just use column pruning to get the right projection and | ||
// when the columns of this projection are enough to evaluate all filter conditions, | ||
// just do a scan followed by a filter, with no extra project. | ||
val requestedColumns = | ||
projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. | ||
.map(relation.attributeMap) // Match original case of attributes. | ||
val requestedColumns = projects | ||
// Safe due to if above. | ||
.asInstanceOf[Seq[Attribute]] | ||
// Match original case of attributes. | ||
.map(relation.attributeMap) | ||
// Don't request columns that are only referenced by pushed filters. | ||
.filterNot(handledSet.contains) | ||
|
||
val scan = execution.PhysicalRDD.createFromDataSource( | ||
projects.map(_.toAttribute), | ||
scanBuilder(requestedColumns, pushedFilters), | ||
scanBuilder(requestedColumns, candidatePredicates, pushedFilters), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At here, it is not really necessary to pass in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I understand what's going on. Actually, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah right, it's a little bit tricky. Adding comment to explain this. |
||
relation.relation) | ||
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) | ||
} else { | ||
val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq | ||
// Don't request columns that are only referenced by pushed filters. | ||
val requestedColumns = | ||
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq | ||
|
||
val scan = execution.PhysicalRDD.createFromDataSource( | ||
requestedColumns, | ||
scanBuilder(requestedColumns, pushedFilters), | ||
scanBuilder(requestedColumns, candidatePredicates, pushedFilters), | ||
relation.relation) | ||
execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) | ||
} | ||
|
@@ -334,11 +369,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { | |
} | ||
|
||
/** | ||
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s, | ||
* and convert them. | ||
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. | ||
* | ||
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. | ||
*/ | ||
protected[sql] def selectFilters(filters: Seq[Expression]) = { | ||
def translate(predicate: Expression): Option[Filter] = predicate match { | ||
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = { | ||
predicate match { | ||
case expressions.EqualTo(a: Attribute, Literal(v, t)) => | ||
Some(sources.EqualTo(a.name, convertToScala(v, t))) | ||
case expressions.EqualTo(Literal(v, t), a: Attribute) => | ||
|
@@ -387,16 +423,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { | |
Some(sources.IsNotNull(a.name)) | ||
|
||
case expressions.And(left, right) => | ||
(translate(left) ++ translate(right)).reduceOption(sources.And) | ||
(translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And) | ||
|
||
case expressions.Or(left, right) => | ||
for { | ||
leftFilter <- translate(left) | ||
rightFilter <- translate(right) | ||
leftFilter <- translateFilter(left) | ||
rightFilter <- translateFilter(right) | ||
} yield sources.Or(leftFilter, rightFilter) | ||
|
||
case expressions.Not(child) => | ||
translate(child).map(sources.Not) | ||
translateFilter(child).map(sources.Not) | ||
|
||
case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) => | ||
Some(sources.StringStartsWith(a.name, v.toString)) | ||
|
@@ -409,7 +445,52 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { | |
|
||
case _ => None | ||
} | ||
} | ||
|
||
/** | ||
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s | ||
* and can be handled by `relation`. | ||
* | ||
* @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst | ||
* predicate [[Expression]]s that are either not convertible or cannot be handled by | ||
* `relation`. The second element contains all converted data source [[Filter]]s that can | ||
* be handled by `relation`. | ||
*/ | ||
protected[sql] def selectFilters( | ||
relation: BaseRelation, | ||
predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = { | ||
|
||
// For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are | ||
// called `predicate`s, while all data source filters of type `sources.Filter` are simply called | ||
// `filter`s. | ||
|
||
val translated: Seq[(Expression, Filter)] = | ||
for { | ||
predicate <- predicates | ||
filter <- translateFilter(predicate) | ||
} yield predicate -> filter | ||
|
||
// A map from original Catalyst expressions to corresponding translated data source filters. | ||
val translatedMap: Map[Expression, Filter] = translated.toMap | ||
|
||
// Catalyst predicate expressions that cannot be translated to data source filters. | ||
val unrecognizedPredicates = predicates.filterNot(translatedMap.contains) | ||
|
||
// Data source filters that cannot be handled by `relation` | ||
val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet | ||
|
||
val (unhandled, handled) = translated.partition { | ||
case (predicate, filter) => | ||
unhandledFilters.contains(filter) | ||
} | ||
|
||
// Catalyst predicate expressions that can be translated to data source filters, but cannot be | ||
// handled by `relation`. | ||
val (unhandledPredicates, _) = unhandled.unzip | ||
|
||
// Translated data source filters that can be handled by `relation` | ||
val (_, handledFilters) = handled.unzip | ||
|
||
filters.flatMap(translate) | ||
(unrecognizedPredicates ++ unhandledPredicates, handledFilters) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not obvious that we need both
Seq[Expression]
andSeq[Filter]
. Can you add comments to explain what are these?