Commit c543108

Address new comments
ajantha-bhat committed Jan 17, 2025
1 parent 3eaf3bc commit c543108
Showing 11 changed files with 412 additions and 487 deletions.
15 changes: 9 additions & 6 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elements) {
Random random, Types.ListType list, Supplier<Object> elementSupplier) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,23 +246,26 @@ public static List<Object> generateList(
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
result.add(elements.get());
result.add(elementSupplier.get());
}
}

return result;
}

public static Map<Object, Object> generateMap(
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> values) {
Random random,
Types.MapType map,
Supplier<Object> keySupplier,
Supplier<Object> valueSupplier) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
if (map.keyType() == Types.StringType.get()) {
keyFunc = () -> keyResult.get().toString();
keyFunc = () -> keySupplier.get().toString();
} else {
keyFunc = keyResult;
keyFunc = keySupplier;
}

Set<Object> keySet = Sets.newHashSet();
@@ -279,7 +282,7 @@ public static Map<Object, Object> generateMap(
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
result.put(key, values.get());
result.put(key, valueSupplier.get());
}
}

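As an aside (not part of this commit), a minimal sketch of how a caller could drive the renamed suppliers in RandomUtil. The field IDs, the list/map types, and the generatePrimitive helper are assumptions for illustration; RandomUtil itself lives in the api test sources.

import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;

public class RandomUtilExample {
  public static void main(String[] args) {
    Random random = new Random(42);

    // list<long>: a single supplier produces every element value
    Types.ListType listType = Types.ListType.ofOptional(1, Types.LongType.get());
    List<Object> list =
        RandomUtil.generateList(
            random, listType, () -> RandomUtil.generatePrimitive(Types.LongType.get(), random));

    // map<string, long>: separate key and value suppliers; string keys are
    // converted with toString() inside generateMap
    Types.MapType mapType =
        Types.MapType.ofOptional(2, 3, Types.StringType.get(), Types.LongType.get());
    Map<Object, Object> map =
        RandomUtil.generateMap(
            random,
            mapType,
            () -> RandomUtil.generatePrimitive(Types.StringType.get(), random),
            () -> RandomUtil.generatePrimitive(Types.LongType.get(), random));

    System.out.println(list.size() + " elements, " + map.size() + " entries");
  }
}
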
org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -18,9 +18,19 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -36,11 +46,14 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetReaders<T> {
protected BaseParquetReaders() {}

@@ -67,29 +80,44 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
return null;
}

protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
return null;
}

protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
return Optional.empty();
protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new FixedReader(desc);
}

protected Optional<ParquetValueReader<?>> timeReader(ColumnDescriptor desc, TimeUnit unit) {
return Optional.empty();
protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new DateReader(desc);
}

protected Optional<ParquetValueReader<?>> timestampReader(
ColumnDescriptor desc, TimeUnit unit, boolean isAdjustedToUTC) {
return Optional.empty();
protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
switch (unit) {
case MICROS:
return new TimeReader(desc);
case MILLIS:
return new TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
}

protected Optional<ParquetValueReader<?>> uuidReader(ColumnDescriptor desc) {
return Optional.empty();
protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
switch (unit) {
case MICROS:
return isAdjustedToUTC ? new TimestamptzReader(desc) : new TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new TimestamptzMillisReader(desc)
: new TimestampMillisReader(desc);
case NANOS:
if (isAdjustedToUTC) {
return new TimestampInt96Reader(desc);
} else {
throw new UnsupportedOperationException("Nanos should be adjusted to UTC");
}
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
}

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
@@ -180,22 +208,23 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return dateReader(desc);
return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
return timeReader(desc, timeLogicalType.getUnit());
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
return timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC());
return Optional.of(
timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC()));
}

@Override
@@ -224,7 +253,7 @@ public Optional<ParquetValueReader<?>> visit(
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return uuidReader(desc);
return Optional.of(ParquetValueReaders.uuids(desc));
}
}

@@ -404,7 +433,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return int96Reader(desc);
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -414,4 +443,124 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}
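
// Illustrative note (not in the committed file): DateReader interprets the stored int
// as days since 1970-01-01, so a value of 19000 decodes to EPOCH_DAY.plusDays(19000),
// i.e. 2022-01-08.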

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
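
For context, a self-contained sketch (not part of the commit) of the INT96 decoding that TimestampInt96Reader performs: the 12-byte value holds the nanoseconds within the day followed by the Julian day number, both little-endian. The class and method names below are illustrative only.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

public class Int96Example {
  // Julian day number of the Unix epoch, 1970-01-01
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Decode a 12-byte INT96 timestamp: 8 bytes of time-of-day nanos, then 4 bytes of Julian day
  static OffsetDateTime decode(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();
    int julianDay = buf.getInt();
    return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
        .plusNanos(timeOfDayNanos)
        .atOffset(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    // 2_451_545 is the Julian day of 2000-01-01, so one hour of nanos past midnight
    // decodes to 2000-01-01T01:00Z
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(TimeUnit.HOURS.toNanos(1));
    buf.putInt(2_451_545);
    System.out.println(decode(buf.array()));
  }
}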