Skip to content

Commit

Permalink
Merge pull request #2055 from jqnatividad/2040-2053-stats-caching-ref…
Browse files Browse the repository at this point in the history
…actor

stats caching refactor
  • Loading branch information
jqnatividad authored Aug 16, 2024
2 parents 0c373d1 + ff0bbf4 commit 95dc988
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 195 deletions.
21 changes: 8 additions & 13 deletions src/cmd/frequency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,7 @@ impl Args {
"_schema" => StatsMode::Schema, // only meant for internal use by schema command
_ => return fail_incorrectusage_clierror!("Invalid stats mode"),
};
let (csv_fields, csv_stats, stats_col_index_map) =
get_stats_records(&schema_args, stats_mode)?;
let (csv_fields, csv_stats) = get_stats_records(&schema_args, stats_mode)?;

if stats_mode == StatsMode::None || stats_mode == StatsMode::Schema || csv_fields.is_empty()
{
Expand All @@ -614,30 +613,24 @@ impl Args {
csv_fields.len() == csv_stats.len(),
"Mismatch between the number of fields and stats records"
);
let col_cardinality_vec: Vec<(String, usize)> = csv_stats
let col_cardinality_vec: Vec<(String, u64)> = csv_stats
.iter()
.enumerate()
.map(|(i, _record)| {
.map(|(i, stats_record)| {
// get the column name and stats record
// safety: we know that csv_fields and csv_stats have the same length
let col_name = csv_fields.get(i).unwrap();
let stats_record = csv_stats.get(i).unwrap().clone().to_record(4, false);

let col_cardinality = match stats_record.get(stats_col_index_map["cardinality"]) {
Some(s) => s.parse::<usize>().unwrap_or(0_usize),
None => 0_usize,
};
(
simdutf8::basic::from_utf8(col_name)
.unwrap_or(NON_UTF8_ERR)
.to_string(),
col_cardinality,
stats_record.cardinality,
)
})
.collect();

// now, get the unique headers, where cardinality == rowcount
let row_count = util::count_rows(&self.rconfig())? as usize;
let row_count = util::count_rows(&self.rconfig())?;
FREQ_ROW_COUNT.set(row_count as u64).unwrap();

let mut all_unique_headers_vec: Vec<usize> = Vec::with_capacity(5);
Expand All @@ -659,7 +652,9 @@ impl Args {
let headers = rdr.byte_headers()?;
let all_unique_headers_vec = self.get_unique_headers(headers)?;

UNIQUE_COLUMNS.set(all_unique_headers_vec).unwrap();
UNIQUE_COLUMNS
.set(all_unique_headers_vec)
.map_err(|_| "Cannot set UNIQUE_COLUMNS")?;

let sel = self.rconfig().selection(headers)?;
Ok((sel.select(headers).map(<[u8]>::to_vec).collect(), sel))
Expand Down
97 changes: 44 additions & 53 deletions src/cmd/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use rayon::slice::ParallelSliceMut;
use serde_json::{json, value::Number, Map, Value};
use stats::Frequencies;

use crate::{cmd::stats::Stats, config::Config, util, util::StatsMode, CliResult};
use crate::{cmd::stats::StatsData, config::Config, util, util::StatsMode, CliResult};

const STDIN_CSV: &str = "stdin.csv";

Expand Down Expand Up @@ -208,20 +208,18 @@ pub fn infer_schema_from_stats(
input_filename: &str,
) -> CliResult<Map<String, Value>> {
// invoke cmd::stats
let (csv_fields, csv_stats, stats_col_index_map) =
util::get_stats_records(args, StatsMode::Schema)?;
let (csv_fields, csv_stats) = util::get_stats_records(args, StatsMode::Schema)?;

// amortize memory allocation
let mut low_cardinality_column_indices: Vec<usize> =
Vec::with_capacity(args.flag_enum_threshold);
let mut low_cardinality_column_indices: Vec<u64> =
Vec::with_capacity(args.flag_enum_threshold as usize);

// build column selector arg to invoke cmd::frequency with
let column_select_arg: String = build_low_cardinality_column_selector_arg(
&mut low_cardinality_column_indices,
args.flag_enum_threshold,
&csv_fields,
&csv_stats,
&stats_col_index_map,
);

// invoke cmd::frequency to get unique values for each field
Expand All @@ -233,35 +231,33 @@ pub fn infer_schema_from_stats(
// amortize memory allocations
let mut field_map: Map<String, Value> = Map::with_capacity(10);
let mut type_list: Vec<Value> = Vec::with_capacity(4);
let mut enum_list: Vec<Value> = Vec::with_capacity(args.flag_enum_threshold);
let mut enum_list: Vec<Value> = Vec::with_capacity(args.flag_enum_threshold as usize);
let mut header_byte_slice;
let mut header_string;
let mut stats_record;
let mut col_type;
let mut col_null_count;

// generate definition for each CSV column/field and add to properties_map
#[allow(clippy::needless_range_loop)]
for i in 0..csv_fields.len() {
header_byte_slice = csv_fields.get(i).unwrap();

// convert csv header to string
header_string = convert_to_string(header_byte_slice)?;

// grab stats record for current column
stats_record = csv_stats.get(i).unwrap().clone().to_record(4, false);
stats_record = csv_stats[i].clone();

if log::log_enabled!(log::Level::Debug) {
debug!("stats[{header_string}]: {stats_record:?}");
}

// get Type from stats record
col_type = stats_record.get(stats_col_index_map["type"]).unwrap();
col_type = stats_record.r#type.clone();

// get NullCount
col_null_count = if let Some(s) = stats_record.get(stats_col_index_map["nullcount"]) {
s.parse::<usize>().unwrap_or(0_usize)
} else {
0_usize
};
col_null_count = stats_record.nullcount;

// debug!(
// "{header_string}: type={col_type}, optional={}",
Expand All @@ -277,27 +273,25 @@ pub fn infer_schema_from_stats(
type_list.clear();
enum_list.clear();

match col_type {
match col_type.as_str() {
"String" => {
type_list.push(Value::String("string".to_string()));

// minLength constraint
if let Some(min_length_str) = stats_record.get(stats_col_index_map["min_length"]) {
let min_length = min_length_str.parse::<u32>().unwrap();
if let Some(min_length) = stats_record.min_length {
field_map.insert(
"minLength".to_string(),
Value::Number(Number::from(min_length)),
);
};
}

// maxLength constraint
if let Some(max_length_str) = stats_record.get(stats_col_index_map["max_length"]) {
let max_length = max_length_str.parse::<u32>().unwrap();
if let Some(max_length) = stats_record.max_length {
field_map.insert(
"maxLength".to_string(),
Value::Number(Number::from(max_length)),
);
};
}

// enum constraint
if let Some(values) = unique_values_map.get(&header_string) {
Expand All @@ -309,15 +303,23 @@ pub fn infer_schema_from_stats(
"Integer" => {
type_list.push(Value::String("integer".to_string()));

if let Some(min_str) = stats_record.get(stats_col_index_map["min"]) {
let min = atoi_simd::parse::<i64>(min_str.as_bytes()).unwrap();
field_map.insert("minimum".to_string(), Value::Number(Number::from(min)));
};
if let Some(min) = stats_record.min {
field_map.insert(
"minimum".to_string(),
Value::Number(Number::from(
atoi_simd::parse::<i64>(min.as_bytes()).unwrap(),
)),
);
}

if let Some(max_str) = stats_record.get(stats_col_index_map["max"]) {
let max = atoi_simd::parse::<i64>(max_str.as_bytes()).unwrap();
field_map.insert("maximum".to_string(), Value::Number(Number::from(max)));
};
if let Some(max) = stats_record.max {
field_map.insert(
"maximum".to_string(),
Value::Number(Number::from(
atoi_simd::parse::<i64>(max.as_bytes()).unwrap(),
)),
);
}

// enum constraint
if let Some(values) = unique_values_map.get(&header_string) {
Expand All @@ -330,21 +332,19 @@ pub fn infer_schema_from_stats(
"Float" => {
type_list.push(Value::String("number".to_string()));

if let Some(min_str) = stats_record.get(stats_col_index_map["min"]) {
let min = min_str.parse::<f64>().unwrap();
if let Some(min) = stats_record.min {
field_map.insert(
"minimum".to_string(),
Value::Number(Number::from_f64(min).unwrap()),
Value::Number(Number::from_f64(min.parse::<f64>().unwrap()).unwrap()),
);
};
}

if let Some(max_str) = stats_record.get(stats_col_index_map["max"]) {
let max = max_str.parse::<f64>().unwrap();
if let Some(max) = stats_record.max {
field_map.insert(
"maximum".to_string(),
Value::Number(Number::from_f64(max).unwrap()),
Value::Number(Number::from_f64(max.parse::<f64>().unwrap()).unwrap()),
);
};
}
},
"NULL" => {
type_list.push(Value::String("null".to_string()));
Expand Down Expand Up @@ -403,29 +403,22 @@ pub fn infer_schema_from_stats(

/// get column selector argument string for low cardinality columns
fn build_low_cardinality_column_selector_arg(
low_cardinality_column_indices: &mut Vec<usize>,
enum_cardinality_threshold: usize,
low_cardinality_column_indices: &mut Vec<u64>,
enum_cardinality_threshold: u64,
csv_fields: &ByteRecord,
csv_stats: &[Stats],
stats_col_index_map: &AHashMap<String, usize>,
csv_stats: &[StatsData],
) -> String {
low_cardinality_column_indices.clear();

// identify low cardinality columns
#[allow(clippy::needless_range_loop)]
for i in 0..csv_fields.len() {
// grab stats record for current column
let stats_record = csv_stats.get(i).unwrap().clone().to_record(4, false);

// get Cardinality
let col_cardinality = match stats_record.get(stats_col_index_map["cardinality"]) {
Some(s) => s.parse::<usize>().unwrap_or(0_usize),
None => 0_usize,
};
// debug!("column_{i}: cardinality={col_cardinality}");
let col_cardinality = csv_stats[i].cardinality;

if col_cardinality > 0 && col_cardinality <= enum_cardinality_threshold {
// column selector uses 1-based index
low_cardinality_column_indices.push(i + 1);
low_cardinality_column_indices.push((i + 1) as u64);
};
}

Expand All @@ -450,7 +443,7 @@ fn get_unique_values(
arg_input: args.arg_input.clone(),
flag_select: crate::select::SelectColumns::parse(column_select_arg).unwrap(),
flag_limit: args.flag_enum_threshold as isize,
flag_unq_limit: args.flag_enum_threshold,
flag_unq_limit: args.flag_enum_threshold as usize,
flag_lmt_threshold: 0,
flag_pct_dec_places: -5,
flag_other_sorted: false,
Expand Down Expand Up @@ -545,8 +538,6 @@ fn generate_string_patterns(
args: &util::SchemaArgs,
properties_map: &Map<String, Value>,
) -> CliResult<AHashMap<String, String>> {
// standard boiler-plate for reading CSV

let rconfig = Config::new(&args.arg_input)
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers)
Expand Down
Loading

0 comments on commit 95dc988

Please sign in to comment.