Fix incorrect updating of fileRowCount when skipping row groups
When filtering row groups based on split boundaries in `ParquetMetadata.getBlocks()`,
we need to maintain an accurate cumulative row count even for skipped row groups;
otherwise every block after a skipped row group reports the wrong starting row.
The counting pattern is sketched below, after the file summary.
chenjian2664 committed Feb 28, 2025
1 parent 3b3f15d commit 60553c3
Showing 4 changed files with 49 additions and 5 deletions.
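The whole fix boils down to the counting pattern below: the cumulative row count advances for every row group, and a surviving block's starting row is derived from that count, so skipped row groups can no longer shift later blocks. Here is a minimal, self-contained sketch of that pattern; `RowGroupStub`, `BlockStub`, and `blocksFor` are illustrative stand-ins, not the actual Trino or parquet-format API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Thrift metadata types used in the real code.
record RowGroupStub(long fileOffset, long numRows) {}
record BlockStub(long rowStart, long rowCount) {}

class RowStartSketch
{
    // Advance fileRowCount for EVERY row group; only groups whose file offset
    // falls inside the split produce a block, but skipped groups still count,
    // so rowStart stays an absolute position within the file.
    static List<BlockStub> blocksFor(List<RowGroupStub> rowGroups, long splitStart, long splitLength)
    {
        List<BlockStub> blocks = new ArrayList<>();
        long fileRowCount = 0;
        for (RowGroupStub rowGroup : rowGroups) {
            fileRowCount += rowGroup.numRows();
            boolean splitContainsRowGroup = splitStart <= rowGroup.fileOffset()
                    && rowGroup.fileOffset() < splitStart + splitLength;
            if (splitContainsRowGroup) {
                blocks.add(new BlockStub(fileRowCount - rowGroup.numRows(), rowGroup.numRows()));
            }
        }
        return blocks;
    }
}
```

Before this commit the counter lived in `getFilteredRowGroups`, which only ever saw the blocks that `getBlocks` had already filtered by split boundary, so row groups outside the split were never counted at all.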
@@ -15,7 +15,7 @@

import java.util.List;

-public record BlockMetadata(long rowCount, List<ColumnChunkMetadata> columns)
+public record BlockMetadata(long rowStart, long rowCount, List<ColumnChunkMetadata> columns)
{
public long getStartingPos()
{
@@ -103,8 +103,13 @@ public List<BlockMetadata> getBlocks(long splitStart, long splitLength)
MessageType messageType = readParquetSchema(schema);
List<BlockMetadata> blocks = new ArrayList<>();
List<RowGroup> rowGroups = parquetMetadata.getRow_groups();

+long fileRowCount = 0;
+
if (rowGroups != null) {
for (RowGroup rowGroup : rowGroups) {
+fileRowCount += rowGroup.getNum_rows(); // Update fileRowCount for all row groups

if (rowGroup.isSetFile_offset()) {
long rowGroupStart = rowGroup.getFile_offset();
boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength;
@@ -146,7 +151,7 @@ public List<BlockMetadata> getBlocks(long splitStart, long splitLength)
column.setBloomFilterOffset(metaData.bloom_filter_offset);
columnMetadataBuilder.add(column);
}
-blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build()));
+blocks.add(new BlockMetadata(fileRowCount - rowGroup.getNum_rows(), rowGroup.getNum_rows(), columnMetadataBuilder.build()));
}
}
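To make the effect concrete: suppose a file has three row groups of 100 rows each and the split covers only the third group. fileRowCount reaches 300 when that group is visited, so its block gets rowStart = 300 - 100 = 200. With the old code, the two skipped groups contributed nothing to the counter in `getFilteredRowGroups`, and the third group's starting row would have been reported as 0.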

@@ -193,7 +193,6 @@ public static List<RowGroupInfo> getFilteredRowGroups(
ParquetReaderOptions options)
throws IOException
{
-long fileRowCount = 0;
ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) {
long blockStart = block.getStartingPos();
@@ -215,12 +214,11 @@
bloomFilterStore,
timeZone,
domainCompactionThreshold)) {
-rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, fileRowCount, columnIndex));
+rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, block.rowStart(), columnIndex));
break;
}
}
}
-fileRowCount += block.rowCount();
}
return rowGroupInfoBuilder.build();
}
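Deriving the starting row inside `getBlocks` and carrying it on `BlockMetadata` means `getFilteredRowGroups` no longer replicates the counting, so the two code paths cannot drift apart again.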
@@ -44,6 +44,7 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.RowGroup;
@@ -57,6 +58,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -303,6 +305,45 @@ public void testColumnReordering()
}
}

+@Test
+public void testReadRowStart()
+        throws IOException
+{
+    List<String> columnNames = ImmutableList.of("column");
+    List<Type> types = ImmutableList.of(INTEGER);
+
+    ParquetDataSource dataSource = new TestingParquetDataSource(
+            writeParquetFile(
+                    ParquetWriterOptions.builder()
+                            .setMaxBlockSize(DataSize.ofBytes(1000))
+                            .build(),
+                    types,
+                    columnNames,
+                    generateInputPages(types, 100, 10)),
+            new ParquetReaderOptions());
+
+    ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+    FileMetaData metaData = parquetMetadata.getParquetMetadata();
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+
+    // we want to test more than one row group
+    assertThat(rowGroups.size()).isGreaterThan(1);
+
+    long rowStart = 0;
+    List<Long> rowStarts = new ArrayList<>();
+    List<Long> splitStarts = new ArrayList<>();
+    for (RowGroup rowGroup : rowGroups) {
+        rowStarts.add(rowStart);
+        splitStarts.add(rowGroup.getFile_offset());
+        rowStart += rowGroup.getNum_rows();
+    }
+
+    for (int i = 0; i < rowGroups.size(); i++) {
+        List<BlockMetadata> blocks = parquetMetadata.getBlocks(splitStarts.get(i), 10);
+        assertThat(getOnlyElement(blocks).rowStart()).isEqualTo(rowStarts.get(i));
+    }
+}

@Test
public void testWriterMemoryAccounting()
throws IOException
