Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leverage more engine capabilities in data skipping 2/n #83

Merged
merged 15 commits into from
Feb 15, 2024

Conversation

roeap
Copy link
Collaborator

@roeap roeap commented Dec 9, 2023

based on #81

This PR updated the data skipping logic to better leverage the engines capabilities and reduce our arrow-exposure in core kernel. This best reviewed commit by commit as it permeates quite far. If more helpful, I can also split up this PR via these commits.

  1. handle partition values
  1. use json handler for stats parsing
  • rewrite json parsing to replace hack_parse function.
  1. evaluate full skipping predicate via evaluator & feedback ... rest
  • Add new DISTINCT binary expression
  • Evaluate skipping predicate via expression evaluator

closes: #69
closes: #68

kernel/src/client/conversion.rs Outdated Show resolved Hide resolved
Comment on lines 216 to 217
// TODO we should be passing an empty batch here, but not sure how
partiton_arrays.push(evaluator.evaluate(&batch)?);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this TODO means, sorry?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: We're evaluating a Literal expression that needs no inputs...

Another approach might be to get the (top-level) column names, and create a struct of expressions that becomes the output batch:

let mut fields = Vec::with_capacity(...);
for (column, field) in &partition_fields {
  let value_expression = ...;
  fields.push(value_expression);
}
for field in self.schema.fields {
  let column_expression = Expression::Column(field.name);
  fields.push(column_expression);
}

// TODO: Set this up once overall, rather than once per batch!
let evaluator = expression_handler.get_evaluator(batch.schema(), Expression::Struct(fields));
evaluator.evaluate(&batch)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 246 to 247
// TODO the protocol states that an empty string is always a null value
// does this mean that we cannot have empty strings as a string partition value?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delta spark hit this as well. It's a limitation of hive value partitioning that Delta inherited, with some spark limitations thrown in for good measure.

The least-bad solution we could come up with was to forcibly interpret empty strings as null on the write path, so that the read path consistently returns null. This indeed means you can't store an empty string partition value (it coerces to null), and also means you can't store an empty string (= null) in a partition column with a not-null constraint. See e.g.
/~https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala#L193

Comment on lines 251 to 258
match data_type {
DataType::Primitive(primitive) => match primitive {
PrimitiveType::String => Ok(Scalar::String(raw.to_string())),
PrimitiveType::Integer => {
Ok(Scalar::Integer(raw.parse::<i32>().map_err(|_| {
Error::ParseError(raw.to_string(), data_type.clone())
})?))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define a helper method to capture this boilerplate?
where

Suggested change
match data_type {
DataType::Primitive(primitive) => match primitive {
PrimitiveType::String => Ok(Scalar::String(raw.to_string())),
PrimitiveType::Integer => {
Ok(Scalar::Integer(raw.parse::<i32>().map_err(|_| {
Error::ParseError(raw.to_string(), data_type.clone())
})?))
}
match data_type {
DataType::Primitive(primitive) => primitive.parse_scalar(raw)

where

impl PrimitiveType {
  pub fn parse_scalar(&self, raw: &str) -> Result<Scalar, Error> {
    match self {
      Self::String => Ok(Scalar::String(raw.to_string())),
      Self::Byte => self.str_parse_scalar(raw, |i| Scalar::Byte(i)),
        ... other numeric types ...
      Self::Double => self.str_parse_scalar(raw, |i| Scalar::Double(i)),
        ... remaining types (decimal, bool, date/time)
    }
  }

  fn str_parse_scalar<T: std::str::FromStr>(
    &self,
    raw: &str,
    f: impl FnOnce(T) -> Scalar
  ) -> Result<Scalar, Error> {
    match raw.parse() {
        Ok(val) => Ok(f(val)),
        Err(..) => Err(Error::ParseError(raw.to_string(), DataType::Primitive(self))),
    }
  }
}

(I tested it in rust playground, and the compiler is in fact able to do the necessary type inference!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably factor out the error handling as well, since not all primitive types would use str_parse_scalar method:

fn parse_error(&self, raw: &str) -> Error {
  Error::ParseError(raw.to_string(), DataType::Primitive(self))
}

and then

    match raw.parse() {
        Ok(val) => Ok(f(val)),
        Err(..) => Err(self.parse_error(raw)),
    }

(we can't "just" factor out the ParseError to top-level, because the error type of str::parse<T> result depends on T)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These updates are included in handle partition values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI Delta spec doesn't require the file names to use hive-style partitioning scheme (the actual partition values come from the file's Add metadata entry). But it doesn't hurt either.

@roeap roeap force-pushed the partition-values branch 4 times, most recently from 4d228b4 to 7c26fd4 Compare December 16, 2023 17:36
@roeap roeap changed the title [WIP] feat: handle partition values [WIP] leverage more engine capabilities in data skipping Dec 16, 2023
@roeap roeap force-pushed the partition-values branch 2 times, most recently from f3d18dc to d14eae8 Compare December 16, 2023 18:01
@roeap roeap marked this pull request as ready for review December 16, 2023 18:27
@roeap roeap force-pushed the partition-values branch 6 times, most recently from 039561b to 8eefc31 Compare December 17, 2023 13:18
@roeap roeap changed the title [WIP] leverage more engine capabilities in data skipping [WIP] leverage more engine capabilities in data skipping 2/n Dec 17, 2023
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
kernel/src/client/expression.rs Show resolved Hide resolved
kernel/src/client/expression.rs Show resolved Hide resolved
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
Comment on lines 224 to 225
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
.into()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: This casting idiom seems to show up a lot, and very bloaty.
Any thoughts on how we might be able to factor out the bloat to a helper of some kind?

Copy link
Collaborator Author

@roeap roeap Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do the same as you recently did for expressions etc and define helpers

impl Error {
    pub fn unexpected_column_type(msg: impl Into<String>) -> Self ...
}

this would also allow us to harmonize messages, which IIRC are still inconsistent:

impl Error {
    pub fn unexpected_column_type(expected: &DataType, found: &DataType) -> Self ...
}

It seems we should do a pass trough the errors soon anyhow, since the internal errors still have variants for arrow / parquet, and we should floow up on the discussion you started in slack. Maybe something for todays sync?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make it a little cleaner with:

use arrow_array::cast::AsArray;
...
                evaluator
                        .evaluate(&batch)?
                        .as_struct_opt()
                        .ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
                        .into()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, there is likely some more opportunity to simplify things with AsArray as we move forward.

fn get_partition_value(raw: &str, data_type: &DataType) -> DeltaResult<Scalar> {
match data_type {
DataType::Primitive(primitive) => primitive.parse_scalar(raw),
_ => todo!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really a TODO? AFAIK the spec only allows primitive values? If anything, Primitive might be too permissive, if the spec fails to mention some primitive type we support?

Or does the spec require us to support some non-primitive scalar types? If so, it might be helpful to spec them out with individual todo!() clauses, and a catch-all clause that only errors out for the remaining unknown and/or unsupported types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: A quick comparison of schema.rs vs. partition value spec suggests that we have a near-perfect match. Just missing the TimestampNTZ type on the rust side, which is a table feature and so arguably ok to leave as TODO for now. All other types should be rejected outright.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so then... nit: make this return an Error instead of the todo!() panic :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

kernel/src/schema.rs Outdated Show resolved Hide resolved
kernel/src/schema.rs Show resolved Hide resolved
Copy link
Collaborator Author

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the review @ryan-johnson-databricks - not sure how you feel, but it seems the data type / schema validation is a bigger thing in itself, that deserves a dedicated discussion (and with that PR) and we would focus here on the null-if thig as well as the other comments?

.downcast_ref::<BooleanArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'BooleanArray'.".into(),
))?;

let before_count = actions.num_rows();
let after = filter_record_batch(actions, skipping_vector)?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryan-johnson-databricks - the only larger thing where we spill arrow into data skipping is the filtering.

IIRC, you mentioned that we may want to introduce a dedicated API to apply filter vectors to data? Related to that, do we have a plan already how we create and engine-specific filter vector form the deletion vector?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems to have been in pending state since december - i certainly did not add it now 😆

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a good one to double check w/ kernel-jvm folks at our next Thursday sync. At least at one point they were passing around (selection vector, columnar batch) pairs, and letting engine combine those if it wanted. I don't know what they did for creating the DV selection vector -- we should check that as well -- but I favor creating a boolean array directly, and asking engine to copy that to whatever internal format it likes.

kernel/src/client/expression.rs Show resolved Hide resolved
kernel/src/client/expression.rs Show resolved Hide resolved
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
BinaryOperation { op, left, right } => {
let left_arr = evaluate_expression(left.as_ref(), batch)?;
let right_arr = evaluate_expression(right.as_ref(), batch)?;
(BinaryOperation { op, left, right }, _) => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm - i guess the first question to answer is if we want to include some kind of casting here as well and allow e.g float + int, in which case i would probably take the "biggest" type of left / right and make sure the result is of that type, otherwise probably left == right == result? not sure, but i think arrow will always require inputs of the same type.

in case of comparisons it must be boolean, but we may proactively check equality of the input types.

this may raise the question - if we want to validate a lot of things, and not let arrow raise for us, we may want to consider splitting comparisons and arithmetics. but not totally sure that thats wirth it.

Comment on lines 104 to 105
// TODO how to model required functions?
NullIf {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the time i felt that the operations were modelled consistently, but with adding additional functions that don't fit in these categories, I at least wanted to discuss if we just keep adding a new variant per function (if we even need more), or should have some additional grouping ...

Comment on lines 127 to 121
static ref FILTER_EXPR: Expr = Expr::is_null(Expr::null_if(
Expr::column("predicate"),
Expr::column("predicate"),
));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both options also have the benefit of us not havig to pre-compute "predicate" as a separate batch ... out of those I think I like distinct better, due to it being (at least i think so :)) more directly what we want, in contrast to coalesce, which if i read that right ...

Returns the data type of expression with the highest data type precedence.

... is also kind of "dynamic" in its return type and allows for mixed inputs.

somehow I also keep wondering if and_kleene might help as well, do you think thats worth looking into?

let reducer = match op {
VariadicOperator::And => and,
VariadicOperator::Or => or,
};
exprs
.iter()
.map(|expr| evaluate_expression(expr, batch))
.map(|expr| evaluate_expression(expr, batch, Some(&DataType::BOOLEAN)))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think yes ...

kernel/src/client/expression.rs Outdated Show resolved Hide resolved
@ryan-johnson-databricks
Copy link
Contributor

it seems the data type / schema validation is a bigger thing in itself, that deserves a dedicated discussion (and with that PR) and we would focus here on the null-if thig as well as the other comments?

For sure. Sorry if that wasn't clear from my comments.

@roeap roeap changed the title [WIP] leverage more engine capabilities in data skipping 2/n Leverage more engine capabilities in data skipping 2/n Feb 1, 2024
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
kernel/src/client/conversion.rs Outdated Show resolved Hide resolved
kernel/src/client/expression.rs Outdated Show resolved Hide resolved
}
(Distinct { lhs, rhs }, Some(&DataType::BOOLEAN)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a new top-level expression type? Can Distinct to be a new BinaryOperator instead?

        (BinaryOperation { op, left, right }, _) => {
              ...
            let eval: Operation = match op {
                  ... 
                Equal => |l, r| eq(l, r).map(wrap_comparison_result),
                NotEqual => |l, r| neq(l, r).map(wrap_comparison_result),
+               Distinct => |l, r| distinct(l, r).map(wrap_comparison_result),
            };

(bonus: whatever type checking we eventually add would then benefit all comparison operators)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: In a future PR that adds better type checking, should we introduce a ComparisonOperator sub-enum, for things that map (T, T) -> bool? And if we did that, should we also add an AlgebraicOperator (**) sub-enum, for things that map (T, T) -> T? That would capture the vast majority of binary operations in a structured way, while still allowing to add arbitrary other binary operators if needed (***)?

Edit: In retrospect, this seems very related to your question #83 (comment)

(**) According to Wikipedia,

An algebraic operation may also be defined simply as a function from a Cartesian power of a set to the same set.

(***) Perhaps ironically, arrow's nullif function is one such operator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kernel/src/client/expression.rs Outdated Show resolved Hide resolved
}
}

fn read_from_json<R: BufRead>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this combo of read_from_json and get_reader solve the hack parsing we used to have?
Or just move it?
(the new logic looks a lot more complex, trying to figure out why)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm having trouble understanding how the code works.
In particular, what happens if (when?) buffer read boundaries don't match json record boundaries?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this combo of read_from_json and get_reader solve the hack parsing we used to have? Or just move it?

I hope it does, or at least improve the situation, see also comment below.

Not sure if this is what you are referring to, but the main thing to wrap my head around here was the the decode function will return, once it has filled a batch of size batch_size, which may or may not have consumed to whole buffer?

At lest from the docs it seemed, that the decoder can handle seeing incomplete data. Not sure though if that also holds true when we flush, at which point we should have always consumed the whole (reader) buffer though, so I guess that would be a error in the inout data?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would:

  1. Add a comment below that states that the closure either reads batch_size or until the buffer is empty
  2. add a comment below that decoded != read implies that we read more data into buf than a single batch, so we return and leave data in the buffer that will be handled in the next time the closure is called. perhaps also note that the data is left in the reader, so it will be in the buf again when we call fill_buf because we only consumed the decoded amount.

this could also maybe be more clear if you just made a mut res = vec!() and then looped and appended to it. I think the collect into a vec won't be much more efficient anyway.

let columns = schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), null_count))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't new_null_array handle complex types? If so, why do we need to map over individual fields?
I think we just need to convert the schema to a DataType::Struct, perhaps via From for Fields? Tho in typical arrow-rust fashion, top-level vs. nested is just different enough that it might not be worth the trouble...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its like you said ...

top-level vs. nested is just different enough that it might not be worth the trouble
... or at least I thought so.

To create a Record batch we need a Vec, so we do make use of new_null_arrays ability to create complex types, but as it always creates a single array, we invoke it for every top level field.

Once we move to passing EngineData around, I thought about not using record batches anymore, but rather do everything via ArrayRef, casting to StructArray where we have RecordBatch right now. This would have the benefit of just having one type we pass around, rather then two - e.g. the ExpressionEvaluator takes a RecordBatch and returns an ArrayRef.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of just using ArrayRef everywhere.

Given that RecordBatch implements From<StructArray> and StructArray implements From<RecordBatch>, it would seem arrow-rust at least tacitly recognizes the redundancy as well.

let mut value_count = 0;
let mut value_start = 0;

for it in 0..json_strings.len() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is probably correct, but seems hard to grok and maintain. Is there any way we could simplify it?

If I'm not mistaken, the loop is basically breaking the input array into alternating null and non-null segments, and then replacing each segment with either its parsed result or nulls?

Suggested change
for it in 0..json_strings.len() {
// Early out here, because loop below can't handle an empty batch
if (... empty ...) return ...;
// Algo: Start a run that includes only element 0. Keep adding to the run as long
// as the "polarity" (null vs. non-null) matches. Upon encountering a polarity change,
// emit the previous run and start a new run with the new polarity. When the loop
// exits, we just need to emit the final (possibly also first) run and we're done!
let schema_as_struct = /* see other PR comment */;
let run_is_null = json_strings.is_null(mark)
let mut mark = 0;
// I forgot the magic incantation for inner functions that capture state...
fn emit_run|...|(...) {
if (run_is_null) {
insert_nulls(&mut batches, it - mark, &schema_as_struct);
} else {
// ... parse and insert the run of json values
}
}
for it in 1..json_strings.len() {
let value_is_null = json_strings.is_null(it);
if run_is_null != value_is_null {
// polarity change! emit the previous run
emit_run(...);
run_is_null = value_is_null;
mark = it;
}
emit_run(...);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also: It would be helpful to explain why we go through such care to parse each non-null array segment as a group, instead of one-at-a-time or all-at-once?

I suspect we don't want to parse single values because it's expensive to fire up the parsing machinery and we want to amortize the cost as much as possible.

We also can't "just" parse the raw array as-is because the Arrow spec for variable-sized binary data says that, although the offsets for entries must be monotonically increasing,

It should be noted that a null value may have a positive slot length. That is, a null value may occupy a non-empty memory space in the data buffer. When this is true, the content of the corresponding memory space is undefined.

... so we can only safely parse contiguous chunks of non-null values.

That said, given that we anyway have to feed values into the parser's buffer before consuming them, I wonder if we instantiate the parsing machinery just once, and feed values to it as needed? If so, it should cost about the same parse one at a time in a simple for-loop. As a bonus, it would also harden our implementation against the worst-case where nulls and non-nulls alternate with run sizes all 1.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this part of the code I was never really happy with.

A main difference to hack_parse is that we would create a new decoder (via a higer level API) for each row, and in addition, have every rows represented as a &str and convert back to bytes before passing to the parser. So what I think our main gain here is to create a single decoder, and always operate on the raw bytes without creating an intermediary string. Although I think the conversion is quite optimized, sice arrow string arrays are guaranteed utf8 encoded, and i think they con omit some cheks..

As for the loops ... The decoder will not give us back null rows, where the input value is null, which is why we have to fill these. From that I though we do have to keep track of the null runs at the very least. I guess instantiating the reader / cursor should not be too expensive and we could pass each valid row immediately to the decoder. However, this comes at a price as well, since the decoder will emit a new batch once the internal buffer has reached batch_size number of rows and we do have to flush whenever we switch polarity to have the matching non-null / null layout. Here I felt just tracking the run might be easier?

I'll try and simplify a bit using emit_run, but see no clear way yet to make it fulyl clean. Then again, during your reviews we already eliminated a lot of comlexity before, so lets see :).

Copy link
Collaborator Author

@roeap roeap Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing maybe worth mentioning, the arrow Decoder seems to implemnt a fairly sophisticated algorithm which tries to vectorize the parsing - "inspired by" simdjson's approach.

Without having actually looked at the internals i felt this might benefit from recievieng larger chunks of data at once, so that some of these optimizations can take effect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, probably our best hope is to keep tracking runs, but simplify the run-tracking code as suggested above. It would still behave poorly in the worst case, but for Delta metadata reads, we expect either long runs of nulls (non-file actions), or long runs of values (file actions) and so the worst case should be super rare.

Comment on lines 186 to 189
fn str_parse_scalar<T: std::str::FromStr>(
&self,
raw: &str,
f: impl FnOnce(T) -> Scalar,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This took a while to unpack. If I understand correctly we're passing some Scalar variant's (implicit) constructor function as the FnOnce here? So e.g. a caller who passes Scalar::Double sets T: f64 (inferred from the constructor's argument type), leverages FromStr for f64 to convert string to f64, and then the result is passed to the constructor?

(maybe a code comment explaining the magic could help)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agree on a comment.

Also nit: Just call this parse_scalar

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that name is already taken :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_scalar_impl, to make clear it's a helper for parse_scalar?
(the current str_parse_scalar name carries no intuition for me, at least)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hah right. well, not to bikeshed too much. I'd suggest parse_str_as_scalar then, but fine as is too.

a comment explaining what it does would be great.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@ryan-johnson-databricks ryan-johnson-databricks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to add:

This PR updated the data skipping logic to better leverage the engines capabilities and reduce our arrow-exposure in core kernel. This best reviewed commit by commit as it permeates quite far. If more helpful, I can also split up this PR via these commits.

Splitting is usually good... but I don't know how much the three overlap? If it's just one giant mess of conflicts, maybe not?

This seems out of date?

  • Add new NULLIF expression
  • Evaluate is_null(null_if(..)) logic via expression evaluator

@@ -109,6 +123,78 @@ impl From<String> for Scalar {

// TODO: add more From impls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: We currently have From for i32 and i64, but we have scalars for i8 and i16. If somebody says Scalar::from(10u8) will they get Scalar::Integer or a compiler error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now this would get an error, but just added the missing implementations. so now you would get Scalar::byte.

Comment on lines 186 to 189
fn str_parse_scalar<T: std::str::FromStr>(
&self,
raw: &str,
f: impl FnOnce(T) -> Scalar,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agree on a comment.

Also nit: Just call this parse_scalar

Comment on lines 224 to 225
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
.into()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make it a little cleaner with:

use arrow_array::cast::AsArray;
...
                evaluator
                        .evaluate(&batch)?
                        .as_struct_opt()
                        .ok_or(Error::UnexpectedColumnType("Unexpected array type".into()))?
                        .into()

fn get_partition_value(raw: &str, data_type: &DataType) -> DeltaResult<Scalar> {
match data_type {
DataType::Primitive(primitive) => primitive.parse_scalar(raw),
_ => todo!(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so then... nit: make this return an Error instead of the todo!() panic :)

}
}

fn read_from_json<R: BufRead>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would:

  1. Add a comment below that states that the closure either reads batch_size or until the buffer is empty
  2. add a comment below that decoded != read implies that we read more data into buf than a single batch, so we return and leave data in the buffer that will be handled in the next time the closure is called. perhaps also note that the data is left in the reader, so it will be in the buf again when we call fill_buf because we only consumed the decoded amount.

this could also maybe be more clear if you just made a mut res = vec!() and then looped and appended to it. I think the collect into a vec won't be much more efficient anyway.

kernel/src/client/json.rs Show resolved Hide resolved
kernel/Cargo.toml Show resolved Hide resolved
kernel/src/scan/data_skipping.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@ryan-johnson-databricks ryan-johnson-databricks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ~all nits at this point, except #83 (comment); and even that can potentially be fixed as a follow-up to unblock this PR.

Comment on lines 220 to 221
"Variadic {expression:?} is expected to return boolean results, got {:?}",
result_type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not:

Suggested change
"Variadic {expression:?} is expected to return boolean results, got {:?}",
result_type
"Variadic {expression:?} should return a boolean result, got {result_type:?}"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b/c it was late :D, fixed.

data_type: &DataType,
) -> DeltaResult<Scalar> {
match raw {
None | Some(None) => Ok(Scalar::Null(data_type.clone())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just make this the second case and use a match-all?

        _ => Ok(Scalar::Null(data_type.clone())),

.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

let fields: Vec<ArrowField> = s.fields().map(TryInto::try_into).try_collect()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think type inference would allow just:

Suggested change
let fields: Vec<ArrowField> = s.fields().map(TryInto::try_into).try_collect()?;
Ok(ArrowSchema::new(s.fields().map(TryInto::try_into).try_collect()?))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed this ...

Ok(ArrowSchema::new(
    s.fields()
        .map(TryInto::try_into)
        .try_collect::<_, Vec<ArrowField>, _>()?,
))

... which fmt wants on three lines, so thought the current way is a little more concise?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree current way is better. I guess could also do Vec<_>, since AFAIK that's what the type inference can't figure out on its own (not sure why). But type clarity is also good, so probably we should leave it as-is.

Comment on lines 68 to 69
.collect::<Vec<_>>()
.into_iter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is just copied code, but it seems redundant to call collect and into_iter back to back like this?

Comment on lines +75 to +78
stats_schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we discussed this somewhere else, but just confirming: arrow for some reason treats "schema" and "struct" as somehow different concepts, with no obviously easy way to convert between them, so we have to manually build up a struct here, rather than passing the schema directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tho looking at the docs, it seems like this might work?

Suggested change
stats_schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), 1))
new_null_array(DataType::Struct(stats_schema.fields.clone()), 1)

https://arrow.apache.org/rust/arrow_schema/struct.Schema.html
https://arrow.apache.org/rust/arrow_schema/fields/struct.Fields.html
https://arrow.apache.org/rust/arrow_schema/enum.DataType.html#variant.Struct

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turs out it almost works. but trying to do

let arr = new_null_array(&DataType::Struct(stats_schema.fields.clone()), 1);
Ok(arr.as_struct().into())

leads to runtime errors since it will make top level fields nullable, which the RecordBatch does not allow. Once we move to only moving ArrayRefs around we should be able to make this change though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... this comes down to the difference between null struct (= definitely not allowed nor even sensible for a record batch), vs. struct whose fields are all null (allowed, and what your code was doing). We probably want to keep the struct-of-nulls behavior even after moving to ArrayRef, because otherwise we'd have to check whether the whole thing is null before accessing any of its columns?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, as you said :)

Comment on lines 197 to 199
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StructArray'.".into(),
))?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StructArray'.".into(),
))?
.ok_or(Error::unexpectedColumnType("Expected type 'StructArray'."))?

Comment on lines 150 to 156
.filter(|f| {
!self
.snapshot
.metadata()
.partition_columns
.contains(f.name())
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is that a potentially expensive inner loop access? I wonder if it might be easier to grok (as well as cheaper) by capturing a variable instead?

        let partition_columns = self.snapshot.metadata().partition_columns;

and then

Suggested change
.filter(|f| {
!self
.snapshot
.metadata()
.partition_columns
.contains(f.name())
})
.filter(|f| !partition_columns.contains(f.name())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(especially since we could reuse partition_columns at L161 below)

kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
Comment on lines 217 to 218
(VariadicOperation { .. }, _) => {
// NOTE: If we get here, it would be a bug in our code. However it does swallow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, would something like this work?

(VariadicOperation { op, exprs }, None | Some(&DataType::BOOLEAN)) =>  {

kernel/src/client/json.rs Show resolved Hide resolved
Comment on lines 218 to 219
// NOTE: If we get here, it would be a bug in our code. However it does swallow
// the error message from the compiler if we add variants to the enum and forget to add them here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the note is no longer accurate? Now shouldn't swallow any compile-time errors for new variants, because the match at L202-205 would become incomplete, and the generic case only applies if the caller passed Some incompatible data type?

@@ -106,7 +104,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.collect::<Result<Vec<_>, _>>()?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another case where type inference isn't working nicely.
I think the original code worked better, with "only" two underscores instead of three...

Comment on lines 280 to 283
Self::BinaryOperation {
op: BinaryOperator::Distinct,
left: Box::new(self),
right: Box::new(other),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Self::BinaryOperation {
op: BinaryOperator::Distinct,
left: Box::new(self),
right: Box::new(other),
Self::binary(BinaryOperator::Distinct, self, other)

Comment on lines 197 to 199
.ok_or(Error::unexpected_column_type(
"Expected type 'StructArray'.",
))?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit surprising that this doesn't fit on one line, but rustfmt does what it does, I guess?

Copy link
Collaborator Author

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just some pending answers.

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this LGTM. Thanks so much!

There are a few open suggestions from Ryan that should probably be applied or resolved (if you don't want to apply them for some reason), but they are mostly minor so... Approved!

json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.collect::<Result<Vec<_>, _>>()?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we really want to simplify this:

        let output: Vec<_> =
            json_strings
                .iter()
                .map(|json_string| hack_parse(&output_schema, json_string))
                .try_collect()?;
        Ok(concat_batches(&output_schema, output.iter())?)

@roeap roeap merged commit 5f48dea into delta-io:main Feb 15, 2024
3 checks passed
@roeap roeap deleted the partition-values branch February 15, 2024 18:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support adding back in partitionColumns Support reading partitioned tables
3 participants