Implement row group skipping for the default engine parquet readers #362

Merged Oct 9, 2024
Changes from 2 commits
Commits
33 commits
715f233
WIP - first pass at the code
ryan-johnson-databricks Sep 25, 2024
ef71f1a
split out a trait, add more type support
ryan-johnson-databricks Sep 25, 2024
39b8927
support short circuit junction eval
ryan-johnson-databricks Sep 25, 2024
b5c3a52
Merge remote-tracking branch 'oss/main' into row-group-skipping
scovich Sep 25, 2024
e71571e
add tests, fix bugs
scovich Sep 26, 2024
cbca3b3
support SQL WHERE semantics, finished adding tests for skipping logic
scovich Sep 27, 2024
e7d87eb
Mark block text as not rust code doctest should run
scovich Sep 27, 2024
beeb6e8
add missing tests identified by codecov
scovich Sep 27, 2024
519acbd
Wire up row group skipping
scovich Sep 27, 2024
18b33cf
delete for split - parquet reader uses row group skipping
scovich Sep 27, 2024
6c98441
parquet reader now uses row group skipping
scovich Sep 27, 2024
0fdaf0a
add stats-getter test; review comments
scovich Oct 3, 2024
8ac33f8
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 3, 2024
1cf03dc
improve test coverage; clippy
scovich Oct 3, 2024
bc8b344
yet more test coverage
scovich Oct 3, 2024
0971002
improve test coverage even more
scovich Oct 4, 2024
375a380
Add a query level test as well
scovich Oct 4, 2024
6236874
Fix broken sync json parsing and harmonize file reading
scovich Oct 4, 2024
9efcbf7
fmt
scovich Oct 4, 2024
46d19e3
remove spurious TODO
scovich Oct 7, 2024
7666512
Revert "Fix broken sync json parsing and harmonize file reading"
scovich Oct 7, 2024
f3865d0
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 7, 2024
a4dc3da
review comments
scovich Oct 7, 2024
40131db
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 8, 2024
bf65904
Infer null count stat for missing columns; add more tests
scovich Oct 8, 2024
cce762d
One last test
scovich Oct 8, 2024
c7d6bb0
test cleanup
scovich Oct 8, 2024
4f92ed7
code comment tweak
scovich Oct 8, 2024
08a305b
remove unneeded test
scovich Oct 8, 2024
e8a947e
Merge remote-tracking branch 'oss' into use-row-group-skipping
scovich Oct 9, 2024
bf1e3a8
fix two nullcount stat bugs
scovich Oct 9, 2024
9d632e7
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 9, 2024
4a77f3a
review nits
scovich Oct 9, 2024
11 changes: 9 additions & 2 deletions kernel/src/engine/parquet_row_group_skipping.rs
@@ -179,8 +179,15 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
// Parquet nullcount stats always have the same type (u64), so we can directly return the value
// instead of wrapping it in a Scalar. We can safely cast it from u64 to i64, because the
// nullcount can never be larger than the rowcount, and the parquet rowcount stat is i64.
//
// NOTE: Stats for any given column are optional, which may produce a NULL nullcount. But if
// the column itself is missing, then we know all values are implied to be NULL.
fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
Some(self.get_stats(col)?.null_count_opt()? as i64)
let nullcount = match self.get_stats(col) {
Some(s) => s.null_count_opt()? as i64,
None => self.get_rowcount_stat_value(),
Collaborator Author:

This is a new find, exposed by accident when I hacked two more parts into the checkpoint so we could test transaction app id filtering (the "checkpoint" schema was truncated, which prevented the P&M query from skipping those parts).

Collaborator:

If the statistics() method on ColumnChunkMetadata returns None, that just means there are no stats for that column, but it doesn't necessarily imply that all values are null, does it?

Collaborator Author:

Oh, good catch. I didn't put the check deep enough. There are three levels of None here:

  • The column chunk doesn't even exist (infer nullcount = rowcount)
  • The column chunk doesn't have stats (should not infer anything clever)
  • The stats object lacks a particular stat

To make things even more "fun", we have the following warning in Statistics::null_count_opt 🤦:

this API returns Some(0) even if the null count was not present in the statistics

So I have two problems to work around now.

Collaborator Author:

Both fixed.
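
A rough sketch of the shape the fix could take, distinguishing the three levels of None described above (illustrative only: `find_column_chunk` is a hypothetical helper name, and the merged commits, including the workaround for `null_count_opt()` reporting Some(0), may differ):

fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
    // Level 1: the column chunk does not exist in this row group, so every
    // value is implicitly NULL -- infer nullcount = rowcount.
    let Some(chunk) = self.find_column_chunk(col) else {
        return Some(self.get_rowcount_stat_value());
    };
    // Level 2: the chunk exists but carries no statistics -- infer nothing.
    let stats = chunk.statistics()?;
    // Level 3: the stats object may still lack a null count. The parquet
    // crate warns that null_count_opt() can return Some(0) even when no null
    // count was written, so a complete fix must also guard against that case.
    Some(stats.null_count_opt()? as i64)
}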

};
Some(nullcount)
}

fn get_rowcount_stat_value(&self) -> i64 {
@@ -200,7 +207,7 @@ pub(crate) fn compute_field_indices(
let mut recurse = |expr| do_recurse(expr, cols); // simplifies the call sites below
match expression {
Literal(_) => {}
Column(name) => cols.extend([col_name_to_path(name)]),
Column(name) => cols.extend([col_name_to_path(name)]), // returns `()`, unlike `insert`
Struct(fields) => fields.iter().for_each(recurse),
UnaryOperation { expr, .. } => recurse(expr),
BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)),
5 changes: 5 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -280,6 +280,11 @@ impl Expression {
Self::unary(UnaryOperator::IsNull, self)
}

/// Create a new expression `self IS NOT NULL`
pub fn is_not_null(self) -> Self {
!Self::is_null(self)
}

/// Create a new expression `self == other`
pub fn eq(self, other: Self) -> Self {
Self::binary(BinaryOperator::Equal, self, other)
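
For illustration, the new helper composes with Expression::column like so (hypothetical usage; the metaData.id column is just an example drawn from the snapshot.rs change below):

// Builds NOT(metaData.id IS NULL), i.e. metaData.id IS NOT NULL.
let predicate = Expression::column("metaData.id").is_not_null();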
86 changes: 73 additions & 13 deletions kernel/src/scan/mod.rs
@@ -217,26 +217,29 @@ impl Scan {
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>>> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
// when ~every checkpoint file will contain the adds and removes we are looking for.
let log_iter = self.snapshot.log_segment.replay(
engine,
commit_read_schema,
checkpoint_read_schema,
None,
)?;

Ok(scan_action_iter(
engine,
log_iter,
self.replay_for_scan_data(engine)?,
&self.logical_schema,
&self.predicate,
))
}

// Factored out to facilitate testing
fn replay_for_scan_data(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
// when ~every checkpoint file will contain the adds and removes we are looking for.
self.snapshot
.log_segment
.replay(engine, commit_read_schema, checkpoint_read_schema, None)
}

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
/// only be called once per scan.
pub fn global_scan_state(&self) -> GlobalScanState {
@@ -716,6 +719,63 @@ mod tests {
}
}

#[test]
fn test_replay_for_scan_data() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let data: Vec<_> = scan
.replay_for_scan_data(&engine)
.unwrap()
.try_collect()
.unwrap();
// No predicate pushdown attempted, because at most one part of a multi-part checkpoint
// could be skipped when looking for adds/removes.
assert_eq!(data.len(), 5);
}

#[test]
fn test_data_row_group_skipping() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());

// No predicate pushdown attempted, so the one data file should be returned.
let scan = snapshot.clone().scan_builder().build().unwrap();
let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
assert_eq!(data.len(), 1);

// Ineffective predicate pushdown attempted, so the one data file should be returned.
let int_col = Expression::column("numeric.ints.int32");
let value = Expression::literal(1000i32);
let predicate = int_col.clone().gt(value.clone());
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(predicate)
.build()
.unwrap();
let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
assert_eq!(data.len(), 1);

// Effective predicate pushdown, so no data files should be returned.
let predicate = int_col.lt(value);
let scan = snapshot
.scan_builder()
.with_predicate(predicate)
.build()
.unwrap();
let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
assert_eq!(data.len(), 0);
}

#[test_log::test]
fn test_scan_with_checkpoint() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
63 changes: 53 additions & 10 deletions kernel/src/snapshot.rs
@@ -3,7 +3,6 @@
//!

use std::cmp::Ordering;
use std::ops::Not;
use std::sync::Arc;

use itertools::Itertools;
Expand Down Expand Up @@ -78,15 +77,7 @@ impl LogSegment {
}

fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<Option<(Metadata, Protocol)>> {
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
// filter out log files that do not contain metadata or protocol information
use Expression as Expr;
let meta_predicate = Some(Expr::or(
Expr::not(Expr::is_null(Expr::column("metaData.id"))),
Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
));
// read the same protocol and metadata schema for both commits and checkpoints
let data_batches = self.replay(engine, schema.clone(), schema, meta_predicate)?;
let data_batches = self.replay_for_metadata(engine)?;
let mut metadata_opt: Option<Metadata> = None;
let mut protocol_opt: Option<Protocol> = None;
for batch in data_batches {
@@ -109,6 +100,22 @@ impl LogSegment {
_ => Err(Error::MissingMetadataAndProtocol),
}
}

// Factored out to facilitate testing
fn replay_for_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
// filter out log files that do not contain metadata or protocol information
use Expression as Expr;
let meta_predicate = Some(Expr::or(
Expr::column("metaData.id").is_not_null(),
Expr::column("protocol.minReaderVersion").is_not_null(),
));
// read the same protocol and metadata schema for both commits and checkpoints
self.replay(engine, schema.clone(), schema, meta_predicate)
}
}

// TODO expose methods for accessing the files of a table (with file pruning).
@@ -175,6 +182,10 @@ impl Snapshot {
if let Some(version) = version {
commit_files.retain(|log_path| log_path.version <= version);
}
// only keep commit files above the checkpoint we found
if let Some(checkpoint_file) = checkpoint_files.first() {
commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

// get the effective version from chosen files
let version_eff = commit_files
@@ -452,6 +463,7 @@ mod tests {
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::schema::StructType;
use crate::Table;

#[test]
fn test_snapshot_read_metadata() {
@@ -623,6 +635,37 @@ mod tests {
assert!(invalid.is_none())
}

// NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
// that the parquet reader properly infers nullcount = rowcount for missing columns. The two
// checkpoint part files that contain transaction app ids have truncated schemas that would
// otherwise fail skipping due to their missing nullcount stat:
//
// Row group 0: count: 1 total(compressed): 111 B total(uncompressed):107 B
// --------------------------------------------------------------------------------
// type nulls min / max
// txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..."
// txn.version INT64 0 "4390" / "4390"
#[test]
fn test_replay_for_metadata() {
Collaborator Author:

An accidentally clever test :P
(see other comment)

Collaborator:

nice!

let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let data: Vec<_> = snapshot
.log_segment
.replay_for_metadata(&engine)
.unwrap()
.try_collect()
.unwrap();
// The checkpoint has five parts, each containing one action. The P&M come from first and
// third parts, respectively. The parquet reader skips the second part; it would also skip
// the last two parts, but the actual `read_metadata` will anyway skip them because it
// terminates the iteration immediately after finding both P&M.
assert_eq!(data.len(), 2);
}

#[test_log::test]
fn test_read_table_with_checkpoint() {
let path = std::fs::canonicalize(PathBuf::from(
84 changes: 71 additions & 13 deletions kernel/src/transaction.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;

use crate::actions::visitors::TransactionVisitor;
use crate::actions::{get_log_schema, TRANSACTION_NAME};
use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::Engine;
use crate::{actions::Transaction, DeltaResult};
use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef};

pub use crate::actions::visitors::TransactionMap;
pub struct TransactionScanner {
@@ -22,17 +21,9 @@ impl TransactionScanner {
engine: &dyn Engine,
application_id: Option<&str>,
) -> DeltaResult<TransactionMap> {
let schema = get_log_schema().project(&[TRANSACTION_NAME])?;

let schema = Self::get_txn_schema()?;
let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned()));

// when all ids are requested then a full scan of the log to the latest checkpoint is required
let iter =
self.snapshot
.log_segment
.replay(engine, schema.clone(), schema.clone(), None)?;

for maybe_data in iter {
for maybe_data in self.replay_for_app_ids(engine, schema.clone(), application_id)? {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
// if a specific id is requested and a transaction was found, then return
@@ -44,6 +35,29 @@ impl TransactionScanner {
Ok(visitor.transactions)
}

// Factored out to facilitate testing
fn get_txn_schema() -> DeltaResult<SchemaRef> {
get_log_schema().project(&[TRANSACTION_NAME])
}

// Factored out to facilitate testing
fn replay_for_app_ids(
&self,
engine: &dyn Engine,
schema: SchemaRef,
application_id: Option<&str>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// when all ids are requested then a full scan of the log to the latest checkpoint is required
let app_id_col = Expr::column("txn.appId");
let meta_predicate = match application_id {
Some(id) => app_id_col.eq(Expr::literal(id)),
None => app_id_col.is_not_null(),
};
self.snapshot
.log_segment
.replay(engine, schema.clone(), schema, Some(meta_predicate))
}

/// Scan the Delta Log for the latest transaction entry of an application
pub fn application_transaction(
&self,
@@ -67,6 +81,7 @@ mod tests {
use super::*;
use crate::engine::sync::SyncEngine;
use crate::Table;
use itertools::Itertools;

fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
@@ -117,4 +132,47 @@ mod tests {
.as_ref()
);
}

#[test]
fn test_replay_for_app_ids() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let txn = TransactionScanner::new(snapshot.into());
let txn_schema = TransactionScanner::get_txn_schema().unwrap();

// The checkpoint has five parts, each containing one action. There are two app ids.
let data: Vec<_> = txn
.replay_for_app_ids(&engine, txn_schema.clone(), None)
.unwrap()
.try_collect()
.unwrap();
assert_eq!(data.len(), 2);

let data: Vec<_> = txn
.replay_for_app_ids(
&engine,
txn_schema.clone(),
Some("3ae45b72-24e1-865a-a211-34987ae02f2a"),
)
.unwrap()
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);

// This one will not be found (missing character)
let data: Vec<_> = txn
.replay_for_app_ids(
&engine,
txn_schema,
Some("3ae45b72-24e1-865a-a211-34987ae02f2"),
)
.unwrap()
.try_collect()
.unwrap();
assert_eq!(data.len(), 0);
}
}
Binary file not shown.
Binary file not shown.
@@ -1,2 +1,4 @@
{"commitInfo":{"timestamp":1728065844007,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"4959"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.2.1","txnId":"d46d4bca-ab50-4075-977f-80a5b3844afa"}}
{"add":{"path":"part-00000-b92e017a-50ba-4676-8322-48fc371c2b59-c000.snappy.parquet","partitionValues":{},"size":4959,"modificationTime":1728065843972,"dataChange":true,"stats":"{\"numRecords\":5}"}}
{"txn":{"appId":"3ae45b72-24e1-865a-a211-34987ae02f2a","version":4390}}
{"txn":{"appId":"b42b951f-f5d1-4f6e-be2a-0d11d1543029","version":1235}}