Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelized template #2273

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
| [sqlp](/src/cmd/sqlp.rs#L2)<br>✨🚀🐻‍❄️🗄️🪄 | Run [Polars](https://pola.rs) SQL queries against several CSVs - converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions, processing larger than memory CSV files. Query results can be saved in CSV, JSON, JSONL, Parquet, Apache Arrow IPC and Apache Avro formats. |
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️👆🪄 | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, sort order, min/max/sum/avg length, mean, standard error of the mean (SEM), stddev, variance, Coefficient of Variation (CV), nullcount, max precision, sparsity, quartiles, Interquartile Range (IQR), lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV ([more info](/~https://github.com/jqnatividad/qsv/wiki/Supplemental#stats-command-output-explanation)).<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 7.3 seconds!). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](/~https://github.com/BurntSushi/tabwriter). To interactively view a CSV, use the `lens` command. |
| [template](/src/cmd/template.rs#L2)<br> | Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. |
| [template](/src/cmd/template.rs#L2)<br>🚀 | Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. |
| [to](/src/cmd/to.rs#L2)<br>✨🚀🗄️ | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX and [Data Package](https://datahub.io/docs/data-packages/tabular). |
| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🚀🔣🪄 | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. |
| [transpose](/src/cmd/transpose.rs#L2)<br>🤯 | Transpose rows/columns of a CSV. |
Expand Down
202 changes: 139 additions & 63 deletions src/cmd/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ template options:
Note that the QSV_ROWNO variable is also available in the context
if you want to use it in the filename template.
[default: QSV_ROWNO]
--customfilter-error <arg> The value to return when a custom filter returns an error.
--customfilter-error <msg> The value to return when a custom filter returns an error.
Use "<empty string>" to return an empty string.
[default: <FILTER_ERROR>]
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
[default: 50000]

Common options:
-h, --help Display this message
Expand All @@ -53,6 +58,10 @@ use std::{
};

use minijinja::Environment;
use rayon::{
iter::{IndexedParallelIterator, ParallelIterator},
prelude::IntoParallelRefIterator,
};
use serde::Deserialize;

use crate::{
Expand All @@ -71,6 +80,8 @@ struct Args {
flag_output: Option<String>,
flag_outfilename: String,
flag_customfilter_error: String,
flag_jobs: Option<usize>,
flag_batch: usize,
flag_delimiter: Option<Delimiter>,
flag_no_headers: bool,
}
Expand All @@ -90,7 +101,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let template_content = match (args.flag_template_file, args.flag_template) {
(Some(path), None) => fs::read_to_string(path)?,
(None, Some(template)) => template,
_ => return fail_clierror!("Must provide either --template or --template-string"),
_ => {
return fail_incorrectusage_clierror!(
"Must provide either --template or --template-file"
)
},
};

// Initialize FILTER_ERROR from args.flag_customfilter_error
Expand Down Expand Up @@ -126,26 +141,29 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let rconfig = Config::new(args.arg_input.as_ref())
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers);

let mut rdr = rconfig.reader()?;

// read headers
let headers = if args.flag_no_headers {
csv::StringRecord::new()
} else {
let headers = rdr.headers()?.clone();
let sanitized_headers: Vec<String> = headers
let mut sanitized_headers: Vec<String> = headers
.iter()
.map(|h| {
h.chars()
.map(|c| if c.is_alphanumeric() { c } else { '_' })
.collect()
})
.collect();
// add a column named QSV_ROWNO at the end
sanitized_headers.push(QSV_ROWNO.to_owned());
csv::StringRecord::from(sanitized_headers)
};

// Set up output handling
let output_to_dir = args.arg_outdir.is_some();
let mut row_number = 0_u64;
let mut row_no = 0_u64;
let mut rowcount = 0;

// Create filename environment once if needed
Expand All @@ -158,6 +176,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
None
};
// Get width of rowcount for padding leading zeroes
// when rendering --outfilename
let width = rowcount.to_string().len();

if output_to_dir {
Expand All @@ -179,69 +198,126 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
})
};

// amortize allocations
let mut curr_record = csv::StringRecord::new();
#[allow(unused_assignments)]
let mut rendered = String::new();
let num_jobs = util::njobs(args.flag_jobs);
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);

// reuse batch buffers
#[allow(unused_assignments)]
let mut outfilename = String::new();
let mut context = simd_json::owned::Object::default();

// Process each record
for record in rdr.records() {
row_number += 1;
curr_record.clone_from(&record?);

if args.flag_no_headers {
// Use numeric, column 1-based indices (e.g. _c1, _c2, etc.)
for (i, field) in curr_record.iter().enumerate() {
context.insert(
format!("_c{}", i + 1),
simd_json::OwnedValue::String(field.to_owned()),
);
}
} else {
// Use header names
for (header, field) in headers.iter().zip(curr_record.iter()) {
context.insert(
header.to_string(),
simd_json::OwnedValue::String(field.to_owned()),
);
let mut batch_record = csv::StringRecord::new();
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

let no_headers = args.flag_no_headers;

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
row_no += 1;
batch_record.push_field(itoa::Buffer::new().format(row_no));
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
}
}
// Always add row number to context
context.insert(
QSV_ROWNO.to_string(),
simd_json::OwnedValue::from(row_number),
);

// Render template with record data
rendered = template.render(&context)?;

if output_to_dir {
outfilename = if args.flag_outfilename == QSV_ROWNO {
// Pad row number with required number of leading zeroes
format!("{row_number:0width$}.txt")
} else {
filename_env
.as_ref()
.unwrap()
.get_template("filename")?
.render(&context)?
};
let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap()).join(outfilename);
let mut writer = BufWriter::new(fs::File::create(outpath)?);
write!(writer, "{rendered}")?;
writer.flush()?;
} else if let Some(ref mut w) = wtr {
w.write_all(rendered.as_bytes())?;

if batch.is_empty() {
// break out of infinite loop when at EOF
break 'batch_loop;
}
context.clear();
}

if let Some(mut w) = wtr {
w.flush()?;
}
// do actual template rendering via Rayon parallel iterator
batch
.par_iter()
.with_min_len(1024)
.map(|record| {
let curr_record = record;

let mut context = simd_json::owned::Object::default();
let mut row_number = 0_u64;

if no_headers {
// Use numeric, column 1-based indices (e.g. _c1, _c2, etc.)
let headers_len = curr_record.len();

for (i, field) in curr_record.iter().enumerate() {
if i == headers_len - 1 {
// set the last field to QSV_ROWNO
row_number = atoi_simd::parse::<u64>(field.as_bytes()).unwrap();
context.insert(
QSV_ROWNO.to_owned(),
simd_json::OwnedValue::String(field.to_owned()),
);
} else {
context.insert(
format!("_c{}", i + 1),
simd_json::OwnedValue::String(field.to_owned()),
);
}
}
} else {
// Use header names
for (header, field) in headers.iter().zip(curr_record.iter()) {
context.insert(
header.to_string(),
simd_json::OwnedValue::String(field.to_owned()),
);
// when headers are defined, the last one is QSV_ROWNO
if header == QSV_ROWNO {
row_number = atoi_simd::parse::<u64>(field.as_bytes()).unwrap();
}
}
}

// Render template with record data
let rendered = template
.render(&context)
.unwrap_or_else(|_| "RENDERING ERROR".to_owned());

if output_to_dir {
let outfilename = if args.flag_outfilename == QSV_ROWNO {
// Pad row number with required number of leading zeroes
format!("{row_number:0width$}.txt")
} else {
filename_env
.as_ref()
.unwrap()
.get_template("filename")
.unwrap()
.render(&context)
.unwrap_or_else(|_| "FILENAME RENDERING ERROR".to_owned())
};
(outfilename, rendered)
} else {
(String::new(), rendered)
}
})
.collect_into_vec(&mut batch_results);

for result_record in &batch_results {
if output_to_dir {
let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap())
.join(result_record.0.clone());
let mut writer = BufWriter::new(fs::File::create(outpath)?);
write!(writer, "{}", result_record.1)?;
writer.flush()?;
} else if let Some(ref mut w) = wtr {
w.write_all(result_record.1.as_bytes())?;
}
}

batch.clear();
} // end batch loop

Ok(())
}
Expand Down
72 changes: 72 additions & 0 deletions tests/test_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,75 @@ Alice is a minor.
Bob is an adult.";
assert_eq!(got, expected);
}

#[test]
fn template_render_error() {
let wrk = Workdir::new("template_render_error");
wrk.create(
"data.csv",
vec![
svec!["name", "age"],
svec!["Alice", "25"],
svec!["Bob", "30"],
],
);

// Test invalid template syntax with default error message
let mut cmd = wrk.command("template");
cmd.arg("--template")
.arg("Hello {{name}, invalid syntax!")
.arg("data.csv");

wrk.assert_err(&mut *&mut cmd);
let got: String = wrk.output_stderr(&mut cmd);
let expected =
"syntax error: unexpected `}}`, expected end of variable block (in template:1)\n";
assert_eq!(got, expected);
}

#[test]
fn template_filter_error() {
let wrk = Workdir::new("template_filter_error");
wrk.create(
"data.csv",
vec![
svec!["name", "amount"],
svec!["Alice", "not_a_number"],
svec!["Bob", "123.45"],
],
);

// Test filter error with default error message
let mut cmd = wrk.command("template");
cmd.arg("--template")
.arg("{{name}}: {{amount|format_float(2)}}\n\n")
.arg("data.csv");

let got: String = wrk.stdout(&mut cmd);
let expected = "Alice: <FILTER_ERROR>\nBob: 123.45";
assert_eq!(got, expected);

// Test custom filter error message
let mut cmd = wrk.command("template");
cmd.arg("--template")
.arg("{{name}}: {{amount|format_float(2)}}\n\n")
.arg("--customfilter-error")
.arg("INVALID NUMBER")
.arg("data.csv");

let got: String = wrk.stdout(&mut cmd);
let expected = "Alice: INVALID NUMBER\nBob: 123.45";
assert_eq!(got, expected);

// Test empty string as filter error
let mut cmd = wrk.command("template");
cmd.arg("--template")
.arg("{{name}}: {{amount|format_float(2)}}\n\n")
.arg("--customfilter-error")
.arg("<empty string>")
.arg("data.csv");

let got: String = wrk.stdout(&mut cmd);
let expected = "Alice: \nBob: 123.45";
assert_eq!(got, expected);
}
Loading