Parquet: Add readers and writers for the internal object model #11904

Open

ajantha-bhat wants to merge 5 commits into main from parquet_internal_writer

Conversation

ajantha-bhat (Member) commented:

Split into 3 commits:

a) Refactor BaseParquetWriter to keep only the common functionality required by both the internal and generic writers.
b) Refactor BaseParquetReaders to keep only the common functionality required by both the internal and generic readers.
c) Add an internal writer and reader that consume and produce the Iceberg in-memory data model.

@ajantha-bhat marked this pull request as draft on January 3, 2025 16:27
@ajantha-bhat requested a review from rdblue on January 3, 2025 16:30
@ajantha-bhat reopened this on January 4, 2025
@ajantha-bhat force-pushed the parquet_internal_writer branch from 772f5c2 to 233a00b on January 6, 2025 11:33
.palantir/revapi.yml (review thread, outdated and resolved)
@rdblue changed the title from "Parquet: Internal writer and reader" to "Parquet: Add readers and writers for the internal object model" on January 7, 2025

@Override
public UUID read(UUID reuse) {
return UUIDUtil.convert(column.nextBinary().toByteBuffer());
Contributor:

This looks fine to me.

@Override
protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
// accepts ByteBuffer and internally writes as binary.
return ParquetValueWriters.byteBuffers(desc);
Contributor:

Make sure this writer checks the length of the incoming bytes.

Member Author:

OK. Also, the existing GenericParquetWriter code doesn't have this length check when writing byte[] either; I will add that check.
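
For illustration, a length check along these lines could work (a sketch only; the class name and the Preconditions usage are assumptions, not the actual PR code):

  private static class FixedBufferWriter extends ParquetValueWriters.PrimitiveWriter<ByteBuffer> {
    private final int length;

    private FixedBufferWriter(ColumnDescriptor desc) {
      super(desc);
      // declared length of the FIXED_LEN_BYTE_ARRAY column
      this.length = desc.getPrimitiveType().getTypeLength();
    }

    @Override
    public void write(int repetitionLevel, ByteBuffer buffer) {
      // reject values whose size does not match the declared fixed length
      Preconditions.checkArgument(
          buffer.remaining() == length,
          "Cannot write byte buffer of length %s as fixed[%s]",
          buffer.remaining(), length);
      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
    }
  }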

@@ -359,10 +250,10 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

-    if (primitive.getOriginalType() != null) {
+    if (primitive.getLogicalTypeAnnotation() != null) {
Contributor:

I agree with this change, but please point these kinds of changes out for reviewers.

The old version worked because all of the supported logical type annotations had an equivalent ConvertedType (which is what OriginalType is called in the Parquet format and logical type docs).

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
Contributor:

This and the following 2 methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this PR already introduces abstract factory methods for some readers, including timestamps. I think it would be much cleaner to reuse this code and call factory methods instead:

  protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);
  protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);
  protected abstract PrimitiveReader<?> timestampReader(ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
Contributor:

I agree with moving the date/time reader classes here.

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
Contributor:

This isn't correct. The unit of the incoming timestamp value still needs to be handled, even if the in-memory representation of the value is the same (a long).

Contributor:

Looks like the Spark implementations for this should work well, just like the int96 cases.

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
Contributor:

This isn't correct. Like timestamp, this needs to handle the unit of the incoming value. In addition, millisecond values must annotate an int32 according to the Parquet logical type docs. When the unit is milliseconds, this needs to call readInt and multiply by 1000.

It looks like Spark currently gets the underlying Parquet type for milliseconds wrong (which makes sense, because this path is never used in Spark). We can go ahead and fix this now and share the reader between the internal and Spark readers.

  private static class TimestampMillisReader extends UnboxedReader<Long> {
    TimestampMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public Long read(Long ignored) {
      return readLong();
    }

    @Override
    public long readLong() {
      return 1000L * column.nextInteger();
    }
  }

.palantir/revapi.yml (review thread, outdated and resolved)
@@ -397,7 +404,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
-        return new TimestampInt96Reader(desc);
+        return int96Reader(desc);
Contributor:

Can't this call timestampReader(desc, NANOS, ((TimestampType) primitive).isAdjustedToUTC()) instead? The primitive type is available from desc, so that method can check it and we don't need a factory method for INT96.

Member Author:

Should I assume isAdjustedToUTC is true? The primitive doesn't have a TimestampType, and the reader was using OffsetDateTime.

My understanding is that OffsetDateTime is used for with-timezone values and LocalDateTime for without-timezone values.

Contributor:

Yes. Because the reader produces an OffsetDateTime, it should only be used when the expected type is compatible.

@@ -76,6 +67,31 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
return null;
Contributor:

I think this has to return a generic fixed reader here to match the existing behavior. Because this class is public, people may have created subclasses for other object models, and iceberg-parquet is a module covered by revapi, so we make guarantees about API and behavior stability.
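
In other words, the base class could keep the pre-refactor reader as the default, roughly like this (a sketch modeled on the existing generic fixed reader; treat the body as an assumption):

  protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
    // default preserves the old behavior for existing subclasses: produce byte[]
    return new FixedReader(desc);
  }

  private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
    private FixedReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public byte[] read(byte[] reuse) {
      if (reuse != null) {
        // copy into the reused array to avoid a new allocation per value
        column.nextBinary().toByteBuffer().duplicate().get(reuse);
        return reuse;
      } else {
        return column.nextBinary().getBytes();
      }
    }
  }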

Contributor:

Since this class is not widely used, could you also mark it deprecated so that we can make it package-private in 1.9.0?

Member Author (@ajantha-bhat, Jan 16, 2025):

I didn't do it because I can't move the date/time classes to GenericReader in that case; they have to stay in this class. But yeah, I got your point about the behavior change. I will revert the date/time class move.

Contributor:

@ajantha-bhat, why can't the reader classes be moved if this returns an instance? The classes are currently private so there is no direct dependency on them. As long as an instance is returned, wouldn't it work?

return null;
}

protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
Contributor:

These should not return Optional:

  • Iceberg generally prefers returning null over using Optional
  • The existing methods (like createStructReader) do not wrap return types in Optional
  • These methods should always return a valid reader, or should throw an exception to signal that something is not supported (for example, as sketched below)
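
For illustration, the suggested shape would be something like the following (a sketch; the exception message is illustrative):

  protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
    // subclasses that support dates override this; others fail loudly
    throw new UnsupportedOperationException("Date is not supported by this object model");
  }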

return null;
}

protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
Contributor:

These should all return ParquetValueReader<?> rather than constrain the implementation to a subclass of PrimitiveReader.


@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
Contributor:

Millisecond time must be stored as an INT32 according to the spec, so I think this should call column.nextLong instead.

Member Author (@ajantha-bhat, Jan 16, 2025):

> so I think this should call column.nextLong instead.

You mean column.nextInt, right?

So the existing code is wrong? I didn't add this code newly.
Also, how do I test this? DataTest doesn't handle time units, and the writers always write micros.

Contributor:

Yes, I meant nextInteger. Thanks for fixing this.

No need to modify the tests in this PR. It would be nice to have a test for millis, but these readers were originally written based on the spec, and we don't need to block this PR on adding a test for existing code.
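
A corrected millis reader for LocalTime would then look roughly like this (the class name is hypothetical; it mirrors the TimestampMillisReader sketch above):

  private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
    private TimeMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalTime read(LocalTime reuse) {
      // millis-precision time is stored as an INT32 per the Parquet spec
      return LocalTime.ofNanoOfDay(column.nextInteger() * 1_000_000L);
    }
  }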

return Optional.of(new ParquetValueReaders.UUIDReader(desc));
}

private static class StructLikeReader extends StructReader<StructLike, StructLike> {
Contributor:

Now I'm questioning the use of StructLikeReader vs RecordReader in GenericParquetReaders. They both produce the same implementation class (GenericRecord), and Record is a StructLike. Why not move RecordReader into ParquetValueReaders and reuse the code?

That would also preserve the comment in RecordReader that was removed when copying here, but is still relevant to why there is a template record.

private final GenericRecord template;

StructLikeReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
super(types, readers);
Contributor:

The superclass doesn't use types so it doesn't need to be passed in. This isn't a big deal if you don't want to fix the superclass in this PR though.

Member Author:

It touches Flink and other classes. I will handle it in a separate PR.

@@ -401,6 +404,17 @@ public ByteBuffer read(ByteBuffer reuse) {
}
}

public static class UUIDReader extends PrimitiveReader<UUID> {
Contributor:

Rather than making the classes here public, I would prefer to introduce factory methods like we have for the Avro readers. I think that gives us more flexibility to change implementations and move classes around without worrying because none of them need to be public.
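
For example, the factory-method shape could look like this (a sketch mirroring the Avro readers; the method name is an assumption):

  public static ParquetValueReader<UUID> uuids(ColumnDescriptor desc) {
    // UUIDReader itself can then stay package-private
    return new UUIDReader(desc);
  }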


@Override
public void write(int repetitionLevel, UUID value) {
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(UUIDUtil.convert(value)));
Contributor:

The byte array passed here is allocated, not reused. It probably should be reused, like the one in Spark's writers.
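
Reuse could follow the Spark writer's thread-local buffer, roughly (a sketch; assumes a UUIDUtil helper that serializes into a caller-provided buffer):

  private static class UUIDWriter extends ParquetValueWriters.PrimitiveWriter<UUID> {
    private static final ThreadLocal<ByteBuffer> BUFFER =
        ThreadLocal.withInitial(() -> ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN));

    private UUIDWriter(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public void write(int repetitionLevel, UUID value) {
      // serialize into the reused thread-local buffer instead of allocating per value
      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(value, BUFFER.get());
      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
    }
  }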


private InternalWriter() {}

public static ParquetValueWriter<StructLike> buildWriter(MessageType type) {
Contributor:

I think this would be better named create as well.

return Optional.of(ParquetValueWriters.uuids(desc));
}

private static class StructLikeWriter extends StructWriter<StructLike> {
Contributor:

I don't think we need both RecordWriter and StructLikeWriter because this can be used to write records. Can you remove RecordWriter and use this instead?

return Optional.empty();
}

protected Optional<ParquetValueWriters.PrimitiveWriter<?>> uuidWriter(ColumnDescriptor desc) {
Contributor:

Feedback here is similar to the read path:

  • Should not use Optional
  • Should produce the same writer as before
  • Should deprecate this class so that we can make it package-private
  • Should not need a factory method for UUID

Member Author:

CI just failed for ArrowReaderTest > testReadAll().

The generic writer expects byte[]-based UUID handling, while the internal reader and writer work directly on the UUID object. So we do need a factory method here, since we need to supply different kinds of UUID representation.

@ajantha-bhat force-pushed the parquet_internal_writer branch from c543108 to 3bb323b on January 17, 2025 14:12
ajantha-bhat (Member Author) commented:

@rdblue: Thanks for the review. I have addressed all the comments. Please take another look.

: new TimestampMillisReader(desc);
case NANOS:
if (isAdjustedToUTC) {
return new TimestampInt96Reader(desc);
Contributor:

This isn't correct. The INT96 timestamp reader is used when the underlying type is INT96, not when the Iceberg or Parquet type has nanosecond precision. Nanosecond precision timestamps were recently introduced and are not used to signal that Parquet stores timestamp as an INT96.

This should match the previous behavior, where this reader is used when the underlying physical type (desc.getPrimitiveType().getPrimitiveTypeName()) is INT96.
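
That is, the check belongs on the physical type, roughly (a sketch of the dispatch inside timestampReader):

  if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.INT96) {
    // INT96 storage is a physical detail, independent of the logical unit
    return new TimestampInt96Reader(desc);
  }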

LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
"Cannot write time in %s, only MICROS is supported",
timeType.getUnit());
return Optional.ofNullable(timeWriter(desc));
Contributor:

Why does this use ofNullable? The intent is not to expose BaseParquetWriter beyond its package, so there is no need to handle cases where the factory method returns null. Those are error cases.

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.ofNullable(uuidWriter(desc));
Contributor:

I don't think there's any need to make this more flexible. It makes sense to support the new UUIDWriter to ensure the tests pass with all primitive types, but this doesn't need to introduce a way to override the writer and representation (same for the read side).

Member Author:

Generic writers handle the UUID logical type by returning the default empty(), so that it is handled as a primitive writer of fixed-length byte[] here:
/~https://github.com/apache/iceberg/blob/5b13760c01bdbb2ab15be072c582adb2a8792f23/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java#L131C1-L132C1

But the internal writers need a new UUID-based writer. Hence, I added an override that returns the default empty writer for generic types and a UUID-based writer for the internal type.
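
For illustration, the default and the internal override would differ roughly like this (a sketch based on the explanation above):

  // BaseParquetWriter default, used by the generic writer: no special UUID writer,
  // so the value falls through to the fixed-length byte[] path
  protected ParquetValueWriters.PrimitiveWriter<?> uuidWriter(ColumnDescriptor desc) {
    return null;
  }

  // InternalWriter override: write java.util.UUID values directly
  @Override
  protected ParquetValueWriters.PrimitiveWriter<?> uuidWriter(ColumnDescriptor desc) {
    return ParquetValueWriters.uuids(desc);
  }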

return new ParquetValueReaders.UUIDReader(desc);
}

public static ParquetValueReader<Long> timestampInt96Reader(ColumnDescriptor desc) {
Contributor:

To match the other method names, I think this should be int96Timestamps.

return new ParquetValueReaders.TimestampInt96Reader(desc);
}

public static ParquetValueReader<Long> timestampMillisReader(ColumnDescriptor desc) {
Contributor (@rdblue, Jan 21, 2025):

I'd probably rename this as well, maybe millisAsTimestamps?

@@ -106,6 +114,10 @@ public static PrimitiveWriter<ByteBuffer> byteBuffers(ColumnDescriptor desc) {
return new BytesWriter(desc);
}

public static PrimitiveWriter<ByteBuffer> fixedBuffer(ColumnDescriptor desc) {
Contributor (@rdblue, Jan 21, 2025):

Minor: Most other methods are plural. How about fixedBuffers?

if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.timestampMillisReader(desc);
} else if (unit == LogicalTypeAnnotation.TimeUnit.NANOS) {
return ParquetValueReaders.timestampInt96Reader(desc);
Contributor:

This should be fixed as well.

}

protected ParquetValueWriters.PrimitiveWriter<?> uuidWriter(ColumnDescriptor desc) {
// Use primitive-type writer; no special writer needed.
Contributor:

There is no primitive writer for UUIDs. Maybe this was a copy/paste error?

return new FixedWriter(desc);
}

protected ParquetValueWriters.PrimitiveWriter<?> dateWriter(ColumnDescriptor desc) {
Contributor:

These methods should return ParquetValueWriter<?> and not a specific class.

}
}

public static class RecordWriter<T extends StructLike> extends StructWriter<T> {
Contributor:

Rather than adding a public class, can you add a factory method for this?
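
For example (a sketch of the factory shape; the method name and constructor signature are assumptions):

  public static <T extends StructLike> ParquetValueWriter<T> recordWriter(
      List<ParquetValueWriter<?>> writers) {
    // RecordWriter itself can then stay package-private
    return new RecordWriter<>(writers);
  }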
