Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[columnar] Columnar vacuum #51

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ static Cost
ColumnarPerStripeScanCost(RelOptInfo *rel, Oid relationId, int numberOfColumnsRead)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
RelationClose(relation);

uint32 maxColumnCount = 0;
Expand Down Expand Up @@ -1584,7 +1584,7 @@ static uint64
ColumnarTableStripeCount(Oid relationId)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
int stripeCount = list_length(stripeList);
RelationClose(relation);

Expand Down
166 changes: 155 additions & 11 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
uint64 *highestUsedId);
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
bool *update, Datum *newValues);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot,
ScanDirection scanDirection);
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
HeapTuple heapTuple);
static void ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe,
Expand All @@ -112,12 +113,19 @@ static Oid ColumnarRowMaskSeqId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarChunkGroupIndexRelationId(void);
static Oid ColumnarRowMaskIndexRelationId(void);
static Oid ColumnarRowMaskStripeIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static uint64 GetHighestUsedRowNumber(uint64 storageId);
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
Oid storageIdIndexId,
uint64 storageId);
static void DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
AttrNumber stripeIdAttrNumber,
Oid storageIdIndexId,
uint64 storageId,
uint64 stripeId);
static ModifyState * StartModifyRelation(Relation rel);
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
bool *nulls);
Expand Down Expand Up @@ -1327,8 +1335,9 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)


/*
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
* ReadChunkGroupRowCounts returns an array of row counts and an array of
* deleted row counts, one entry per chunk group, for the given stripe.
* Both output arrays are allocated in this function.
*/
static void
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
Expand Down Expand Up @@ -1394,8 +1403,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,


/*
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
* UpdateChunkGroupDeletedRows updates `deleted_rows` column for each chunk group.
*/
void
UpdateChunkGroupDeletedRows(uint64 storageId, uint64 stripe,
Expand Down Expand Up @@ -1499,11 +1507,42 @@ InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCou
* of the given relfilenode.
*/
List *
StripesForRelfilenode(RelFileNode relfilenode)
StripesForRelfilenode(RelFileNode relfilenode, ScanDirection scanDirection)
{
uint64 storageId = LookupStorageId(relfilenode);

return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
return ReadDataFileStripeList(storageId, GetTransactionSnapshot(), scanDirection);
}


/*
 * DeletedRowsForStripe returns the total number of deleted rows recorded
 * for the stripe identified by stripeId in the given relfilenode.
 *
 * chunkCount is handed to ReadChunkGroupRowCounts as the chunk group count
 * of the stripe and is also the number of per-chunk-group deleted row
 * counters that are summed up here. Metadata is read using the current
 * transaction snapshot.
 */
uint32
DeletedRowsForStripe(RelFileNode relfilenode, uint32 chunkCount, uint64 stripeId)
{
	uint64 storageId = LookupStorageId(relfilenode);

	/* both arrays are filled in by ReadChunkGroupRowCounts and freed below */
	uint32 *chunkGroupRowCounts = NULL;
	uint32 *chunkGroupDeletedRows = NULL;

	ReadChunkGroupRowCounts(storageId, stripeId, chunkCount,
							&chunkGroupRowCounts, &chunkGroupDeletedRows,
							GetTransactionSnapshot());

	uint32 deletedRows = 0;

	/* index has the same unsigned type as chunkCount to avoid a sign mismatch */
	for (uint32 chunkGroupIndex = 0; chunkGroupIndex < chunkCount; chunkGroupIndex++)
	{
		deletedRows += chunkGroupDeletedRows[chunkGroupIndex];
	}

	/* the row counts are not needed here, only the deleted row counters */
	pfree(chunkGroupRowCounts);
	pfree(chunkGroupDeletedRows);

	return deletedRows;
}


Expand Down Expand Up @@ -1542,7 +1581,8 @@ GetHighestUsedAddressAndId(uint64 storageId,
SnapshotData SnapshotDirty;
InitDirtySnapshot(SnapshotDirty);

List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty);
List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty,
ForwardScanDirection);

*highestUsedId = 0;

Expand Down Expand Up @@ -1695,7 +1735,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
* in the given snapshot.
*/
static List *
ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
ReadDataFileStripeList(uint64 storageId, Snapshot snapshot, ScanDirection scanDirection)
{
List *stripeMetadataList = NIL;
ScanKeyData scanKey[1];
Expand All @@ -1715,7 +1755,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
scanKey);

while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection)))
scanDirection)))
{
StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
Expand Down Expand Up @@ -1817,6 +1857,51 @@ DeleteMetadataRows(RelFileNode relfilenode)
}


/*
 * DeleteMetadataRowsForStripeId deletes, from each of the columnar metadata
 * tables (stripe, chunk group, chunk and row mask), the rows that belong to
 * the given stripe of the storage behind relfilenode.
 */
void
DeleteMetadataRowsForStripeId(RelFileNode relfilenode, uint64 stripeId)
{
	/*
	 * Nothing to do while restoring for a binary upgrade: metadata tables
	 * and their indexes may or may not exist at that point.
	 */
	if (IsBinaryUpgrade)
	{
		return;
	}

	/* one entry per metadata table, processed in the order listed */
	const struct
	{
		Oid (*metadataTableId)(void);
		AttrNumber storageIdAttrNumber;
		AttrNumber stripeIdAttrNumber;
		Oid (*indexId)(void);
	} metadataTargets[] = {
		{
			ColumnarStripeRelationId,
			Anum_columnar_stripe_storageid,
			Anum_columnar_stripe_stripe,
			ColumnarStripePKeyIndexRelationId
		},
		{
			ColumnarChunkGroupRelationId,
			Anum_columnar_chunkgroup_storageid,
			Anum_columnar_chunkgroup_stripe,
			ColumnarChunkGroupIndexRelationId
		},
		{
			ColumnarChunkRelationId,
			Anum_columnar_chunk_storageid,
			Anum_columnar_chunk_stripe,
			ColumnarChunkIndexRelationId
		},
		{
			ColumnarRowMaskRelationId,
			Anum_columnar_row_mask_storage_id,
			Anum_columnar_row_mask_stripe_id,
			ColumnarRowMaskStripeIndexRelationId
		}
	};
	const int targetCount = (int) (sizeof(metadataTargets) /
								   sizeof(metadataTargets[0]));

	uint64 storageId = LookupStorageId(relfilenode);

	for (int targetIndex = 0; targetIndex < targetCount; targetIndex++)
	{
		DeleteStripeFromColumnarMetadataTable(
			metadataTargets[targetIndex].metadataTableId(),
			metadataTargets[targetIndex].storageIdAttrNumber,
			metadataTargets[targetIndex].stripeIdAttrNumber,
			metadataTargets[targetIndex].indexId(),
			storageId, stripeId);
	}
}


/*
* DeleteStorageFromColumnarMetadataTable removes the rows with given
* storageId from given columnar metadata table.
Expand Down Expand Up @@ -1860,6 +1945,54 @@ DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
}


/*
 * DeleteStripeFromColumnarMetadataTable removes the rows of the given
 * columnar metadata table that match both storageId and stripeId.
 *
 * metadataTableId     - metadata table to delete from
 * storageIdAtrrNumber - attribute number of the storage id column
 * stripeIdAttrNumber  - attribute number of the stripe id column
 * storageIdIndexId    - index used for the ordered scan; the scan is keyed
 *                       on both the storage id and the stripe id columns
 * storageId, stripeId - values identifying the stripe whose rows are removed
 *
 * Returns silently if the metadata table cannot be opened.
 */
static void
DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
									  AttrNumber storageIdAtrrNumber,
									  AttrNumber stripeIdAttrNumber,
									  Oid storageIdIndexId,
									  uint64 storageId,
									  uint64 stripeId)
{
	enum { STRIPE_SCAN_KEY_COUNT = 2 };

	/*
	 * Both keys must be in effect: filtering on the stripe id alone would
	 * also match rows with the same stripe id that belong to other storages.
	 */
	ScanKeyData scanKey[STRIPE_SCAN_KEY_COUNT];
	ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
				F_INT8EQ, UInt64GetDatum(storageId));
	ScanKeyInit(&scanKey[1], stripeIdAttrNumber, BTEqualStrategyNumber,
				F_INT8EQ, UInt64GetDatum(stripeId));

	Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
	if (metadataTable == NULL)
	{
		/* extension has been dropped */
		return;
	}

	Relation index = index_open(storageIdIndexId, AccessShareLock);

	SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
															STRIPE_SCAN_KEY_COUNT,
															scanKey);

	ModifyState *modifyState = StartModifyRelation(metadataTable);

	HeapTuple heapTuple;
	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
																 ForwardScanDirection)))
	{
		DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
	}

	systable_endscan_ordered(scanDescriptor);

	FinishModifyRelation(modifyState);

	index_close(index, AccessShareLock);
	table_close(metadataTable, AccessShareLock);
}


/*
* StartModifyRelation allocates resources for modifications.
*/
Expand Down Expand Up @@ -2182,6 +2315,16 @@ ColumnarRowMaskIndexRelationId(void)
return get_relname_relid("row_mask_pkey", ColumnarNamespaceId());
}

/*
 * ColumnarRowMaskStripeIndexRelationId returns the relation id of the
 * relation named "row_mask_stripe_unique", looked up in the namespace
 * returned by ColumnarNamespaceId().
 */
static Oid
ColumnarRowMaskStripeIndexRelationId(void)
{
	Oid columnarNamespaceId = ColumnarNamespaceId();

	return get_relname_relid("row_mask_stripe_unique", columnarNamespaceId);
}


/*
* ColumnarNamespaceId returns namespace id of the schema we store columnar
Expand Down Expand Up @@ -2367,7 +2510,8 @@ GetHighestUsedRowNumber(uint64 storageId)
uint64 highestRowNumber = COLUMNAR_INVALID_ROW_NUMBER;

List *stripeMetadataList = ReadDataFileStripeList(storageId,
GetTransactionSnapshot());
GetTransactionSnapshot(),
ForwardScanDirection);
StripeMetadata *stripeMetadata = NULL;
foreach_ptr(stripeMetadata, stripeMetadataList)
{
Expand Down
55 changes: 54 additions & 1 deletion columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,59 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
return ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
}

/*
 * ColumnaSetStripeReadState makes sure that the read state is positioned on
 * the stripe that contains the first row number of startStripeMetadata.
 *
 * If that stripe is already the one being read, nothing is changed.
 * Otherwise the stripe is looked up by row number with the read state's
 * snapshot, the current read is reset and a new stripe read is started
 * with empty where clause lists.
 *
 * Returns false if no stripe is found for that row number, true otherwise.
 * Raises an error if the stripe found is not in STRIPE_WRITE_FLUSHED state.
 */
bool
ColumnaSetStripeReadState(ColumnarReadState *readState,
						  StripeMetadata *startStripeMetadata)
{
	if (ColumnarReadIsCurrentStripe(readState, startStripeMetadata->firstRowNumber))
	{
		/* already reading the requested stripe */
		return true;
	}

	Relation rel = readState->relation;
	Snapshot readSnapshot = readState->snapshot;

	StripeMetadata *targetStripe =
		FindStripeByRowNumber(rel, startStripeMetadata->firstRowNumber,
							  readSnapshot);
	if (targetStripe == NULL)
	{
		/* no such row exists */
		return false;
	}

	if (StripeWriteState(targetStripe) != STRIPE_WRITE_FLUSHED)
	{
		/*
		 * Callers are expected to skip stripes that are not flushed to
		 * disk yet or should wait for the writer xact to commit or abort,
		 * but let's be on the safe side.
		 */
		ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
							   RelationGetRelationName(rel),
							   targetStripe->id)));
	}

	/* release whatever the previous stripe read holds before switching */
	ColumnarResetRead(readState);

	/* no where clauses are passed to the stripe read */
	readState->stripeReadState = BeginStripeRead(targetStripe,
												 rel,
												 RelationGetDescr(rel),
												 readState->projectedColumnList,
												 NIL,
												 NIL,
												 readState->stripeReadContext,
												 readSnapshot);
	readState->currentStripeMetadata = targetStripe;

	return true;
}

/*
* ColumnarReadIsCurrentStripe returns true if stripe being read contains
Expand Down Expand Up @@ -1132,7 +1185,7 @@ ColumnarTableRowCount(Relation relation)
{
ListCell *stripeMetadataCell = NULL;
uint64 totalRowCount = 0;
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);

foreach(stripeMetadataCell, stripeList)
{
Expand Down
Loading