Leverage more engine capabilities in data skipping 2/n #83
Conversation
kernel/src/scan/mod.rs
Outdated
// TODO we should be passing an empty batch here, but not sure how
partiton_arrays.push(evaluator.evaluate(&batch)?);
Not sure what this TODO means, sorry?
Update: We're evaluating a Literal expression that needs no inputs...
Another approach might be to get the (top-level) column names, and create a struct of expressions that becomes the output batch:
let mut fields = Vec::with_capacity(...);
for (column, field) in &partition_fields {
let value_expression = ...;
fields.push(value_expression);
}
for field in self.schema.fields {
let column_expression = Expression::Column(field.name);
fields.push(column_expression);
}
// TODO: Set this up once overall, rather than once per batch!
let evaluator = expression_handler.get_evaluator(batch.schema(), Expression::Struct(fields));
evaluator.evaluate(&batch)
Added in commit evaluate full skipping predicate via evaluator.
kernel/src/scan/mod.rs
Outdated
// TODO the protocol states that an empty string is always a null value
// does this mean that we cannot have empty strings as a string partition value?
Delta spark hit this as well. It's a limitation of hive value partitioning that Delta inherited, with some spark limitations thrown in for good measure.
The least-bad solution we could come up with was to forcibly interpret empty strings as null on the write path, so that the read path consistently returns null. This indeed means you can't store an empty string partition value (it coerces to null), and also means you can't store an empty string (= null) in a partition column with a not-null constraint. See e.g.
/~https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala#L193
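For illustration, a minimal sketch of the convention described above (a hypothetical helper, not the kernel's code): an empty partition-value string is read back as null.

/// Hypothetical helper illustrating the Hive/Delta convention discussed above:
/// an empty partition-value string is treated as null on the read path.
fn partition_value_or_null(raw: &str) -> Option<&str> {
    if raw.is_empty() {
        None
    } else {
        Some(raw)
    }
}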
kernel/src/scan/mod.rs
Outdated
match data_type {
    DataType::Primitive(primitive) => match primitive {
        PrimitiveType::String => Ok(Scalar::String(raw.to_string())),
        PrimitiveType::Integer => {
            Ok(Scalar::Integer(raw.parse::<i32>().map_err(|_| {
                Error::ParseError(raw.to_string(), data_type.clone())
            })?))
        }
Can we define a helper method to capture this boilerplate?
match data_type {
    DataType::Primitive(primitive) => match primitive {
        PrimitiveType::String => Ok(Scalar::String(raw.to_string())),
        PrimitiveType::Integer => {
            Ok(Scalar::Integer(raw.parse::<i32>().map_err(|_| {
                Error::ParseError(raw.to_string(), data_type.clone())
            })?))
        }
could become
match data_type {
    DataType::Primitive(primitive) => primitive.parse_scalar(raw),
where
impl PrimitiveType {
    pub fn parse_scalar(&self, raw: &str) -> Result<Scalar, Error> {
        match self {
            Self::String => Ok(Scalar::String(raw.to_string())),
            Self::Byte => self.str_parse_scalar(raw, |i| Scalar::Byte(i)),
            ... other numeric types ...
            Self::Double => self.str_parse_scalar(raw, |i| Scalar::Double(i)),
            ... remaining types (decimal, bool, date/time)
        }
    }

    fn str_parse_scalar<T: std::str::FromStr>(
        &self,
        raw: &str,
        f: impl FnOnce(T) -> Scalar,
    ) -> Result<Scalar, Error> {
        match raw.parse() {
            Ok(val) => Ok(f(val)),
            Err(..) => Err(Error::ParseError(raw.to_string(), DataType::Primitive(self))),
        }
    }
}
(I tested it in rust playground, and the compiler is in fact able to do the necessary type inference!)
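For example, a call site could look like this (a hypothetical usage sketch; it assumes the PrimitiveType and Scalar types from the sketch above):

fn example() -> Result<(), Error> {
    // The closure's parameter type (i32 for Scalar::Integer) drives which FromStr
    // impl is used, so no turbofish is needed at the call site.
    let value = PrimitiveType::Integer.parse_scalar("42")?;
    assert!(matches!(value, Scalar::Integer(42)));
    Ok(())
}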
We could probably factor out the error handling as well, since not all primitive types would use the str_parse_scalar method:
fn parse_error(&self, raw: &str) -> Error {
    Error::ParseError(raw.to_string(), DataType::Primitive(self))
}
and then
match raw.parse() {
    Ok(val) => Ok(f(val)),
    Err(..) => Err(self.parse_error(raw)),
}
(we can't "just" factor out the ParseError to top-level, because the error type of str::parse<T>
result depends on T
)
These updates are included in handle partition values.
FYI the Delta spec doesn't require the file names to use a hive-style partitioning scheme (the actual partition values come from the file's Add metadata entry). But it doesn't hurt either.
kernel/src/scan/mod.rs
Outdated
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
.into()
Aside: This casting idiom seems to show up a lot, and it's very bloaty.
Any thoughts on how we might be able to factor out the bloat into a helper of some kind?
we can do the same as you recently did for expressions etc. and define helpers
impl Error {
    pub fn unexpected_column_type(msg: impl Into<String>) -> Self ...
}
this would also allow us to harmonize messages, which IIRC are still inconsistent:
impl Error {
    pub fn unexpected_column_type(expected: &DataType, found: &DataType) -> Self ...
}
It seems we should do a pass through the errors soon anyhow, since the internal errors still have variants for arrow / parquet, and we should follow up on the discussion you started in slack. Maybe something for today's sync?
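A possible completion of the first helper sketched above; the enum here is a stand-in for the kernel's Error type, showing only the variant used in this PR.

#[derive(Debug)]
pub enum Error {
    UnexpectedColumnType(String),
    // ... other variants elided
}

impl Error {
    // Constructor helper so call sites can pass a plain &str.
    pub fn unexpected_column_type(msg: impl Into<String>) -> Self {
        Self::UnexpectedColumnType(msg.into())
    }
}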
You can make it a little cleaner with:
use arrow_array::cast::AsArray;
...
evaluator
.evaluate(&batch)?
.as_struct_opt()
.ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
.into()
Done, there is likely some more opportunity to simplify things with AsArray as we move forward.
kernel/src/scan/mod.rs
Outdated
fn get_partition_value(raw: &str, data_type: &DataType) -> DeltaResult<Scalar> {
    match data_type {
        DataType::Primitive(primitive) => primitive.parse_scalar(raw),
        _ => todo!(),
Is it really a TODO? AFAIK the spec only allows primitive values? If anything, Primitive might be too permissive, if the spec fails to mention some primitive type we support?
Or does the spec require us to support some non-primitive scalar types? If so, it might be helpful to spec them out with individual todo!() clauses, and a catch-all clause that only errors out for the remaining unknown and/or unsupported types?
Update: A quick comparison of schema.rs vs. the partition value spec suggests that we have a near-perfect match. Just missing the TimestampNTZ type on the rust side, which is a table feature and so arguably ok to leave as TODO for now. All other types should be rejected outright.
so then... nit: make this return an Error instead of the todo!() panic :)
done
...HIVE_DEFAULT_PARTITION__/part-00000-8eb7f29a-e6a1-436e-a638-bbf0a7953f09.c000.snappy.parquet
Outdated
thanks for the review @ryan-johnson-databricks - not sure how you feel, but it seems the data type / schema validation is a bigger thing in itself that deserves a dedicated discussion (and with that, a PR), and we would focus here on the null-if thing as well as the other comments?
.downcast_ref::<BooleanArray>()
.ok_or(Error::UnexpectedColumnType(
    "Expected type 'BooleanArray'.".into(),
))?;

let before_count = actions.num_rows();
let after = filter_record_batch(actions, skipping_vector)?;
@ryan-johnson-databricks - the only larger place where we spill arrow into data skipping is the filtering.
IIRC, you mentioned that we may want to introduce a dedicated API to apply filter vectors to data? Related to that, do we have a plan already for how we create an engine-specific filter vector from the deletion vector?
this comment seems to have been in pending state since december - i certainly did not add it now 😆
This seems like a good one to double check w/ kernel-jvm folks at our next Thursday sync. At least at one point they were passing around (selection vector, columnar batch) pairs, and letting engine combine those if it wanted. I don't know what they did for creating the DV selection vector -- we should check that as well -- but I favor creating a boolean array directly, and asking engine to copy that to whatever internal format it likes.
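A minimal sketch of the boolean-array approach mentioned above (hypothetical shape; it assumes the deletion vector has already been resolved to a set of deleted row indexes):

use std::collections::HashSet;

use arrow_array::BooleanArray;

// Turn a set of deleted row indexes into a keep/skip selection vector that an
// engine could then copy into whatever internal format it likes.
fn selection_vector(num_rows: usize, deleted_rows: &HashSet<usize>) -> BooleanArray {
    BooleanArray::from(
        (0..num_rows)
            .map(|row| !deleted_rows.contains(&row))
            .collect::<Vec<bool>>(),
    )
}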
BinaryOperation { op, left, right } => {
    let left_arr = evaluate_expression(left.as_ref(), batch)?;
    let right_arr = evaluate_expression(right.as_ref(), batch)?;
(BinaryOperation { op, left, right }, _) => {
hmm - i guess the first question to answer is if we want to include some kind of casting here as well and allow e.g. float + int, in which case i would probably take the "biggest" type of left / right and make sure the result is of that type; otherwise probably left == right == result? not sure, but i think arrow will always require inputs of the same type.
in case of comparisons the result must be boolean, but we may proactively check equality of the input types.
this may raise the question - if we want to validate a lot of things, and not let arrow raise for us, we may want to consider splitting comparisons and arithmetics. but not totally sure that that's worth it.
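A sketch of the up-front input check being discussed (a hypothetical helper, not kernel code): require both sides of a binary operation to have the same arrow type instead of letting arrow raise for us.

use arrow_array::Array;
use arrow_schema::ArrowError;

// Reject mismatched operand types before handing the arrays to arrow's kernels.
fn check_same_type(left: &dyn Array, right: &dyn Array) -> Result<(), ArrowError> {
    if left.data_type() == right.data_type() {
        Ok(())
    } else {
        Err(ArrowError::InvalidArgumentError(format!(
            "mismatched operand types: {:?} vs {:?}",
            left.data_type(),
            right.data_type()
        )))
    }
}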
kernel/src/expressions/mod.rs
Outdated
// TODO how to model required functions?
NullIf {
at the time i felt that the operations were modelled consistently, but with adding additional functions that don't fit in these categories, I at least wanted to discuss if we just keep adding a new variant per function (if we even need more), or should have some additional grouping ...
kernel/src/scan/data_skipping.rs
Outdated
static ref FILTER_EXPR: Expr = Expr::is_null(Expr::null_if(
    Expr::column("predicate"),
    Expr::column("predicate"),
));
both options also have the benefit of us not having to pre-compute "predicate" as a separate batch ... out of those I think I like distinct better, due to it being (at least i think so :)) more directly what we want, in contrast to coalesce, which if i read that right ...
Returns the data type of expression with the highest data type precedence.
... is also kind of "dynamic" in its return type and allows for mixed inputs.
somehow I also keep wondering if and_kleene might help as well, do you think that's worth looking into?
kernel/src/client/expression.rs
Outdated
let reducer = match op {
    VariadicOperator::And => and,
    VariadicOperator::Or => or,
};
exprs
    .iter()
    .map(|expr| evaluate_expression(expr, batch))
    .map(|expr| evaluate_expression(expr, batch, Some(&DataType::BOOLEAN)))
i think yes ...
For sure. Sorry if that wasn't clear from my comments.
kernel/src/client/expression.rs
Outdated
}
(Distinct { lhs, rhs }, Some(&DataType::BOOLEAN)) => {
Why a new top-level expression type? Can Distinct be a new BinaryOperator instead?
(BinaryOperation { op, left, right }, _) => {
    ...
    let eval: Operation = match op {
        ...
        Equal => |l, r| eq(l, r).map(wrap_comparison_result),
        NotEqual => |l, r| neq(l, r).map(wrap_comparison_result),
+       Distinct => |l, r| distinct(l, r).map(wrap_comparison_result),
    };
(bonus: whatever type checking we eventually add would then benefit all comparison operators)
Aside: In a future PR that adds better type checking, should we introduce a ComparisonOperator sub-enum, for things that map (T, T) -> bool? And if we did that, should we also add an AlgebraicOperator (**) sub-enum, for things that map (T, T) -> T? That would capture the vast majority of binary operations in a structured way, while still allowing to add arbitrary other binary operators if needed (***)?
Edit: In retrospect, this seems very related to your question #83 (comment)
(**) According to Wikipedia,
An algebraic operation may also be defined simply as a function from a Cartesian power of a set to the same set.
(***) Perhaps ironically, arrow's nullif function is one such operator
kernel/src/client/json.rs
Outdated
    }
}

fn read_from_json<R: BufRead>(
Does this combo of read_from_json and get_reader solve the hack parsing we used to have? Or just move it?
(the new logic looks a lot more complex, trying to figure out why)
Actually, I'm having trouble understanding how the code works.
In particular, what happens if (when?) buffer read boundaries don't match json record boundaries?
Does this combo of read_from_json and get_reader solve the hack parsing we used to have? Or just move it?
I hope it does, or at least improves the situation, see also comment below.
Not sure if this is what you are referring to, but the main thing to wrap my head around here was that the decode function will return once it has filled a batch of size batch_size, which may or may not have consumed the whole buffer.
At least from the docs it seemed that the decoder can handle seeing incomplete data. Not sure though if that also holds true when we flush, at which point we should have always consumed the whole (reader) buffer, so I guess that would be an error in the input data?
Yeah, I would:
- Add a comment below that states that the closure either reads batch_size or until the buffer is empty
- Add a comment below that decoded != read implies that we read more data into buf than a single batch, so we return and leave data in the buffer that will be handled the next time the closure is called. Perhaps also note that the data is left in the reader, so it will be in the buf again when we call fill_buf, because we only consumed the decoded amount.
This could also maybe be more clear if you just made a mut res = vec!() and then looped and appended to it. I think the collect into a vec won't be much more efficient anyway (rough sketch below).
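A rough sketch of the loop-and-append shape suggested above (assumed names and error handling; the arrow_json Decoder API is the one discussed in this thread, but this is not the PR's code):

use std::io::BufRead;

use arrow_array::RecordBatch;
use arrow_json::reader::Decoder;
use arrow_schema::ArrowError;

fn read_from_json_sketch<R: BufRead>(
    mut reader: R,
    decoder: &mut Decoder,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut res = Vec::new();
    loop {
        let buf = reader.fill_buf()?;
        if buf.is_empty() {
            break;
        }
        let read = buf.len();
        // decode() stops once it has buffered batch_size rows, so `decoded` may be
        // less than `read`; the leftover bytes stay in the reader because we only
        // consume the decoded amount, and they show up again on the next fill_buf().
        let decoded = decoder.decode(buf)?;
        reader.consume(decoded);
        if decoded < read {
            // A full batch is buffered; emit it and keep going.
            if let Some(batch) = decoder.flush()? {
                res.push(batch);
            }
        }
    }
    // Emit whatever is left as a final (possibly partial) batch.
    if let Some(batch) = decoder.flush()? {
        res.push(batch);
    }
    Ok(res)
}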
kernel/src/client/json.rs
Outdated
let columns = schema
    .fields
    .iter()
    .map(|field| new_null_array(field.data_type(), null_count))
Doesn't new_null_array handle complex types? If so, why do we need to map over individual fields?
I think we just need to convert the schema to a DataType::Struct, perhaps via From for Fields? Tho in typical arrow-rust fashion, top-level vs. nested is just different enough that it might not be worth the trouble...
I think it's like you said ...
top-level vs. nested is just different enough that it might not be worth the trouble
... or at least I thought so.
To create a RecordBatch we need a Vec, so we do make use of new_null_array's ability to create complex types, but as it always creates a single array, we invoke it for every top-level field.
Once we move to passing EngineData around, I thought about not using record batches anymore, but rather doing everything via ArrayRef, casting to StructArray where we have RecordBatch right now. This would have the benefit of just having one type we pass around, rather than two - e.g. the ExpressionEvaluator takes a RecordBatch and returns an ArrayRef.
I like the idea of just using ArrayRef everywhere.
Given that RecordBatch implements From<StructArray> and StructArray implements From<RecordBatch>, it would seem arrow-rust at least tacitly recognizes the redundancy as well.
kernel/src/client/json.rs
Outdated
let mut value_count = 0;
let mut value_start = 0;

for it in 0..json_strings.len() {
This logic is probably correct, but seems hard to grok and maintain. Is there any way we could simplify it?
If I'm not mistaken, the loop is basically breaking the input array into alternating null and non-null segments, and then replacing each segment with either its parsed result or nulls?
// Early out here, because loop below can't handle an empty batch
if (... empty ...) return ...;
// Algo: Start a run that includes only element 0. Keep adding to the run as long
// as the "polarity" (null vs. non-null) matches. Upon encountering a polarity change,
// emit the previous run and start a new run with the new polarity. When the loop
// exits, we just need to emit the final (possibly also first) run and we're done!
let schema_as_struct = /* see other PR comment */;
let mut mark = 0;
let mut run_is_null = json_strings.is_null(mark);
// I forgot the magic incantation for inner functions that capture state...
fn emit_run|...|(...) {
    if run_is_null {
        insert_nulls(&mut batches, it - mark, &schema_as_struct);
    } else {
        // ... parse and insert the run of json values
    }
}
for it in 1..json_strings.len() {
    let value_is_null = json_strings.is_null(it);
    if run_is_null != value_is_null {
        // polarity change! emit the previous run
        emit_run(...);
        run_is_null = value_is_null;
        mark = it;
    }
}
emit_run(...);
Also: It would be helpful to explain why we go through such care to parse each non-null array segment as a group, instead of one-at-a-time or all-at-once?
I suspect we don't want to parse single values because it's expensive to fire up the parsing machinery and we want to amortize the cost as much as possible.
We also can't "just" parse the raw array as-is because the Arrow spec for variable-sized binary data says that, although the offsets for entries must be monotonically increasing,
It should be noted that a null value may have a positive slot length. That is, a null value may occupy a non-empty memory space in the data buffer. When this is true, the content of the corresponding memory space is undefined.
... so we can only safely parse contiguous chunks of non-null values.
That said, given that we anyway have to feed values into the parser's buffer before consuming them, I wonder if we could instantiate the parsing machinery just once, and feed values to it as needed? If so, it should cost about the same to parse one value at a time in a simple for-loop. As a bonus, it would also harden our implementation against the worst case where nulls and non-nulls alternate with run sizes all 1.
Yeah, this part of the code I was never really happy with.
A main difference to hack_parse is that we would create a new decoder (via a higher-level API) for each row, and in addition, have every row represented as a &str and convert back to bytes before passing to the parser. So what I think our main gain here is to create a single decoder, and always operate on the raw bytes without creating an intermediary string. Although I think the conversion is quite optimized, since arrow string arrays are guaranteed utf8 encoded, and i think they can omit some checks.
As for the loops ... The decoder will not give us back null rows where the input value is null, which is why we have to fill these. From that I thought we do have to keep track of the null runs at the very least. I guess instantiating the reader / cursor should not be too expensive and we could pass each valid row immediately to the decoder. However, this comes at a price as well, since the decoder will emit a new batch once the internal buffer has reached batch_size number of rows, and we do have to flush whenever we switch polarity to have the matching non-null / null layout. Here I felt just tracking the run might be easier?
I'll try and simplify a bit using emit_run, but see no clear way yet to make it fully clean. Then again, during your reviews we already eliminated a lot of complexity before, so let's see :).
One thing maybe worth mentioning, the arrow Decoder seems to implement a fairly sophisticated algorithm which tries to vectorize the parsing - "inspired by" simdjson's approach.
Without having actually looked at the internals, i felt this might benefit from receiving larger chunks of data at once, so that some of these optimizations can take effect?
Yeah, probably our best hope is to keep tracking runs, but simplify the run-tracking code as suggested above. It would still behave poorly in the worst case, but for Delta metadata reads, we expect either long runs of nulls (non-file actions), or long runs of values (file actions) and so the worst case should be super rare.
kernel/src/expressions/scalars.rs
Outdated
fn str_parse_scalar<T: std::str::FromStr>(
    &self,
    raw: &str,
    f: impl FnOnce(T) -> Scalar,
This took a while to unpack. If I understand correctly we're passing some Scalar variant's (implicit) constructor function as the FnOnce here? So e.g. a caller who passes Scalar::Double sets T: f64 (inferred from the constructor's argument type), leverages FromStr for f64 to convert string to f64, and then the result is passed to the constructor?
(maybe a code comment explaining the magic could help)
Yeah, agree on a comment.
Also nit: Just call this parse_scalar
that name is already taken :)
parse_scalar_impl, to make clear it's a helper for parse_scalar?
(the current str_parse_scalar name carries no intuition for me, at least)
hah right. well, not to bikeshed too much. I'd suggest parse_str_as_scalar then, but fine as is too.
a comment explaining what it does would be great.
done.
Forgot to add:
This PR updated the data skipping logic to better leverage the engine's capabilities and reduce our arrow exposure in core kernel. This is best reviewed commit by commit as it permeates quite far. If more helpful, I can also split up this PR via these commits.
Splitting is usually good... but I don't know how much the three overlap? If it's just one giant mess of conflicts, maybe not?
This seems out of date?
- Add new NULLIF expression
- Evaluate is_null(null_if(..)) logic via expression evaluator
@@ -109,6 +123,78 @@ impl From<String> for Scalar {

// TODO: add more From impls
qq: We currently have From for i32 and i64, but we have scalars for i8 and i16. If somebody says Scalar::from(10u8) will they get Scalar::Integer or a compiler error?
right now this would get an error, but I just added the missing implementations. so now you would get Scalar::Byte.
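Roughly what such impls look like (a sketch; Scalar::Byte is the i8 variant quoted earlier in this thread, and a Short(i16) variant is assumed):

impl From<i8> for Scalar {
    fn from(i: i8) -> Self {
        Self::Byte(i)
    }
}

impl From<i16> for Scalar {
    fn from(i: i16) -> Self {
        Self::Short(i) // assumes the i16 variant is named Short
    }
}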
I think it's ~all nits at this point, except #83 (comment); and even that can potentially be fixed as a follow-up to unblock this PR.
kernel/src/client/expression.rs
Outdated
"Variadic {expression:?} is expected to return boolean results, got {:?}", | ||
result_type |
why not:
"Variadic {expression:?} is expected to return boolean results, got {:?}",
result_type
could become
"Variadic {expression:?} should return a boolean result, got {result_type:?}"
b/c it was late :D, fixed.
kernel/src/scan/mod.rs
Outdated
data_type: &DataType,
) -> DeltaResult<Scalar> {
    match raw {
        None | Some(None) => Ok(Scalar::Null(data_type.clone())),
Should we just make this the second case and use a match-all?
_ => Ok(Scalar::Null(data_type.clone())),
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

let fields: Vec<ArrowField> = s.fields().map(TryInto::try_into).try_collect()?;
I think type inference would allow just:
let fields: Vec<ArrowField> = s.fields().map(TryInto::try_into).try_collect()?;
could become
Ok(ArrowSchema::new(s.fields().map(TryInto::try_into).try_collect()?))
Needed this ...
Ok(ArrowSchema::new(
    s.fields()
        .map(TryInto::try_into)
        .try_collect::<_, Vec<ArrowField>, _>()?,
))
... which fmt wants on three lines, so I thought the current way is a little more concise?
Agree current way is better. I guess could also do Vec<_>, since AFAIK that's what the type inference can't figure out on its own (not sure why). But type clarity is also good, so probably we should leave it as-is.
kernel/src/client/json.rs
Outdated
.collect::<Vec<_>>()
.into_iter()
Maybe this is just copied code, but it seems redundant to call collect and into_iter back to back like this?
stats_schema
    .fields
    .iter()
    .map(|field| new_null_array(field.data_type(), 1))
I think we discussed this somewhere else, but just confirming: arrow for some reason treats "schema" and "struct" as somehow different concepts, with no obviously easy way to convert between them, so we have to manually build up a struct here, rather than passing the schema directly?
Tho looking at the docs, it seems like this might work?
stats_schema
    .fields
    .iter()
    .map(|field| new_null_array(field.data_type(), 1))
could become
new_null_array(DataType::Struct(stats_schema.fields.clone()), 1)
https://arrow.apache.org/rust/arrow_schema/struct.Schema.html
https://arrow.apache.org/rust/arrow_schema/fields/struct.Fields.html
https://arrow.apache.org/rust/arrow_schema/enum.DataType.html#variant.Struct
turns out it almost works. but trying to do
let arr = new_null_array(&DataType::Struct(stats_schema.fields.clone()), 1);
Ok(arr.as_struct().into())
leads to runtime errors since it will make top-level fields nullable, which the RecordBatch does not allow. Once we move to only passing ArrayRefs around we should be able to make this change though.
Oh... this comes down to the difference between a null struct (= definitely not allowed nor even sensible for a record batch), vs. a struct whose fields are all null (allowed, and what your code was doing). We probably want to keep the struct-of-nulls behavior even after moving to ArrayRef, because otherwise we'd have to check whether the whole thing is null before accessing any of its columns?
yes, as you said :)
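For reference, a self-contained sketch of the per-field struct-of-nulls construction discussed above (a standalone example, not the kernel's stats_schema code):

use arrow_array::{new_null_array, ArrayRef, RecordBatch};
use arrow_schema::{ArrowError, SchemaRef};

// Build a single row where every top-level column is null, while keeping the
// provided schema (and its nullability flags) intact.
fn null_row(schema: &SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = schema
        .fields()
        .iter()
        .map(|field| new_null_array(field.data_type(), 1))
        .collect();
    RecordBatch::try_new(schema.clone(), columns)
}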
kernel/src/scan/data_skipping.rs
Outdated
.ok_or(Error::UnexpectedColumnType(
    "Expected type 'StructArray'.".into(),
))?
nit
.ok_or(Error::UnexpectedColumnType(
    "Expected type 'StructArray'.".into(),
))?
could become
.ok_or(Error::unexpected_column_type("Expected type 'StructArray'."))?
kernel/src/scan/mod.rs
Outdated
.filter(|f| {
    !self
        .snapshot
        .metadata()
        .partition_columns
        .contains(f.name())
})
nit: Is that a potentially expensive inner loop access? I wonder if it might be easier to grok (as well as cheaper) by capturing a variable instead?
let partition_columns = self.snapshot.metadata().partition_columns;
and then
.filter(|f| {
    !self
        .snapshot
        .metadata()
        .partition_columns
        .contains(f.name())
})
could become
.filter(|f| !partition_columns.contains(f.name()))
(especially since we could reuse partition_columns at L161 below)
kernel/src/client/expression.rs
Outdated
(VariadicOperation { .. }, _) => {
    // NOTE: If we get here, it would be a bug in our code. However it does swallow
Actually, would something like this work?
(VariadicOperation { op, exprs }, None | Some(&DataType::BOOLEAN)) => {
Co-authored-by: Ryan Johnson <ryan.johnson@databricks.com>
kernel/src/client/expression.rs
Outdated
// NOTE: If we get here, it would be a bug in our code. However it does swallow
// the error message from the compiler if we add variants to the enum and forget to add them here.
I believe the note is no longer accurate? Now it shouldn't swallow any compile-time errors for new variants, because the match at L202-205 would become incomplete, and the generic case only applies if the caller passed Some incompatible data type?
kernel/src/client/json.rs
Outdated
@@ -106,7 +104,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
json_strings
    .iter()
    .map(|json_string| hack_parse(&output_schema, json_string))
    .collect::<Result<Vec<_>, _>>()?
Another case where type inference isn't working nicely.
I think the original code worked better, with "only" two underscores instead of three...
kernel/src/expressions/mod.rs
Outdated
Self::BinaryOperation {
    op: BinaryOperator::Distinct,
    left: Box::new(self),
    right: Box::new(other),
Self::BinaryOperation {
    op: BinaryOperator::Distinct,
    left: Box::new(self),
    right: Box::new(other),
could become
Self::binary(BinaryOperator::Distinct, self, other)
kernel/src/scan/data_skipping.rs
Outdated
.ok_or(Error::unexpected_column_type(
    "Expected type 'StructArray'.",
))?
A bit surprising that this doesn't fit on one line, but rustfmt does what it does, I guess?
just some pending answers.
this LGTM. Thanks so much!
There are a few open suggestions from Ryan that should probably be applied or resolved (if you don't want to apply them for some reason), but they are mostly minor so... Approved!
kernel/src/client/json.rs
Outdated
json_strings
    .iter()
    .map(|json_string| hack_parse(&output_schema, json_string))
    .collect::<Result<Vec<_>, _>>()?
If we really want to simplify this:
let output: Vec<_> =
json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.try_collect()?;
Ok(concat_batches(&output_schema, output.iter())?)
based on #81
This PR updated the data skipping logic to better leverage the engine's capabilities and reduce our arrow exposure in core kernel. This is best reviewed commit by commit as it permeates quite far. If more helpful, I can also split up this PR via these commits.
- Struct expression as suggested by @ryan-johnson-databricks
- hack_parse function
- DISTINCT binary expression
closes: #69
closes: #68