Skip to content

Commit

Permalink
Fix multiplication/division with null columns
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Jul 31, 2024
1 parent 7baf1fa commit 3bc0870
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 17 deletions.
34 changes: 28 additions & 6 deletions src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub struct BatchResult<'a> {
pub unsafe_referenced_buffers: Vec<BoxedData<'a>>,
}


impl<'a> BatchResult<'a> {
pub fn len(&self) -> usize {
self.columns.first().map_or(0, |s| s.len())
Expand Down Expand Up @@ -52,7 +51,12 @@ impl<'a> BatchResult<'a> {
}

#[must_use]
pub fn into_columns(self) -> (HashMap<String, Arc<dyn DataSource + 'a>>, Vec<BoxedData<'a>>) {
pub fn into_columns(
self,
) -> (
HashMap<String, Arc<dyn DataSource + 'a>>,
Vec<BoxedData<'a>>,
) {
let mut cols = HashMap::<String, Arc<dyn DataSource>>::default();
let columns = self.columns.into_iter().map(Arc::new).collect::<Vec<_>>();
for projection in self.projection {
Expand Down Expand Up @@ -130,25 +134,33 @@ pub fn combine<'a>(
(vec![], ops)
} else if lprojection.len() == 1 {
let (l, r) = unify_types(&mut qp, left[lprojection[0]], right[rprojection[0]]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
let (ops, merged) = qp.merge_deduplicate(l, r);
(vec![merged.any()], ops)
} else {
let (l, r) = unify_types(&mut qp, left[lprojection[0]], right[rprojection[0]]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
let mut partitioning = qp.partition(l, r, limit, false);
for i in 1..(lprojection.len() - 1) {
let (l, r) = unify_types(&mut qp, left[lprojection[i]], right[rprojection[i]]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
partitioning = qp.subpartition(partitioning,l, r, false);
partitioning = qp.subpartition(partitioning, l, r, false);
}

let last = lprojection.len() - 1;
let (l, r) = unify_types(&mut qp, left[lprojection[last]], right[rprojection[last]]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
let (ops, merged) = qp.merge_deduplicate_partitioned(partitioning, l, r);

let mut group_by_cols = Vec::with_capacity(lprojection.len());
for i in 0..last {
let (l, r) = unify_types(&mut qp, left[lprojection[i]], right[rprojection[i]]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
let merged = qp.merge_drop(ops, l, r);
group_by_cols.push(merged.any());
}
Expand Down Expand Up @@ -231,7 +243,7 @@ pub fn combine<'a>(
let (l, r) = unify_types(&mut qp, l, r);
let mut partitioning = qp.partition(l, r, limit, desc);

for i in 1..(batch1.order_by.len() - 1){
for i in 1..(batch1.order_by.len() - 1) {
let (index1, desc) = batch1.order_by[i];
let (index2, _) = batch2.order_by[i];
let (l, r) = unify_types(&mut qp, left[index1], right[index2]);
Expand All @@ -253,7 +265,12 @@ pub fn combine<'a>(
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
assert!(l.tag == r.tag, "Types do not match: {:?} {:?}", l.tag, r.tag);
assert!(
l.tag == r.tag,
"Types do not match: {:?} {:?}",
l.tag,
r.tag
);
let merged = qp.merge_keep(merge_ops, l, r);
projection.push(merged.any());
}
Expand All @@ -266,7 +283,12 @@ pub fn combine<'a>(
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
assert!(l.tag == r.tag, "Types do not match: {:?} {:?}", l.tag, r.tag);
assert!(
l.tag == r.tag,
"Types do not match: {:?} {:?}",
l.tag,
r.tag
);
let merged = qp.merge_keep(merge_ops, l, r);
order_by.push((merged.any(), desc));
}
Expand Down
52 changes: 45 additions & 7 deletions src/engine/planning/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,26 @@ impl Function2 {
encoding_invariance: true,
}
}

pub fn forward_left_null(t: BasicType) -> Function2 {
Function2 {
factory: Box::new(|_, lhs, _| lhs),
type_lhs: BasicType::Null,
type_rhs: t,
type_out: Type::unencoded(t).mutable(),
encoding_invariance: false,
}
}

pub fn forward_right_null(t: BasicType) -> Function2 {
Function2 {
factory: Box::new(|_, _, rhs| rhs),
type_lhs: t,
type_rhs: BasicType::Null,
type_out: Type::unencoded(t).mutable(),
encoding_invariance: false,
}
}
}

lazy_static! {
Expand Down Expand Up @@ -849,14 +869,26 @@ fn function2_registry() -> HashMap<Func2Type, Vec<Function2>> {
type_rhs: BasicType::Integer,
type_out: Type::unencoded(BasicType::Integer).mutable(),
encoding_invariance: false,
}
},
Function2::forward_left_null(BasicType::Float),
Function2::forward_right_null(BasicType::Float),
Function2::forward_left_null(BasicType::Integer),
Function2::forward_right_null(BasicType::Integer),
Function2::forward_left_null(BasicType::Null),
Function2::forward_right_null(BasicType::Null),
],
),
(
Func2Type::Divide,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| {
qp.checked_divide(lhs, rhs)
}))],
vec![
Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_divide(lhs, rhs))),
Function2::forward_left_null(BasicType::Float),
Function2::forward_right_null(BasicType::Float),
Function2::forward_left_null(BasicType::Integer),
Function2::forward_right_null(BasicType::Integer),
Function2::forward_left_null(BasicType::Null),
Function2::forward_right_null(BasicType::Null),
],
),
(
Func2Type::Modulo,
Expand Down Expand Up @@ -1772,7 +1804,7 @@ fn try_bitpacking(
max
};
order_preserving = order_preserving && plan_type.is_order_preserving();
let adjusted_query_plan = if query_plan.is_nullable() {
let mut adjusted_query_plan = if query_plan.is_nullable() {
let fused = planner.fuse_int_nulls(-min + 1, query_plan);
if fused.tag != EncodingType::I64 {
planner.cast(fused, EncodingType::I64).i64()?
Expand All @@ -1783,12 +1815,18 @@ fn try_bitpacking(
let offset = planner.scalar_i64(-min, true);
planner.add(query_plan, offset.into()).i64()?
} else if query_plan.is_null() {
planner
let x = planner
.constant_expand(0, partition_len, EncodingType::I64)
.i64()?
.i64()?;
info!("EMITTING NULL CONSTANT EXPAND {:?}", x);
x
} else {
planner.cast(query_plan, EncodingType::I64).i64()?
};
adjusted_query_plan = filter
.apply_filter(planner, adjusted_query_plan.into())
.i64()
.expect("source type should be i64");

if total_width == 0 {
plan = Some(adjusted_query_plan);
Expand Down
6 changes: 2 additions & 4 deletions tests/query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,12 +883,10 @@ fn test_multiply_null_by_constant() {
);
}

// TODO: lots of combinations of null and other types not supported for multiply/divide/...
#[ignore]
#[test]
fn test_multiply_null() {
fn test_divide_multiply_null() {
test_query_ec(
"SELECT MIN(_step), 493 * _step, _step / 10 FROM default WHERE _step IS NOT NULL AND value_loss IS NOT NULL",
"SELECT MIN(_step), _step * 12.321, 493 * _step, _step / 10 FROM default WHERE _step IS NOT NULL AND value_loss IS NOT NULL",
&[],
);
}
Expand Down

0 comments on commit 3bc0870

Please sign in to comment.