Leverage more engine capabilities in data skipping 2/n #83

Merged: 15 commits, Feb 15, 2024
acceptance/Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -26,7 +26,7 @@ tar = "0.4"

 [dev-dependencies]
 arrow = { version = "^49.0", features = ["json", "prettyprint"] }
-datatest-stable = "0.1.3"
+datatest-stable = "0.2"
 test-log = { version = "0.2", default-features = false, features = ["trace"] }
 tempfile = "3"
 test-case = { version = "3.1.0" }
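For reference, datatest-stable is what turns each file under the acceptance test data directory into its own test case, and 0.2 keeps the same general API shape. A minimal sketch of such a harness, with a hypothetical test name and fixture path (not taken from this repo), wired up as a `harness = false` test target:

```rust
use std::path::Path;

// Hypothetical per-file acceptance check: datatest-stable invokes this once
// for every file matching the pattern below.
fn acceptance_case(path: &Path) -> datatest_stable::Result<()> {
    let raw = std::fs::read_to_string(path)?;
    assert!(!raw.is_empty(), "empty fixture: {}", path.display());
    Ok(())
}

// Generates one named test per matching file under tests/data.
datatest_stable::harness!(acceptance_case, "tests/data", r"^.*\.json$");
```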
kernel/Cargo.toml (22 changes: 15 additions & 7 deletions)

@@ -10,18 +10,14 @@ version.workspace = true

 [dependencies]
 arrow-array = { version = "^49.0" }
-arrow-arith = { version = "^49.0" }
-arrow-json = { version = "^49.0" }
-arrow-ord = { version = "^49.0" }
-arrow-schema = { version = "^49.0" }
 arrow-select = { version = "^49.0" }
 bytes = "1.4"
-chrono = { version = "0.4", optional = true }
+chrono = { version = "0.4" }
 either = "1.8"
 fix-hidden-lifetime-bug = "0.2"
+indexmap = "2.2.1"
 itertools = "0.12"
 lazy_static = "1.4"
-# need to generalize over arrow, arrow2 and diff parquet etc. (BYOP)
 regex = "1.8"
 roaring = "0.10.1"
 serde = { version = "1", features = ["derive"] }
@@ -37,6 +33,10 @@ z85 = "3.0.5"
 visibility = "0.1.0"
 
 # Used in default client
+arrow-arith = { version = "^49.0", optional = true }
+arrow-json = { version = "^49.0", optional = true }
+arrow-ord = { version = "^49.0", optional = true }
+arrow-schema = { version = "^49.0", optional = true }
 futures = { version = "0.3", optional = true }
 object_store = { version = "^0.8.0", optional = true }
 parquet = { version = "^49.0", optional = true, features = [
@@ -49,7 +49,15 @@ tokio = { version = "1", optional = true, features = ["rt-multi-thread"] }

 [features]
 default = ["default-client"]
-default-client = ["chrono", "futures", "object_store", "parquet"]
+default-client = [
+    "arrow-arith",
+    "arrow-json",
+    "arrow-ord",
+    "arrow-schema",
+    "futures",
+    "object_store",
+    "parquet",
+]
 developer-visibility = []
 
 [dev-dependencies]
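The effect of this manifest change: chrono becomes an unconditional dependency (and so drops out of the feature list), while the four arrow compute/IO crates now ride behind the `default-client` feature, so `default-features = false` builds never compile them. A minimal sketch of the gating pattern this enables, with an illustrative function that is not part of the crate:

```rust
// Code that touches the optional arrow crates stays behind the feature flag;
// without `default-client`, the fallback is compiled instead.
#[cfg(feature = "default-client")]
fn json_handler_name() -> &'static str {
    "default client (arrow-json backed)"
}

#[cfg(not(feature = "default-client"))]
fn json_handler_name() -> &'static str {
    "bring your own JSON handler"
}

fn main() {
    println!("{}", json_handler_name());
}
```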
kernel/examples/dump-table/src/main.rs (2 changes: 1 addition & 1 deletion)

@@ -112,7 +112,7 @@ fn main() {
     let scan = ScanBuilder::new(snapshot).build();
 
     let schema = scan.schema();
-    let header_names = schema.fields.iter().map(|field| {
+    let header_names = schema.fields().map(|field| {
         let cell = Cell::new(field.name());
         if cli.ascii {
             cell
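The one-line change in this example tracks a schema API change: the field list is apparently no longer a public vec to call `.iter()` on, and `fields()` itself returns an iterator. A self-contained sketch of that shape, using local stand-in types rather than the kernel's:

```rust
// Stand-ins for the kernel's schema types, shrunk to the relevant API: the
// field list is private and `fields()` hands out an iterator directly.
struct StructField {
    name: String,
}

struct StructType {
    fields: Vec<StructField>,
}

impl StructType {
    fn fields(&self) -> impl Iterator<Item = &StructField> {
        self.fields.iter()
    }
}

fn main() {
    let schema = StructType {
        fields: vec![StructField { name: "value".into() }],
    };
    // Callers now write `schema.fields().map(...)` with no `.iter()` hop.
    let header_names: Vec<&str> = schema.fields().map(|f| f.name.as_str()).collect();
    assert_eq!(header_names, ["value"]);
}
```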
kernel/src/actions/mod.rs (11 changes: 5 additions & 6 deletions)

@@ -521,6 +521,7 @@ fn struct_array_to_map(arr: &StructArray) -> DeltaResult<HashMap<String, Option<
 mod tests {
     use std::sync::Arc;
 
+    use arrow_array::ArrayRef;
     use object_store::local::LocalFileSystem;
 
     use super::*;
@@ -534,13 +535,12 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new());
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
 
-        let json_strings: StringArray = vec![
+        let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
             r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
             r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
             r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
-        ]
-        .into();
+        ]));
         let output_schema = Arc::new(log_schema().clone());
         handler.parse_json(json_strings, output_schema).unwrap()
     }
@@ -597,15 +597,14 @@
         let store = Arc::new(LocalFileSystem::new());
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
 
-        let json_strings: StringArray = vec![
+        let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
             r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
             r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
             r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
             r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
             r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
             r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
-        ]
-        .into();
+        ]));
         let output_schema = Arc::new(log_schema().clone());
         let batch = handler.parse_json(json_strings, output_schema).unwrap();

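These test updates swap a concrete StringArray for a type-erased ArrayRef (an Arc<dyn Array>), which suggests parse_json now accepts engine-agnostic column data rather than a specific array type. A standalone sketch of that conversion, using arrow-array alone:

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, StringArray};

fn main() {
    // Build the concrete array, then erase it behind Arc<dyn Array>.
    let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
    ]));
    assert_eq!(json_strings.len(), 1);

    // Consumers downcast when they need string-typed access again.
    let strings = json_strings
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("was built from a StringArray");
    assert!(strings.value(0).contains("minReaderVersion"));
}
```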
kernel/src/client/conversion.rs (32 changes: 7 additions & 25 deletions)

@@ -22,8 +22,7 @@ impl TryFrom<&StructType> for ArrowSchema {
     fn try_from(s: &StructType) -> Result<Self, ArrowError> {
         let fields = s
             .fields()
-            .iter()
-            .map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
+            .map(TryInto::try_into)
             .collect::<Result<Vec<ArrowField>, ArrowError>>()?;
 
         Ok(ArrowSchema::new(fields))
@@ -103,23 +102,12 @@ impl TryFrom<&DataType> for ArrowDataType {
             PrimitiveType::Boolean => Ok(ArrowDataType::Boolean),
             PrimitiveType::Binary => Ok(ArrowDataType::Binary),
             PrimitiveType::Decimal(precision, scale) => {
-                let precision = u8::try_from(*precision).map_err(|_| {
-                    ArrowError::SchemaError(format!(
-                        "Invalid precision for decimal: {}",
-                        precision
-                    ))
-                })?;
-                let scale = i8::try_from(*scale).map_err(|_| {
-                    ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale))
-                })?;
-
-                if precision <= 38 {
-                    Ok(ArrowDataType::Decimal128(precision, scale))
-                } else if precision <= 76 {
-                    Ok(ArrowDataType::Decimal256(precision, scale))
+                if precision <= &38 {
+                    Ok(ArrowDataType::Decimal128(*precision, *scale))
                 } else {
+                    // NOTE: since we are converting from delta, we should never get here.
                     Err(ArrowError::SchemaError(format!(
-                        "Precision too large to be represented in Arrow: {}",
+                        "Precision too large to be represented as Delta type: {} > 38",
                         precision
                     )))
                 }
@@ -137,8 +125,7 @@
         }
         DataType::Struct(s) => Ok(ArrowDataType::Struct(
             s.fields()
-                .iter()
-                .map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
+                .map(TryInto::try_into)
                 .collect::<Result<Vec<ArrowField>, ArrowError>>()?
                 .into(),
         )),
Expand Down Expand Up @@ -226,12 +213,7 @@ impl TryFrom<&ArrowDataType> for DataType {
             ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)),
             ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)),
             ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)),
-            ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
-                *p as i32, *s as i32,
-            ))),
-            ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
-                *p as i32, *s as i32,
-            ))),
+            ArrowDataType::Decimal128(p, s) => Ok(DataType::decimal(*p, *s)),
             ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)),
             ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)),
             ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
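Two things are going on in conversion.rs. First, field conversion collapses to `.map(TryInto::try_into)`, leaning on the existing `TryFrom<&StructField> for ArrowField` impl instead of spelling it out. Second, the decimal pattern now binds `precision` as a `&u8` and `scale` as an `&i8` (hence `precision <= &38`), so the old i32-to-u8 narrowing and the Decimal256 branch both disappear. A standalone sketch of the resulting decimal mapping, written as a free function for illustration (the real logic lives inside the TryFrom impls):

```rust
use arrow_schema::{ArrowError, DataType as ArrowDataType};

// Delta caps decimal precision at 38, so Decimal128 always suffices and a
// larger precision can only mean an invalid Delta type.
fn delta_decimal_to_arrow(precision: u8, scale: i8) -> Result<ArrowDataType, ArrowError> {
    if precision <= 38 {
        Ok(ArrowDataType::Decimal128(precision, scale))
    } else {
        Err(ArrowError::SchemaError(format!(
            "Precision too large to be represented as Delta type: {} > 38",
            precision
        )))
    }
}

fn main() {
    assert_eq!(
        delta_decimal_to_arrow(10, 2).unwrap(),
        ArrowDataType::Decimal128(10, 2)
    );
    assert!(delta_decimal_to_arrow(76, 0).is_err());
}
```

And the `.map(TryInto::try_into)` idiom in miniature: when `TryFrom<&T> for U` exists, an iterator of `&T` values collects straight into `Result<Vec<U>, _>`, short-circuiting on the first failure. A generic example with unrelated types:

```rust
// i64 -> u8 stands in for &StructField -> ArrowField: one fallible
// conversion per element, stopping at the first error.
fn widths(xs: &[i64]) -> Result<Vec<u8>, std::num::TryFromIntError> {
    xs.iter().map(|&x| x.try_into()).collect()
}
```

Here `widths(&[1, 2, 3])` yields `Ok(vec![1, 2, 3])`, while `widths(&[300])` surfaces the overflow as an `Err`.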