Skip to content

Commit

Permalink
[columnar] Columnar vacuum (#51)
Browse files Browse the repository at this point in the history
* [columnar] Columnar vacuum

* Vacuum columnar tables by combining last n stripes until stripe row
  max count is reached. Vacuuming decision also include information
  about number of deleted rows of each stripe.
  If there is only one stripe, vacuum will be done if percentage of
  deleted rows is higher than 20%.

* Updated regression test to cover vacuum

* [columnar] Autovacuum for columnar tables

* Add pgstat for insert/delete/update table hooks so autovacuum process
  know if vacuum should be done.

---------

Co-authored-by: mkaruza <mkaruza@users.noreply.github.com>
  • Loading branch information
JerrySievert and mkaruza authored Mar 21, 2023
1 parent 4f5b508 commit 601a553
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 39 deletions.
4 changes: 2 additions & 2 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ static Cost
ColumnarPerStripeScanCost(RelOptInfo *rel, Oid relationId, int numberOfColumnsRead)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
RelationClose(relation);

uint32 maxColumnCount = 0;
Expand Down Expand Up @@ -1584,7 +1584,7 @@ static uint64
ColumnarTableStripeCount(Oid relationId)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
int stripeCount = list_length(stripeList);
RelationClose(relation);

Expand Down
166 changes: 155 additions & 11 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
uint64 *highestUsedId);
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
bool *update, Datum *newValues);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot,
ScanDirection scanDirection);
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
HeapTuple heapTuple);
static void ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe,
Expand All @@ -112,12 +113,19 @@ static Oid ColumnarRowMaskSeqId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarChunkGroupIndexRelationId(void);
static Oid ColumnarRowMaskIndexRelationId(void);
static Oid ColumnarRowMaskStripeIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static uint64 GetHighestUsedRowNumber(uint64 storageId);
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
Oid storageIdIndexId,
uint64 storageId);
static void DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
AttrNumber stripeIdAttrNumber,
Oid storageIdIndexId,
uint64 storageId,
uint64 stripeId);
static ModifyState * StartModifyRelation(Relation rel);
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
bool *nulls);
Expand Down Expand Up @@ -1327,8 +1335,9 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)


/*
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups and
* deleted rows count for each chunk group for given stripe.
* Arrays that are updated will be allocated in this function.
*/
static void
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
Expand Down Expand Up @@ -1394,8 +1403,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,


/*
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
* UpdateChunkGroupDeletedRows updates `deleted_rows` column for each chunk group.
*/
void
UpdateChunkGroupDeletedRows(uint64 storageId, uint64 stripe,
Expand Down Expand Up @@ -1499,11 +1507,42 @@ InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCou
* of the given relfilenode.
*/
List *
StripesForRelfilenode(RelFileNode relfilenode)
StripesForRelfilenode(RelFileNode relfilenode, ScanDirection scanDirection)
{
uint64 storageId = LookupStorageId(relfilenode);

return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
return ReadDataFileStripeList(storageId, GetTransactionSnapshot(), scanDirection);
}


/*
* DeletedRowsForStripe returns number of deleted rows for stripe
* of the given relfilenode.
*/
uint32
DeletedRowsForStripe(RelFileNode relfilenode, uint32 chunkCount, uint64 stripeId)
{
uint64 storageId = LookupStorageId(relfilenode);

uint32 *chunkGroupRowCounts;
uint32 *chunkGroupDeletedRows;
int i;

uint32 deletedRows = 0;

ReadChunkGroupRowCounts(storageId, stripeId, chunkCount,
&chunkGroupRowCounts, &chunkGroupDeletedRows,
GetTransactionSnapshot());

for (i = 0; i < chunkCount; i++)
{
deletedRows += chunkGroupDeletedRows[i];
}

pfree(chunkGroupRowCounts);
pfree(chunkGroupDeletedRows);

return deletedRows;
}


Expand Down Expand Up @@ -1542,7 +1581,8 @@ GetHighestUsedAddressAndId(uint64 storageId,
SnapshotData SnapshotDirty;
InitDirtySnapshot(SnapshotDirty);

List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty);
List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty,
ForwardScanDirection);

*highestUsedId = 0;

Expand Down Expand Up @@ -1695,7 +1735,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
* in the given snapshot.
*/
static List *
ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
ReadDataFileStripeList(uint64 storageId, Snapshot snapshot, ScanDirection scanDirection)
{
List *stripeMetadataList = NIL;
ScanKeyData scanKey[1];
Expand All @@ -1715,7 +1755,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
scanKey);

while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection)))
scanDirection)))
{
StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
Expand Down Expand Up @@ -1817,6 +1857,51 @@ DeleteMetadataRows(RelFileNode relfilenode)
}


/*
* DeleteMetadataRowsForStripeId removes the rows with given relfilenode and
* stripe id from columnar metadata tables.
*/
void
DeleteMetadataRowsForStripeId(RelFileNode relfilenode, uint64 stripeId)
{
/*
* During a restore for binary upgrade, metadata tables and indexes may or
* may not exist.
*/
if (IsBinaryUpgrade)
{
return;
}

uint64 storageId = LookupStorageId(relfilenode);

DeleteStripeFromColumnarMetadataTable(
ColumnarStripeRelationId(),
Anum_columnar_stripe_storageid,
Anum_columnar_stripe_stripe,
ColumnarStripePKeyIndexRelationId(),
storageId, stripeId);
DeleteStripeFromColumnarMetadataTable(
ColumnarChunkGroupRelationId(),
Anum_columnar_chunkgroup_storageid,
Anum_columnar_chunkgroup_stripe,
ColumnarChunkGroupIndexRelationId(),
storageId, stripeId);
DeleteStripeFromColumnarMetadataTable(
ColumnarChunkRelationId(),
Anum_columnar_chunk_storageid,
Anum_columnar_chunk_stripe,
ColumnarChunkIndexRelationId(),
storageId, stripeId);
DeleteStripeFromColumnarMetadataTable(
ColumnarRowMaskRelationId(),
Anum_columnar_row_mask_storage_id,
Anum_columnar_row_mask_stripe_id,
ColumnarRowMaskStripeIndexRelationId(),
storageId, stripeId);
}


/*
* DeleteStorageFromColumnarMetadataTable removes the rows with given
* storageId from given columnar metadata table.
Expand Down Expand Up @@ -1860,6 +1945,54 @@ DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
}


/*
* DeleteStripeFromColumnarMetadataTable removes the rows in columnar
* metadata table that match storageId and stripeId.
*/
static void
DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
AttrNumber stripeIdAttrNumber,
Oid storageIdIndexId,
uint64 storageId,
uint64 stripeId)
{
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[0], stripeIdAttrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(stripeId));

Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
if (metadataTable == NULL)
{
/* extension has been dropped */
return;
}

Relation index = index_open(storageIdIndexId, AccessShareLock);

SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
1, scanKey);

ModifyState *modifyState = StartModifyRelation(metadataTable);

HeapTuple heapTuple;
while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection)))
{
DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
}

systable_endscan_ordered(scanDescriptor);

FinishModifyRelation(modifyState);

index_close(index, AccessShareLock);
table_close(metadataTable, AccessShareLock);
}


/*
* StartModifyRelation allocates resources for modifications.
*/
Expand Down Expand Up @@ -2182,6 +2315,16 @@ ColumnarRowMaskIndexRelationId(void)
return get_relname_relid("row_mask_pkey", ColumnarNamespaceId());
}

/*
* ColumnarRowMaskStripeIndexRelationId returns relation id
* of columnar.columnar_row_mask_stripe_unique
*/
static Oid
ColumnarRowMaskStripeIndexRelationId(void)
{
return get_relname_relid("row_mask_stripe_unique", ColumnarNamespaceId());
}


/*
* ColumnarNamespaceId returns namespace id of the schema we store columnar
Expand Down Expand Up @@ -2367,7 +2510,8 @@ GetHighestUsedRowNumber(uint64 storageId)
uint64 highestRowNumber = COLUMNAR_INVALID_ROW_NUMBER;

List *stripeMetadataList = ReadDataFileStripeList(storageId,
GetTransactionSnapshot());
GetTransactionSnapshot(),
ForwardScanDirection);
StripeMetadata *stripeMetadata = NULL;
foreach_ptr(stripeMetadata, stripeMetadataList)
{
Expand Down
55 changes: 54 additions & 1 deletion columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,59 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
return ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
}

/*
* ColumnaSetStripeReadState
*/
bool
ColumnaSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata)
{
if (!ColumnarReadIsCurrentStripe(readState, startStripeMetadata->firstRowNumber))
{
Relation columnarRelation = readState->relation;
Snapshot snapshot = readState->snapshot;
StripeMetadata *stripeMetadata =
FindStripeByRowNumber(columnarRelation,
startStripeMetadata->firstRowNumber, snapshot);
if (stripeMetadata == NULL)
{
/* no such row exists */
return false;
}

if (StripeWriteState(stripeMetadata) != STRIPE_WRITE_FLUSHED)
{
/*
* Callers are expected to skip stripes that are not flushed to
* disk yet or should wait for the writer xact to commit or abort,
* but let's be on the safe side.
*/
ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
RelationGetRelationName(columnarRelation),
stripeMetadata->id)));
}

/* do the cleanup before reading a new stripe */
ColumnarResetRead(readState);

TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation);
List *whereClauseList = NIL;
List *whereClauseVars = NIL;
MemoryContext stripeReadContext = readState->stripeReadContext;
readState->stripeReadState = BeginStripeRead(stripeMetadata,
columnarRelation,
relationTupleDesc,
readState->projectedColumnList,
whereClauseList,
whereClauseVars,
stripeReadContext,
snapshot);

readState->currentStripeMetadata = stripeMetadata;
}

return true;
}

/*
* ColumnarReadIsCurrentStripe returns true if stripe being read contains
Expand Down Expand Up @@ -1132,7 +1185,7 @@ ColumnarTableRowCount(Relation relation)
{
ListCell *stripeMetadataCell = NULL;
uint64 totalRowCount = 0;
List *stripeList = StripesForRelfilenode(relation->rd_node);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);

foreach(stripeMetadataCell, stripeList)
{
Expand Down
Loading

0 comments on commit 601a553

Please sign in to comment.