Skip to content

Commit

Permalink
Merge pull request #2433 from dathere/2424-joinp-ignore-leading-zeros…
Browse files Browse the repository at this point in the history
…-handling

`joinp`: refactor `--ignore-leading-zeros` handling
  • Loading branch information
jqnatividad authored Jan 12, 2025
2 parents 58efe9e + 600a6bc commit 5bcf49b
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 150 deletions.
226 changes: 93 additions & 133 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ joinp arguments:
Note that <input1> is the left CSV data set and <input2> is the right CSV data set.
joinp options:
-i, --ignore-case When set, joins are done case insensitively.
-z, --ignore-leading-zeros When set, joins are done ignoring leading zeros.
Note that this is only applied to the join keys for
both numeric and string columns. The output columns
will not have leading zeros.
--left Do a 'left outer' join. This returns all rows in
first CSV data set, including rows with no
corresponding row in the second data set. When no
Expand Down Expand Up @@ -220,6 +215,15 @@ joinp options:
--null-value <arg> The string to use when writing null values.
(default: <empty string>)
-i, --ignore-case When set, joins are done case insensitively.
-z, --ignore-leading-zeros When set, joins are done ignoring leading zeros.
Note that this is only applied to the join keys for
both numeric and string columns. Also note that
Polars will automatically remove leading zeros from
numeric columns when it infers the schema.
To force the schema to be all String types,
set --cache-schema to -1 or -2.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
Expand All @@ -237,7 +241,7 @@ use std::{
str,
};

use polars::{datatypes::AnyValue, prelude as pl, prelude::*, sql::SQLContext};
use polars::prelude::*;
use serde::Deserialize;
use tempfile::tempdir;

Expand Down Expand Up @@ -544,52 +548,83 @@ impl JoinStruct {
.map(polars::lazy::dsl::col)
.collect();

// If ignore_case is enabled, create lowercase versions of the join columns
// but only if ignore_leading_zeros is not enabled
// because leading zeros handling does its own lowercase conversion
if self.ignore_case && !self.ignore_leading_zeros {
// Create temporary lowercase versions of join columns in left dataframe
// Handle ignore_case and ignore_leading_zeros transformations
if self.ignore_case || self.ignore_leading_zeros {
// Create temporary versions of join columns in left dataframe
for col in &left_selcols {
self.left_lf = self
.left_lf
.with_column(col.clone().str().to_lowercase().alias(format!(
"_qsv-{}-lower",
col.to_string()
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
)));
let col_str = col.to_string();
let col_name = col_str
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
.to_string();
let temp_col_name = format!("_qsv-{col_name}-transformed");

// Create a temporary transformed column for joining
// we need to cast to string as the original column may be of a different type
// and we need a string type for the transformations
let transformed = col.clone().cast(DataType::String);
let transformed = if self.ignore_leading_zeros {
transformed
.str()
.replace_all(lit(r"^0+"), lit(""), false)
.str()
.to_lowercase()
} else if self.ignore_case {
transformed.str().to_lowercase()
} else {
transformed
};

// Add transformed column for joining - keep original untouched
self.left_lf = self.left_lf.with_column(transformed.alias(&temp_col_name));
}

// Create temporary lowercase versions of join columns in right dataframe
// Create temporary versions of join columns in right dataframe
for col in &right_selcols {
self.right_lf = self
.right_lf
.with_column(col.clone().str().to_lowercase().alias(format!(
"_qsv-{}-lower",
col.to_string()
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
)));
let col_str = col.to_string();
let col_name = col_str
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
.to_string();
let temp_col_name = format!("_qsv-{col_name}-transformed");

// Create a temporary transformed column for joining
let transformed = col.clone().cast(DataType::String);
let transformed = if self.ignore_leading_zeros {
transformed
.str()
.replace_all(lit(r"^0+"), lit(""), false)
.str()
.to_lowercase()
} else if self.ignore_case {
transformed.str().to_lowercase()
} else {
transformed
};

// Add transformed column for joining - keep original untouched
self.right_lf = self.right_lf.with_column(transformed.alias(&temp_col_name));
}

// Create new vectors for the lowercase column names
// Update selcols to use the transformed columns for joining
let left_selcols_w: Vec<_> = left_selcols
.iter()
.map(|col| {
polars::lazy::dsl::col(format!(
"_qsv-{}-lower",
"_qsv-{}-transformed",
col.to_string()
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
))
})
.collect();
left_selcols = left_selcols_w;

let right_selcols_w: Vec<_> = right_selcols
.iter()
.map(|col| {
polars::lazy::dsl::col(format!(
"_qsv-{}-lower",
"_qsv-{}-transformed",
col.to_string()
.trim_start_matches(r#"col(""#)
.trim_end_matches(r#"")"#)
Expand Down Expand Up @@ -705,25 +740,42 @@ impl JoinStruct {
};

let mut results_df = if let Some(sql_filter) = &self.sql_filter {
let mut ctx = SQLContext::new();
let mut ctx = polars::sql::SQLContext::new();
ctx.register("join_result", join_results.lazy());
ctx.execute(sql_filter)
.and_then(polars::prelude::LazyFrame::collect)?
} else {
join_results
};

// if self.ignore_case, remove the temporary lowercase columns from the dataframe
if self.ignore_case && !self.ignore_leading_zeros {
// Get all column names
if self.ignore_case || self.ignore_leading_zeros {
// Remove temporary transformed columns and
// duplicate right-side join columns if coalesce is true
let cols = results_df.get_column_names();
// Filter out the lowercase columns (those with "_qsv-*-lower" pattern)
let keep_cols: Vec<String> = cols
.iter()
.filter(|&col| !(col.starts_with("_qsv-") && col.ends_with("-lower")))
.map(|&s| s.to_string())
let mut keep_cols: Vec<String> = Vec::new();

let left_join_cols: Vec<String> = self
.left_sel
.split(',')
.map(std::string::ToString::to_string)
.collect();
// Select only the non-lowercase columns

for col in cols {
if col.contains("-transformed") {
continue;
}

// For join columns, only keep the left version if coalesce is true
if self.coalesce && col.ends_with("_right") {
let base_col = col.trim_end_matches("_right");
if left_join_cols.contains(&base_col.to_string()) {
continue;
}
}

keep_cols.push(col.to_string());
}

results_df = results_df.select(keep_cols)?;
}

Expand Down Expand Up @@ -1060,98 +1112,6 @@ impl Args {
right_lf = right_lf.filter(filter_right_expr);
}

if self.flag_ignore_leading_zeros {
// Transform the join keys in the left and right dataframe to handle leading zeros
// For each join key column:
// 1. Cast to string type
// 2. If ignore_case is true, convert to lowercase
// 3. Trim leading zeros, regardless of the original type
// This allows joins to match on values like "001" and "1", "0001ABZ" and "1ABZ".
// Note that the output columns will not have leading zeros for the join keys.
//
// We had to add ignore_case handling to ignore_leading_zeros in the join keys because
// the existing code for ignore_case was not working for leading zeros if they are both
// used at the same time.
// The ignore_case code was creating temporary join key columns while
// ignore_leading_zeros was using the existing join key columns.
let ignore_case = self.flag_ignore_case;
left_lf = left_lf.with_columns(
self.arg_columns1
.split(',')
.map(|col| {
let col_name = col.to_string();
pl::col(&col_name).cast(pl::DataType::String).map(
move |s| {
Ok(Some(
pl::Series::new(
col_name.clone().into(),
s.str()?
.into_iter()
.map(|x| {
x.map(|val| {
let v = if ignore_case {
val.to_lowercase()
} else {
val.to_string()
};
if v.starts_with('0') {
v.trim_start_matches('0').to_string()
} else {
v
}
})
.unwrap_or_default()
})
.collect::<Vec<String>>(),
)
.into(),
))
},
pl::GetOutput::from_type(pl::DataType::String),
)
})
.collect::<Vec<_>>(),
);

right_lf = right_lf.with_columns(
self.arg_columns2
.split(',')
.map(|col| {
let col_name = col.to_string();
pl::col(&col_name).cast(pl::DataType::String).map(
move |s| {
Ok(Some(
pl::Series::new(
col_name.clone().into(),
s.str()?
.into_iter()
.map(|x| {
x.map(|val| {
let v = if ignore_case {
val.to_lowercase()
} else {
val.to_string()
};
if v.starts_with('0') {
v.trim_start_matches('0').to_string()
} else {
v
}
})
.unwrap_or_default()
})
.collect::<Vec<String>>(),
)
.into(),
))
},
pl::GetOutput::from_type(pl::DataType::String),
)
})
.collect::<Vec<_>>(),
);
}

Ok(JoinStruct {
left_lf,
left_sel: self.arg_columns1.clone(),
Expand Down
Loading

0 comments on commit 5bcf49b

Please sign in to comment.