
Commit c1a031d
Check ORC sorting using file data
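Instead of inferring sort order from per-stripe min/max string statistics, read the file's rows back through OrcRecordReader and verify every value of the sort column, so the check no longer requires multiple stripes or string-typed statistics.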
pajaks authored and raunaqmorarka committed Feb 21, 2025
1 parent d716b27 commit c1a031d
Showing 1 changed file with 41 additions and 29 deletions.
IcebergTestUtils.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.iceberg;
 
-import io.airlift.slice.Slice;
 import io.trino.Session;
 import io.trino.filesystem.FileEntry;
 import io.trino.filesystem.FileIterator;
@@ -24,12 +23,13 @@
 import io.trino.metastore.HiveMetastore;
 import io.trino.metastore.HiveMetastoreFactory;
 import io.trino.metastore.cache.CachingHiveMetastore;
+import io.trino.orc.OrcColumn;
 import io.trino.orc.OrcDataSource;
+import io.trino.orc.OrcPredicate;
 import io.trino.orc.OrcReader;
 import io.trino.orc.OrcReaderOptions;
-import io.trino.orc.metadata.OrcColumnId;
-import io.trino.orc.metadata.statistics.StringStatistics;
-import io.trino.orc.metadata.statistics.StripeStatistics;
+import io.trino.orc.OrcRecordReader;
+import io.trino.orc.metadata.OrcType;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.metadata.BlockMetadata;
 import io.trino.parquet.metadata.ColumnChunkMetadata;
@@ -43,9 +43,12 @@
 import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
 import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
 import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
 import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.type.TestingTypeManager;
+import io.trino.spi.type.Type;
 import io.trino.testing.QueryRunner;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.TableMetadata;
@@ -60,16 +63,20 @@
 import java.util.function.Supplier;
 
 import static com.google.common.base.Verify.verify;
-import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.Iterators.getOnlyElement;
 import static com.google.common.collect.MoreCollectors.onlyElement;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
+import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
 import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
 import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON;
 import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.fromFilePath;
+import static io.trino.spi.type.TypeUtils.readNativeValue;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.TestingConnectorSession.SESSION;
+import static org.joda.time.DateTimeZone.UTC;
 
 public final class IcebergTestUtils
 {
@@ -101,40 +108,45 @@ private static boolean checkOrcFileSorting(Supplier<OrcDataSource> dataSourceSupplier, String sortColumnName)
         OrcReaderOptions readerOptions = new OrcReaderOptions();
         try (OrcDataSource dataSource = dataSourceSupplier.get()) {
             OrcReader orcReader = OrcReader.createOrcReader(dataSource, readerOptions).orElseThrow();
-            String previousMax = null;
-            OrcColumnId sortColumnId = orcReader.getRootColumn().getNestedColumns().stream()
+            OrcColumn sortColumn = orcReader.getRootColumn().getNestedColumns().stream()
                     .filter(column -> column.getColumnName().equals(sortColumnName))
-                    .collect(onlyElement())
-                    .getColumnId();
-            List<StripeStatistics> statistics = orcReader.getMetadata().getStripeStatsList().stream()
-                    .map(Optional::orElseThrow)
-                    .collect(toImmutableList());
-            verify(statistics.size() > 1, "Test must produce at least two row groups");
-
-            for (StripeStatistics stripeStatistics : statistics) {
-                // TODO: This only works if the sort column is a String
-                StringStatistics columnStatistics = stripeStatistics.getColumnStatistics().get(sortColumnId).getStringStatistics();
-
-                Slice minValue = columnStatistics.getMin();
-                Slice maxValue = columnStatistics.getMax();
-                if (minValue == null || maxValue == null) {
-                    throw new IllegalStateException("ORC files must produce min/max stripe statistics");
-                }
-
-                if (previousMax != null && previousMax.compareTo(minValue.toStringUtf8()) > 0) {
-                    return false;
-                }
-
-                previousMax = maxValue.toStringUtf8();
+                    .collect(onlyElement());
+            Type sortColumnType = getType(sortColumn.getColumnType().getOrcTypeKind());
+            try (OrcRecordReader recordReader = orcReader.createRecordReader(
+                    List.of(sortColumn),
+                    List.of(sortColumnType),
+                    OrcPredicate.TRUE,
+                    UTC,
+                    newSimpleAggregatedMemoryContext(),
+                    INITIAL_BATCH_SIZE,
+                    RuntimeException::new)) {
+                Comparable<Object> previousMax = null;
+                for (Page page = recordReader.nextPage(); page != null; page = recordReader.nextPage()) {
+                    Block block = page.getLoadedPage().getBlock(0);
+                    for (int position = 0; position < block.getPositionCount(); position++) {
+                        Comparable<Object> current = (Comparable<Object>) readNativeValue(sortColumnType, block, position);
+                        if (previousMax != null && previousMax.compareTo(current) > 0) {
+                            return false;
+                        }
+                        previousMax = current;
+                    }
+                }
             }
 
             return true;
         }
         catch (IOException e) {
             throw new UncheckedIOException(e);
         }
     }
+    private static Type getType(OrcType.OrcTypeKind orcTypeKind)
+    {
+        return switch (orcTypeKind) {
+            case OrcType.OrcTypeKind.STRING, OrcType.OrcTypeKind.VARCHAR -> VARCHAR;
+            default -> throw new IllegalArgumentException("Unsupported orc type: " + orcTypeKind);
+        };
+    }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName)
     {
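
For readers skimming the diff: the rewritten check no longer consults stripe statistics at all. It streams every page of the file and asserts that the sort column's values never decrease. Stripped of the ORC plumbing, the invariant is an ordinary non-decreasing scan; a minimal standalone sketch (class and method names here are illustrative, not part of the commit):

import java.util.List;

final class SortingCheckSketch
{
    private SortingCheckSketch() {}

    // Same invariant as the page/position loop in checkOrcFileSorting:
    // remember the largest value seen so far and fail on the first value
    // that compares below it.
    static <T extends Comparable<T>> boolean isNonDecreasing(Iterable<T> values)
    {
        T previousMax = null;
        for (T current : values) {
            if (previousMax != null && previousMax.compareTo(current) > 0) {
                return false;
            }
            previousMax = current;
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println(isNonDecreasing(List.of("apple", "banana", "banana", "cherry"))); // true
        System.out.println(isNonDecreasing(List.of("apple", "cherry", "banana"))); // false
    }
}

Compared with the removed statistics-based version, this checks every row rather than per-stripe min/max bounds, works for single-stripe files (where the removed verify(statistics.size() > 1, ...) would have failed), and is constrained only by getType, which currently accepts just STRING and VARCHAR and reads both as Trino VARCHAR.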
