Parquet: Add readers and writers for the internal object model #11904
Conversation
Force-pushed from 772f5c2 to 233a00b.
```java
@Override
public UUID read(UUID reuse) {
  return UUIDUtil.convert(column.nextBinary().toByteBuffer());
}
```
This looks fine to me.
```java
@Override
protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
  // accepts ByteBuffer and internally writes as binary.
  return ParquetValueWriters.byteBuffers(desc);
}
```
Make sure this writer checks the length of the incoming bytes.

OK. The existing code in `GenericParquetWriter` also doesn't have this length check when writing as `byte[]`; I will add that check.
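
A minimal sketch of what that length check could look like, assuming the writer can read the fixed length from the column descriptor (the class name here is illustrative, not the final implementation):

```java
// Illustrative only: a fixed-length ByteBuffer writer that validates input length.
// Assumes PrimitiveWriter exposes `column` and Iceberg's relocated Preconditions.
private static class FixedBuffersWriter extends ParquetValueWriters.PrimitiveWriter<ByteBuffer> {
  private final int length;

  FixedBuffersWriter(ColumnDescriptor desc) {
    super(desc);
    this.length = desc.getPrimitiveType().getTypeLength();
  }

  @Override
  public void write(int repetitionLevel, ByteBuffer buffer) {
    Preconditions.checkArgument(
        buffer.remaining() == length,
        "Cannot write byte buffer of length %s as fixed[%s]",
        buffer.remaining(),
        length);
    column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
  }
}
```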
```diff
@@ -359,10 +250,10 @@ public ParquetValueReader<?> primitive(
     ColumnDescriptor desc = type.getColumnDescription(currentPath());

-    if (primitive.getOriginalType() != null) {
+    if (primitive.getLogicalTypeAnnotation() != null) {
```
I agree with this change, but please point these kinds of changes out for reviewers. The old version worked because all of the supported logical type annotations had an equivalent `ConvertedType` (which is what `OriginalType` is called in the Parquet format and the logical type docs).
```java
@Override
public Optional<ParquetValueReader<?>> visit(
    LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
  return Optional.of(new DateReader(desc));
}
```
This and the following two methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this already introduces abstract factory methods for some readers, including timestamps. I think it would be much cleaner to reuse this class and call factory methods instead:

```java
protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);

protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);

protected abstract PrimitiveReader<?> timestampReader(
    ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);
```
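
For illustration, the shared visitor in the base class could then delegate roughly like this (a sketch assuming the visitor shape shown elsewhere in this PR; `fromParquetUnit` is a hypothetical helper mapping Parquet's `TimeUnit` to `ChronoUnit`):

```java
@Override
public Optional<ParquetValueReader<?>> visit(
    LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
  // subclasses choose the in-memory representation (e.g. LocalDate vs. int)
  return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
    LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
  return Optional.of(
      timestampReader(
          desc,
          fromParquetUnit(timestampLogicalType.getUnit()), // hypothetical unit mapping
          timestampLogicalType.isAdjustedToUTC()));
}
```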
```java
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
```
I agree with moving the date/time reader classes here.
```java
@Override
public Optional<ParquetValueReader<?>> visit(
    LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
  return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
```
This isn't correct. The unit of the incoming timestamp value still needs to be handled, even if the in-memory representation of the value is the same (a `long`).

Looks like the Spark implementations for this should work well, just like the int96 cases.
```java
@Override
public Optional<ParquetValueReader<?>> visit(
    LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
  return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
```
This isn't correct. Like timestamp, this needs to handle the unit of the incoming value. In addition, millisecond values must annotate an int32 according to the Parquet logical type docs. When the unit is milliseconds, this needs to call `readInt` and multiply by 1000.

It looks like Spark currently gets the underlying Parquet type for milliseconds wrong (which makes sense because this path is never used in Spark). We can go ahead and fix this now and share the reader between Internal and Spark.
```java
private static class TimestampMillisReader extends UnboxedReader<Long> {
  TimestampMillisReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public Long read(Long ignored) {
    return readLong();
  }

  @Override
  public long readLong() {
    return 1000L * column.nextInteger();
  }
}
```
```diff
@@ -397,7 +404,7 @@ public ParquetValueReader<?> primitive(
       case INT96:
         // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
         // compatibility we try to read INT96 as timestamps.
-        return new TimestampInt96Reader(desc);
+        return int96Reader(desc);
```
Can't this call `timestampReader(desc, NANOS, ((TimestampType) primitive).isAdjustedToUTC())` instead? The primitive type is available from `desc`, so that method can check it and we don't need a factory method for INT96.

Should I assume `isAdjustedToUTC` to be true? The primitive doesn't have a `TimestampType`, and the reader was using `OffsetDateTime`. My understanding is that `OffsetDateTime` is used for timestamps with a timezone and `LocalDateTime` for timestamps without one.

Yes. Because the reader produces an `OffsetDateTime`, it should only be used when the expected type is compatible.
```diff
@@ -76,6 +67,31 @@ protected ParquetValueReader<T> createReader(
   protected abstract ParquetValueReader<T> createStructReader(
       List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

+  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
+    return null;
+  }
```
I think this has to return a generic fixed reader here to match the existing behavior. Because this class is public, people may have created subclasses for other object models, and `iceberg-parquet` is a module that is covered by revapi, so we make guarantees about API and behavior stability.

Since this class is not widely used, could you also mark it deprecated so that we can make it package-private in 1.9.0?

I didn't do it because I can't move the date/time classes to `GenericReader` in that case; they have to stay in this class. But yes, I get your point about the behavior change. I will revert the date/time class movement.

@ajantha-bhat, why can't the reader classes be moved if this returns an instance? The classes are currently `private`, so there is no direct dependency on them. As long as an instance is returned, wouldn't it work?
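
In other words, the default implementation could keep returning the existing reader while the class itself stays private, roughly (a sketch; `FixedReader` stands in for the existing private generic class):

```java
// Sketch: the default preserves the generic behavior for existing subclasses;
// InternalReader would override this to return a ByteBuffer-based reader.
protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
  return new FixedReader(desc);
}
```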
```java
  return null;
}

protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
```
These should not return `Optional`:

- Iceberg generally prefers returning `null` over using `Optional`
- The existing methods (like `createStructReader`) do not wrap return types in `Optional`
- These methods should always return a valid reader, or should throw an exception to signal that something is not supported
```java
  return null;
}

protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
```
These should all return `ParquetValueReader<?>` rather than constrain the implementation to a subclass of `PrimitiveReader`.
```java
@Override
public LocalTime read(LocalTime reuse) {
  return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
```
Millisecond time must be stored as an INT32 according to the spec, so I think this should call `column.nextLong` instead.

> so I think this should call column.nextLong instead.

You mean `column.nextInt`, right? So the existing code is wrong? I didn't add this code newly.

How do I test these? `DataTest` doesn't handle time units, and the writers always write micros.

Yes, I meant `nextInteger`. Thanks for fixing this.

No need to modify the tests in this PR. It would be nice to have a test for millis, but these readers were originally written based on the spec and we don't need to block on adding a test for existing code.
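
A minimal sketch of the corrected millisecond time reader under discussion, assuming the `PrimitiveReader` API shown elsewhere in this thread (the class name is illustrative):

```java
private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
  TimeMillisReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public LocalTime read(LocalTime reuse) {
    // MILLIS time annotates INT32, so read an int and scale millis to nanos
    return LocalTime.ofNanoOfDay(column.nextInteger() * 1_000_000L);
  }
}
```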
```java
  return Optional.of(new ParquetValueReaders.UUIDReader(desc));
}

private static class StructLikeReader extends StructReader<StructLike, StructLike> {
```
Now I'm questioning the use of `StructLikeReader` vs `RecordReader` in `GenericParquetReaders`. They both produce the same implementation class (`GenericRecord`), and `Record` is a `StructLike`. Why not move `RecordReader` into `ParquetValueReaders` and reuse the code?

That would also preserve the comment in `RecordReader` that was removed when copying it here, but is still relevant to why there is a `template` record.
```java
private final GenericRecord template;

StructLikeReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
  super(types, readers);
```
The superclass doesn't use `types`, so it doesn't need to be passed in. This isn't a big deal if you don't want to fix the superclass in this PR, though.

It touches Flink and other classes; I will handle it in a separate PR.
```diff
@@ -401,6 +404,17 @@ public ByteBuffer read(ByteBuffer reuse) {
     }
   }

+  public static class UUIDReader extends PrimitiveReader<UUID> {
```
Rather than making the classes here public, I would prefer to introduce factory methods like we have for the Avro readers. I think that gives us more flexibility to change implementations and move classes around without worrying because none of them need to be public.
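
Something along these lines, as a sketch (the method name mirrors the existing plural naming convention and is an assumption):

```java
// A factory method lets UUIDReader stay package-private
public static ParquetValueReader<UUID> uuids(ColumnDescriptor desc) {
  return new UUIDReader(desc);
}
```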
```java
@Override
public void write(int repetitionLevel, UUID value) {
  column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(UUIDUtil.convert(value)));
}
```
The byte array passed here is allocated, not reused. It probably should be reused, like the one in Spark's writers.
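
A sketch of what reuse could look like, assuming a thread-local scratch buffer similar to the Spark writers (the buffer handling here is illustrative):

```java
// Reuse a per-thread 16-byte buffer instead of allocating on every write
private static final ThreadLocal<ByteBuffer> BUFFER =
    ThreadLocal.withInitial(() -> ByteBuffer.allocate(16));

@Override
public void write(int repetitionLevel, UUID value) {
  ByteBuffer buffer = BUFFER.get();
  buffer.rewind();
  buffer.putLong(value.getMostSignificantBits());
  buffer.putLong(value.getLeastSignificantBits());
  buffer.rewind();
  column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
}
```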
```java
private InternalWriter() {}

public static ParquetValueWriter<StructLike> buildWriter(MessageType type) {
```
I think this would be better as `create` as well.
```java
  return Optional.of(ParquetValueWriters.uuids(desc));
}

private static class StructLikeWriter extends StructWriter<StructLike> {
```
I don't think we need both `RecordWriter` and `StructLikeWriter` because this can be used to write records. Can you remove `RecordWriter` and use this instead?
```java
  return Optional.empty();
}

protected Optional<ParquetValueWriters.PrimitiveWriter<?>> uuidWriter(ColumnDescriptor desc) {
```
Feedback here is similar to the read path:

- Should not use `Optional`
- Should produce the same writer as before
- Should deprecate this class so that we can make it package-private
- Should not need a factory method for UUID

CI just failed for `ArrowReaderTest > testReadAll()`. The generic writer expects byte[]-based UUID handling, while the internal reader and writer work directly on `UUID` objects. So we do need a factory method here, since we need to supply different UUID representations.
Force-pushed from c543108 to 3bb323b.

@rdblue: Thanks for the review. I have addressed all the comments. Please take another look.
```java
    : new TimestampMillisReader(desc);
case NANOS:
  if (isAdjustedToUTC) {
    return new TimestampInt96Reader(desc);
```
This isn't correct. The INT96 timestamp reader is used when the underlying type is INT96, not when the Iceberg or Parquet type has nanosecond precision. Nanosecond-precision timestamps were recently introduced and are not used to signal that Parquet stores the timestamp as INT96.

This should match the previous behavior, where this reader is used when the underlying physical type (`desc.getPrimitiveType().getPrimitiveTypeName()`) is INT96.
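
For concreteness, the dispatch could key off the descriptor's physical type, something like this sketch (the suggested shape, not the final code):

```java
// Use the INT96 reader only when the column is physically stored as INT96
if (desc.getPrimitiveType().getPrimitiveTypeName()
    == PrimitiveType.PrimitiveTypeName.INT96) {
  return new TimestampInt96Reader(desc);
}
```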
```java
    LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
    "Cannot write time in %s, only MICROS is supported",
    timeType.getUnit());
return Optional.ofNullable(timeWriter(desc));
```
Why does this use `ofNullable`? The intent is not to expose `BaseParquetWriter` beyond its package, so there is no need to handle cases where the factory method returns `null`. Those are error cases.
```java
@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
    LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
  return Optional.ofNullable(uuidWriter(desc));
}
```
I don't think there's any need to make this more flexible. It makes sense to support the new `UUIDWriter` to ensure the tests pass with all primitive types, but this doesn't need to introduce a way to override the writer and representation (same for the read side).

Generic writers handle the UUID logical type by returning the default `empty()`, so it is handled as a primitive writer of fixed-length byte[] here:
/~https://github.com/apache/iceberg/blob/5b13760c01bdbb2ab15be072c582adb2a8792f23/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java#L131C1-L132C1

But the internal writer needs the new UUID-based writer. Hence, I added an override that returns the default empty writer for the generic types and the UUID-based writer for the internal type.
```java
  return new ParquetValueReaders.UUIDReader(desc);
}

public static ParquetValueReader<Long> timestampInt96Reader(ColumnDescriptor desc) {
```
To match the other method names, I think this should be `int96Timestamps`.
```java
  return new ParquetValueReaders.TimestampInt96Reader(desc);
}

public static ParquetValueReader<Long> timestampMillisReader(ColumnDescriptor desc) {
```
I'd probably rename this as well, maybe `millisAsTimestamps`?
```diff
@@ -106,6 +114,10 @@ public static PrimitiveWriter<ByteBuffer> byteBuffers(ColumnDescriptor desc) {
     return new BytesWriter(desc);
   }

+  public static PrimitiveWriter<ByteBuffer> fixedBuffer(ColumnDescriptor desc) {
```
Minor: most other methods are plural. How about `fixedBuffers`?
```java
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
  return ParquetValueReaders.timestampMillisReader(desc);
} else if (unit == LogicalTypeAnnotation.TimeUnit.NANOS) {
  return ParquetValueReaders.timestampInt96Reader(desc);
```
This should be fixed as well.
```java
}

protected ParquetValueWriters.PrimitiveWriter<?> uuidWriter(ColumnDescriptor desc) {
  // Use primitive-type writer; no special writer needed.
```
There is no primitive writer for UUIDs. Maybe this was a copy/paste error?
```java
  return new FixedWriter(desc);
}

protected ParquetValueWriters.PrimitiveWriter<?> dateWriter(ColumnDescriptor desc) {
```
These methods should return `ParquetValueWriter<?>` and not a specific class.
```java
  }
}

public static class RecordWriter<T extends StructLike> extends StructWriter<T> {
```
Rather than adding a public class, can you add a factory method for this?
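
For example, something like the following sketch (the method name and constructor shape are assumptions):

```java
// A factory method so RecordWriter itself can stay non-public
public static <T extends StructLike> ParquetValueWriter<T> recordWriter(
    List<ParquetValueWriter<?>> writers) {
  return new RecordWriter<>(writers);
}
```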
Split into three commits:

a) Refactor `BaseParquetWriter` to keep only the common functionality required for the internal and generic writers.
b) Refactor `BaseParquetReaders` to keep only the common functionality required for the internal and generic readers.
c) Add an internal writer and reader that consume and produce the Iceberg in-memory data model.