Skip to content

Commit

Permalink
Merge pull request #2089 from jqnatividad/optimize_polars_optflags
Browse files Browse the repository at this point in the history
  • Loading branch information
jqnatividad authored Aug 28, 2024
2 parents 17ace73 + 44f2937 commit bb8b6e2
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 63 deletions.
40 changes: 20 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ local-encoding = { git = "https://github.com/slonopotamus/local-encoding-rs", br
# BUILD NOTE: Be sure to set QSV_POLARS_REV below to the latest commit short hash or tag
# of polars/py-polars before building qsv. This allows us to show the polars rev/tag in --version.
# If we are using a release version of Rust Polars, leave QSV_POLARS_REV empty.
# QSV_POLARS_REV=d12131a
# QSV_POLARS_REV=37a492e
# polars = { git = "https://github.com/pola-rs/polars", tag = "py-1.4.1" }
polars = { git = "https://github.com/pola-rs/polars", rev = "d12131a" }
polars = { git = "https://github.com/pola-rs/polars", rev = "37a492e" }


[features]
Expand Down
27 changes: 14 additions & 13 deletions src/cmd/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub fn polars_count_input(
low_memory: bool,
) -> Result<(u64, usize), crate::clitypes::CliError> {
use polars::{
lazy::frame::{LazyFrame, OptState},
lazy::frame::{LazyFrame, OptFlags},
prelude::*,
sql::SQLContext,
};
Expand Down Expand Up @@ -240,18 +240,19 @@ pub fn polars_count_input(
return Ok((count_regular, 0));
},
};
let mut optimization_state = OptState::default();
optimization_state |= OptState::PROJECTION_PUSHDOWN
| OptState::PREDICATE_PUSHDOWN
| OptState::CLUSTER_WITH_COLUMNS
| OptState::TYPE_COERCION
| OptState::SIMPLIFY_EXPR
| OptState::FILE_CACHING
| OptState::SLICE_PUSHDOWN
| OptState::COMM_SUBEXPR_ELIM
| OptState::FAST_PROJECTION
| OptState::STREAMING;
ctx.register("sql_lf", lazy_df.with_optimizations(optimization_state));
let optflags = OptFlags::from_bits_truncate(0)
| OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::FAST_PROJECTION
| OptFlags::STREAMING;
ctx.register("sql_lf", lazy_df.with_optimizations(optflags));
"SELECT COUNT(*) FROM sql_lf".to_string()
};

Expand Down
31 changes: 21 additions & 10 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,22 +412,33 @@ impl JoinStruct {
JoinCoalesce::JoinSpecific
};

let mut optimization_state = polars::lazy::frame::OptState::default();
if self.streaming {
optimization_state |= OptState::STREAMING;
}
let mut optflags = OptFlags::from_bits_truncate(0);
if self.no_optimizations {
optimization_state = OptState::from_bits_truncate(0) | OptState::TYPE_COERCION;
optflags |= OptFlags::TYPE_COERCION;
} else {
optflags |= OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::ROW_ESTIMATE
| OptFlags::FAST_PROJECTION;
}

log::debug!("Optimization state: {optimization_state:?}");
optflags.set(OptFlags::STREAMING, self.streaming);

// log::debug!("Optimization state: {optimization_state:?}");

let join_results = if jointype == JoinType::Cross {
// cross join doesn't need join columns
self.left_lf
.with_optimizations(optimization_state)
.with_optimizations(optflags)
.join_builder()
.with(self.right_lf.with_optimizations(optimization_state))
.with(self.right_lf.with_optimizations(optflags))
.how(JoinType::Cross)
.coalesce(coalesce_flag)
.allow_parallel(true)
Expand All @@ -453,9 +464,9 @@ impl JoinStruct {
}

self.left_lf
.with_optimizations(optimization_state)
.with_optimizations(optflags)
.join_builder()
.with(self.right_lf.with_optimizations(optimization_state))
.with(self.right_lf.with_optimizations(optflags))
.left_on(left_selcols)
.right_on(right_selcols)
.how(jointype)
Expand Down
36 changes: 18 additions & 18 deletions src/cmd/sqlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ use polars::{
io::avro::{AvroWriter, Compression as AvroCompression},
prelude::{
CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
LazyCsvReader, LazyFileListReader, NullValues, OptState, ParquetCompression, ParquetWriter,
LazyCsvReader, LazyFileListReader, NullValues, OptFlags, ParquetCompression, ParquetWriter,
SerWriter, StatisticsOptions, ZstdLevel,
},
sql::SQLContext,
Expand Down Expand Up @@ -596,24 +596,24 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
None
};

let mut optimization_state = if args.flag_no_optimizations {
OptState::empty()
let mut optflags = OptFlags::from_bits_truncate(0);
if args.flag_no_optimizations {
optflags |= OptFlags::TYPE_COERCION;
} else {
OptState::PROJECTION_PUSHDOWN
| OptState::PREDICATE_PUSHDOWN
| OptState::CLUSTER_WITH_COLUMNS
| OptState::TYPE_COERCION
| OptState::SIMPLIFY_EXPR
| OptState::FILE_CACHING
| OptState::SLICE_PUSHDOWN
| OptState::COMM_SUBEXPR_ELIM
| OptState::FAST_PROJECTION
| OptState::ROW_ESTIMATE
optflags |= OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::ROW_ESTIMATE
| OptFlags::FAST_PROJECTION;
};

if args.flag_streaming {
optimization_state |= OptState::STREAMING;
}
optflags.set(OptFlags::STREAMING, args.flag_streaming);

// check if the input is a SQL script (ends with .sql)
let is_sql_script = std::path::Path::new(&args.arg_sql)
Expand All @@ -638,7 +638,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// for the optimization state struct
let debuglog_flag = log::log_enabled!(log::Level::Debug);
if debuglog_flag {
log::debug!("Optimization state: {optimization_state:?}");
log::debug!("Optimization state: {optflags:?}");
log::debug!(
"Delimiter: {delim} Infer_schema_len: {infer_len} try_parse_dates: {parse_dates} \
ignore_errors: {ignore_errors}, low_memory: {low_memory}, float_precision: \
Expand Down Expand Up @@ -747,7 +747,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory)
.finish()?;
ctx.register(table_name, lf.with_optimizations(optimization_state));
ctx.register(table_name, lf.with_optimizations(optflags));
}
}

Expand Down

0 comments on commit bb8b6e2

Please sign in to comment.