Parquet: Add readers and writers for the internal object model #11904

Open · wants to merge 5 commits into base: main · Changes from 4 commits
8 changes: 4 additions & 4 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

  public static List<Object> generateList(
-      Random random, Types.ListType list, Supplier<Object> elementResult) {
+      Random random, Types.ListType list, Supplier<Object> elements) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,15 +246,15 @@ public static List<Object> generateList(
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
-        result.add(elementResult.get());
+        result.add(elements.get());
}
}

return result;
}

  public static Map<Object, Object> generateMap(
-      Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
+      Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> values) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
@@ -279,7 +279,7 @@ public static Map<Object, Object> generateMap(
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
-        result.put(key, valueResult.get());
+        result.put(key, values.get());
}
}

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.InternalTestHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RandomInternalData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TestInternalData extends DataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
writeAndValidate(schema, schema);
}

@Override
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
List<StructLike> expected = RandomInternalData.generate(writeSchema, 100, 42L);

File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

OutputFile outputFile = Files.localOutput(testFile);

try (DataWriter<StructLike> dataWriter =
Parquet.writeData(outputFile)
.schema(writeSchema)
.createWriterFunc(InternalWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build()) {
for (StructLike record : expected) {
dataWriter.write(record);
}
}

List<StructLike> rows;
try (CloseableIterable<StructLike> reader =
Parquet.read(Files.localInput(testFile))
.project(expectedSchema)
.createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema))
.build()) {
rows = Lists.newArrayList(reader);
}

for (int i = 0; i < expected.size(); i += 1) {
InternalTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i));
}

// test reuseContainers
try (CloseableIterable<StructLike> reader =
Parquet.read(Files.localInput(testFile))
.project(expectedSchema)
.reuseContainers()
.createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema))
.build()) {
int index = 0;
for (StructLike actualRecord : reader) {
InternalTestHelpers.assertEquals(
expectedSchema.asStruct(), expected.get(index), actualRecord);
index += 1;
}
}
}
}
@@ -18,19 +18,9 @@
*/
package org.apache.iceberg.data.parquet;

-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -46,6 +36,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -76,6 +67,31 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
return null;
Contributor: I think this has to return a generic fixed reader here to match the existing behavior. Because this class is public, people may have created subclasses for other object models, and iceberg-parquet is a module that is covered by revapi, so we make guarantees about API and behavior stability.
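For illustration, a default along those lines could simply return the existing generic reader (a sketch reusing the FixedReader shown at the bottom of this diff, not code from this PR):

  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
    // Sketch: preserve the previous generic behavior for subclasses that do
    // not override this method, by returning the existing FixedReader.
    return new FixedReader(desc);
  }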

Contributor: Since this class is not widely used, could you also mark it deprecated so that we can make it package-private in 1.9.0?
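For reference, such a deprecation might look like the following (a sketch; the class name and release note wording are assumptions, since the file name is not shown in this view):

  /**
   * @deprecated will be made package-private in 1.9.0
   */
  @Deprecated
  public abstract class BaseParquetReaders<T> { // class name assumed for illustration
    // existing members unchanged
  }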

ajantha-bhat (Member, Author), Jan 16, 2025: I didn't do it because I can't move the date/time classes to GenericReader in that case; they have to be in this class. But yeah, I got your point about the behavior change. I will revert the date/time class movement.

Contributor: @ajantha-bhat, why can't the reader classes be moved if this returns an instance? The classes are currently private so there is no direct dependency on them. As long as an instance is returned, wouldn't it work?
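For illustration, the override being described could look like this in the generic subclass (a sketch using this revision's Optional-based signature; GenericParquetReaders as the subclass is an assumption):

  @Override
  protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
    // DateReader can stay a private class: only the instance escapes,
    // so there is no direct external dependency on the type.
    return Optional.of(new DateReader(desc));
  }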

}

protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
Contributor: These should all return ParquetValueReader<?> rather than constrain the implementation to a subclass of PrimitiveReader.
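The widened shape would be roughly (a sketch of the suggestion, with the defaults left as in this revision):

  protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
    return null;
  }

  protected ParquetValueReader<?> int96Reader(ColumnDescriptor desc) {
    return null;
  }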

return null;
}

protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
Contributor: These should not return Optional:

  • Iceberg generally prefers returning null over using Optional
  • The existing methods (like createStructReader) do not wrap return types in Optional
  • These methods should always return a valid reader, or should throw an exception to signal that something is not supported
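Under that convention, the factory methods might instead look like this (a sketch combining both suggestions; the exception messages are assumptions):

  protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
    // No Optional wrapper: return a reader, or signal an unsupported type.
    throw new UnsupportedOperationException("Date is not supported by this object model");
  }

  protected ParquetValueReader<?> timeReader(ColumnDescriptor desc, TimeUnit unit) {
    throw new UnsupportedOperationException("Time is not supported by this object model");
  }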

return Optional.empty();
}

protected Optional<ParquetValueReader<?>> timeReader(ColumnDescriptor desc, TimeUnit unit) {
return Optional.empty();
}

protected Optional<ParquetValueReader<?>> timestampReader(
ColumnDescriptor desc, TimeUnit unit, boolean isAdjustedToUTC) {
return Optional.empty();
}

protected Optional<ParquetValueReader<?>> uuidReader(ColumnDescriptor desc) {
return Optional.empty();
}

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
}
@@ -164,37 +180,22 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
-      return Optional.of(new DateReader(desc));
Contributor: This and the following 2 methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this already introduces abstract factory methods for some readers -- including timestamps. I think it would be much cleaner to reuse this and call factory methods instead:

  protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);
  protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);
  protected abstract PrimitiveReader<?> timestampReader(ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);
+      return dateReader(desc);
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
-      if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        return Optional.of(new TimeReader(desc));
-      } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        return Optional.of(new TimeMillisReader(desc));
-      }
-
-      return Optional.empty();
+      return timeReader(desc, timeLogicalType.getUnit());
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
-      if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
-        return tsMicrosType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzReader(desc))
-            : Optional.of(new TimestampReader(desc));
-      } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        Types.TimestampType tsMillisType = (Types.TimestampType) expected;
-        return tsMillisType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzMillisReader(desc))
-            : Optional.of(new TimestampMillisReader(desc));
-      }
-
-      return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      return timestampReader(
+          desc,
+          timestampLogicalType.getUnit(),
+          ((Types.TimestampType) expected).shouldAdjustToUTC());
}

@Override
@@ -219,6 +220,12 @@ public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return uuidReader(desc);
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +366,7 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

-      if (primitive.getOriginalType() != null) {
+      if (primitive.getLogicalTypeAnnotation() != null) {
Contributor: I agree with this change, but please point these kinds of changes out for reviewers.

The old version worked because all of the supported logical type annotations had an equivalent ConvertedType (which is what OriginalType is called in Parquet format and the logical type docs).

return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
Expand All @@ -371,7 +378,7 @@ public ParquetValueReader<?> primitive(

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
-          return new FixedReader(desc);
+          return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +404,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
-          return new TimestampInt96Reader(desc);
+          return int96Reader(desc);
Contributor: Can't this call timestampReader(desc, NANOS, ((TimestampType) primitive).isAdjustedToUTC()) instead? The primitive type is available from desc so that method can check and we don't need to have a factory method for INT96.

ajantha-bhat (Member, Author): Should I assume isAdjustedToUTC to be true? Because primitive doesn't have a TimestampType, and the reader was using OffsetDateTime. My understanding is that if OffsetDateTime is used it is with timezone, and LocalDateTime is without timezone.

Contributor: Yes. Because the reader produces an OffsetDateTime, it should only be used when the expected type is compatible.
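Put together, the suggested INT96 branch would read roughly as follows (a sketch; assumes UTC adjustment per the thread above and uses TimeUnit.NANOS from the Parquet LogicalTypeAnnotation import):

        case INT96:
          // Legacy Impala/Spark timestamps have no logical type; read at
          // nanosecond precision and assume UTC adjustment, since the old
          // reader produced OffsetDateTime.
          return timestampReader(desc, TimeUnit.NANOS, true);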

default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -407,124 +414,4 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}