Parquet: Add readers and writers for the internal object model #11904

Open · wants to merge 5 commits into main

15 changes: 9 additions & 6 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elementResult) {
Random random, Types.ListType list, Supplier<Object> elementSupplier) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,23 +246,26 @@ public static List<Object> generateList(
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
result.add(elementResult.get());
result.add(elementSupplier.get());
}
}

return result;
}

public static Map<Object, Object> generateMap(
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
Random random,
Types.MapType map,
Supplier<Object> keySupplier,
Supplier<Object> valueSupplier) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
if (map.keyType() == Types.StringType.get()) {
keyFunc = () -> keyResult.get().toString();
keyFunc = () -> keySupplier.get().toString();
} else {
keyFunc = keyResult;
keyFunc = keySupplier;
}

Set<Object> keySet = Sets.newHashSet();
@@ -279,7 +282,7 @@ public static Map<Object, Object> generateMap(
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
result.put(key, valueResult.get());
result.put(key, valueSupplier.get());
}
}

@@ -50,6 +50,10 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetReaders<T> {
protected BaseParquetReaders() {}

@@ -76,6 +80,46 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new FixedReader(desc);
}

protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new DateReader(desc);
}

protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
switch (unit) {
case MICROS:
return new TimeReader(desc);
case MILLIS:
return new TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
}

protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
switch (unit) {
case MICROS:
return isAdjustedToUTC ? new TimestamptzReader(desc) : new TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new TimestamptzMillisReader(desc)
: new TimestampMillisReader(desc);
case NANOS:
if (isAdjustedToUTC) {
return new TimestampInt96Reader(desc);

Contributor:
This isn't correct. The INT96 timestamp reader is used when the underlying type is INT96, not when the Iceberg or Parquet type has nanosecond precision. Nanosecond precision timestamps were recently introduced and are not used to signal that Parquet stores timestamp as an INT96.

This should match the previous behavior, where this is used when underlying physical type (desc.getPrimitiveType().getPrimitiveTypeName()) is INT96.
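
A minimal sketch of that dispatch (illustrative only, not the PR's final code; it assumes the types already imported by BaseParquetReaders):

    // Use the INT96 reader only when the physical type is INT96, independent of the
    // logical-type unit carried by the schema.
    if (desc.getPrimitiveType().getPrimitiveTypeName()
        == PrimitiveType.PrimitiveTypeName.INT96) {
      return new TimestampInt96Reader(desc);
    }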

} else {
throw new UnsupportedOperationException("Nanos should be adjusted to UTC");
}
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
}

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
}
@@ -164,37 +208,23 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));

Contributor:
This and the following 2 methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this already introduces abstract factory methods for some readers -- including timestamps. I think it would be much cleaner to reuse this and call factory methods instead:

  protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);
  protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);
  protected abstract PrimitiveReader<?> timestampReader(ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);

return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
return Optional.of(
timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC()));
}

@Override
@@ -219,6 +249,12 @@ public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.of(ParquetValueReaders.uuids(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +395,7 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
if (primitive.getLogicalTypeAnnotation() != null) {

Contributor:
I agree with this change, but please point these kinds of changes out for reviewers.

The old version worked because all of the supported logical type annotations had an equivalent ConvertedType (which is what OriginalType is called in Parquet format and the logical type docs).
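
For illustration (a hedged sketch against the standard parquet-java schema API, not code from this PR), a nanosecond timestamp is one case where the two checks diverge; Types is fully qualified here to avoid the clash with org.apache.iceberg.types.Types used elsewhere in this file:

    PrimitiveType tsNanos =
        org.apache.parquet.schema.Types.required(PrimitiveType.PrimitiveTypeName.INT64)
            .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS))
            .named("ts_nanos");
    tsNanos.getLogicalTypeAnnotation(); // non-null: TIMESTAMP(isAdjustedToUTC=true, unit=NANOS)
    tsNanos.getOriginalType();          // null: nanosecond timestamps have no ConvertedType equivalent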

return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
@@ -371,7 +407,7 @@ public ParquetValueReader<?> primitive(

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedReader(desc);
return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +433,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -497,7 +533,7 @@ private TimeMillisReader(ColumnDescriptor desc) {

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
return LocalTime.ofNanoOfDay(column.nextInteger() * 1000000L);
}
}

@@ -40,6 +40,10 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetWriter<T> {

@SuppressWarnings("unchecked")
@@ -50,6 +54,32 @@ protected ParquetValueWriter<T> createWriter(MessageType type) {
protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
List<ParquetValueWriter<?>> writers);

protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
return new FixedWriter(desc);
}

protected ParquetValueWriters.PrimitiveWriter<?> dateWriter(ColumnDescriptor desc) {

Contributor:
These methods should return ParquetValueWriter<?> and not a specific class.

Member Author:
The existing LogicalTypeWriterVisitor declares its return values as Optional<ParquetValueWriters.PrimitiveWriter<?>>, so I would also need to modify the base class for these methods to return ParquetValueWriter<?>.

I thought I would get a comment asking why the base class was modified, and wanted to avoid unnecessary refactoring, so I didn't do it.

Since you also want this handled, I will update it.

return new DateWriter(desc);
}

protected ParquetValueWriters.PrimitiveWriter<?> timeWriter(ColumnDescriptor desc) {
return new TimeWriter(desc);
}

protected ParquetValueWriters.PrimitiveWriter<?> timestampWriter(
ColumnDescriptor desc, boolean isAdjustedToUTC) {
if (isAdjustedToUTC) {
return new TimestamptzWriter(desc);
} else {
return new TimestampWriter(desc);
}
}

protected ParquetValueWriters.PrimitiveWriter<?> uuidWriter(ColumnDescriptor desc) {
// Return null so the UUID logical type falls through to the fixed-length byte[] primitive writer.

Contributor:
There is no primitive writer for UUIDs. Maybe this was a copy/paste error?

Member Author:
It uses the fixed length primitive writer with byte[] as input.

return null;
}

private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
private final MessageType type;

@@ -128,7 +158,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedWriter(desc);
return fixedWriter(desc);
case BINARY:
return ParquetValueWriters.byteBuffers(desc);
case BOOLEAN:
@@ -147,7 +177,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
}
}

private static class LogicalTypeWriterVisitor
private class LogicalTypeWriterVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
ParquetValueWriters.PrimitiveWriter<?>> {
private final ColumnDescriptor desc;
@@ -192,13 +222,17 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(new DateWriter(desc));
return Optional.ofNullable(dateWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(new TimeWriter(desc));
Preconditions.checkArgument(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
"Cannot write time in %s, only MICROS is supported",
timeType.getUnit());
return Optional.ofNullable(timeWriter(desc));

Contributor:
Why does this use ofNullable? The intent is not to expose BaseParquetWriter beyond its package, so there is no need to handle cases where the factory method returns null. Those are error cases.

}

@Override
@@ -208,11 +242,7 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
if (timestampType.isAdjustedToUTC()) {
return Optional.of(new TimestamptzWriter(desc));
} else {
return Optional.of(new TimestampWriter(desc));
}
return Optional.ofNullable(timestampWriter(desc, timestampType.isAdjustedToUTC()));
}

@Override
@@ -239,6 +269,12 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.ofNullable(uuidWriter(desc));

Contributor:
I don't think there's any need to make this more flexible. It makes sense to support the new UUIDWriter to ensure the tests pass with all primitive types, but this doesn't need to introduce a way to override the writer and representation (same for the read side).

Member Author:
The generic writers handle the UUID logical type by returning the default empty(), so it is handled by the fixed-length byte[] primitive writer here:
/~https://github.com/apache/iceberg/blob/5b13760c01bdbb2ab15be072c582adb2a8792f23/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java#L131C1-L132C1

But the internal writer needs a new UUID-based writer. Hence, I added an override that returns empty (the default) for the generic writers and a UUID-based writer for the internal one.
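
A minimal sketch of such a UUID-based writer (the class name and details are illustrative, not the PR's actual code; it relies on the helpers already visible in this file, ParquetValueWriters.PrimitiveWriter and Binary.fromReusedByteArray, plus java.nio.ByteBuffer):

    private static class UUIDWriter extends ParquetValueWriters.PrimitiveWriter<java.util.UUID> {
      private UUIDWriter(ColumnDescriptor desc) {
        super(desc);
      }

      @Override
      public void write(int repetitionLevel, java.util.UUID value) {
        // The Parquet UUID logical type is a 16-byte fixed value in big-endian order.
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(value.getMostSignificantBits());
        buffer.putLong(value.getLeastSignificantBits());
        column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(buffer.array()));
      }
    }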

}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -291,12 +327,20 @@ public void write(int repetitionLevel, OffsetDateTime value) {
}

private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
private final int length;

private FixedWriter(ColumnDescriptor desc) {
super(desc);
this.length = desc.getPrimitiveType().getTypeLength();
}

@Override
public void write(int repetitionLevel, byte[] value) {
Preconditions.checkArgument(
value.length == length,
"Cannot write byte buffer of length %s as fixed[%s]",
value.length,
length);
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
}
}

@@ -22,10 +22,9 @@
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericDataUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -49,47 +48,11 @@ public static ParquetValueReader<Record> buildReader(
@Override
protected ParquetValueReader<Record> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return new RecordReader(types, fieldReaders, structType);
return new ParquetValueReaders.RecordReader<>(types, fieldReaders, structType);
}

@Override
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return GenericDataUtil.internalToGeneric(type, value);
}

private static class RecordReader extends StructReader<Record, Record> {
private final GenericRecord template;

RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
super(types, readers);
this.template = struct != null ? GenericRecord.create(struct) : null;
}

@Override
protected Record newStructData(Record reuse) {
if (reuse != null) {
return reuse;
} else {
// GenericRecord.copy() is more performant then GenericRecord.create(StructType) since
// NAME_MAP_CACHE access
// is eliminated. Using copy here to gain performance.
return template.copy();
}
}

@Override
protected Object getField(Record intermediate, int pos) {
return intermediate.get(pos);
}

@Override
protected Record buildStruct(Record struct) {
return struct;
}

@Override
protected void set(Record struct, int pos, Object value) {
struct.set(pos, value);
}
}
}