Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added more conversions from parquet (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 22, 2021
1 parent f583531 commit 349b276
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 5 deletions.
2 changes: 2 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def case_basic_nullable(size=1):
pa.field("decimal_9", pa.decimal128(9, 0)),
pa.field("decimal_18", pa.decimal128(18, 0)),
pa.field("decimal_26", pa.decimal128(26, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
]
schema = pa.schema(fields)

Expand All @@ -43,6 +44,7 @@ def case_basic_nullable(size=1):
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
"timestamp_us": int64 * size,
},
schema,
f"basic_nullable_{size*10}.parquet",
Expand Down
55 changes: 51 additions & 4 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,41 @@ fn dict_read<
)
}
Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
PhysicalType::Int96 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
ParquetType::PrimitiveType {
physical_type,
logical_type,
..
} => match (physical_type, logical_type) {
(PhysicalType::Int96, _) => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
DataType::Timestamp(TimeUnit::Nanosecond, None),
int96_to_i64_ns,
),
(_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit {
ParquetTimeUnit::MILLIS(_) => {
primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i64| x * 1_000_000,
)
}
ParquetTimeUnit::MICROS(_) => {
primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i64| x * 1_000,
)
}
ParquetTimeUnit::NANOS(_) => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i64| x,
),
},
_ => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
Expand Down Expand Up @@ -243,14 +271,33 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
}

Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
PhysicalType::Int96 => primitive::iter_to_array(
ParquetType::PrimitiveType {
physical_type,
logical_type,
..
} => match (physical_type, logical_type) {
(PhysicalType::Int96, _) => primitive::iter_to_array(
iter,
metadata,
DataType::Timestamp(TimeUnit::Nanosecond, None),
nested,
int96_to_i64_ns,
),
(_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit {
ParquetTimeUnit::MILLIS(_) => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| {
x * 1_000_000
})
}
ParquetTimeUnit::MICROS(_) => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| {
x * 1_000
})
}
ParquetTimeUnit::NANOS(_) => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x)
}
},
_ => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x),
},
_ => unreachable!(),
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/schema/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ mod tests {
"string_large",
"decimal_9",
"decimal_18",
"decimal_26"
"decimal_26",
"timestamp_us"
]
);
Ok(())
Expand Down
11 changes: 11 additions & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ pub fn pyarrow_nullable(column: usize) -> Box<dyn Array> {
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(values).to(DataType::Decimal(26, 0)))
}
10 => Box::new(
PrimitiveArray::<i64>::from(i64_values)
.to(DataType::Timestamp(TimeUnit::Microsecond, None)),
),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -392,6 +396,13 @@ pub fn pyarrow_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>>
max_value: Some(9i128),
data_type: DataType::Decimal(26, 0),
}),
10 => Box::new(PrimitiveStatistics::<i64> {
data_type: DataType::Timestamp(TimeUnit::Microsecond, None),
distinct_count: None,
null_count: Some(3),
min_value: Some(0),
max_value: Some(9),
}),
_ => unreachable!(),
})
}
Expand Down
5 changes: 5 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,11 @@ fn v2_decimal_26_nullable() -> Result<()> {
test_pyarrow_integration(9, 2, "basic", false, false, None)
}

#[test]
fn v1_timestamp_us_nullable() -> Result<()> {
test_pyarrow_integration(10, 1, "basic", false, false, None)
}

#[test]
fn v2_decimal_26_required() -> Result<()> {
test_pyarrow_integration(8, 2, "basic", false, true, None)
Expand Down

0 comments on commit 349b276

Please sign in to comment.