Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[columnar] Running simple Vacuum after INSERT in a columnar table mak… #74

Merged
merged 1 commit into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
}

FinishModifyRelation(modifyState);
table_close(columnarChunkGroup, NoLock);
table_close(columnarChunkGroup, RowExclusiveLock);
}


Expand Down Expand Up @@ -663,7 +663,7 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId,
}

FinishModifyRelation(modifyState);
table_close(columnarRowMask, NoLock);
table_close(columnarRowMask, RowExclusiveLock);

return chunkInserted;
}
Expand Down Expand Up @@ -1960,20 +1960,19 @@ DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[0], stripeIdAttrNumber, BTEqualStrategyNumber,
ScanKeyInit(&scanKey[1], stripeIdAttrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(stripeId));

Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
Relation metadataTable = try_relation_open(metadataTableId, RowShareLock);
if (metadataTable == NULL)
{
/* extension has been dropped */
return;
}

Relation index = index_open(storageIdIndexId, AccessShareLock);

Relation index = index_open(storageIdIndexId, RowShareLock);
SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
1, scanKey);
2, scanKey);

ModifyState *modifyState = StartModifyRelation(metadataTable);

Expand All @@ -1988,8 +1987,8 @@ DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,

FinishModifyRelation(modifyState);

index_close(index, AccessShareLock);
table_close(metadataTable, AccessShareLock);
index_close(index, RowShareLock);
table_close(metadataTable, RowShareLock);
}


Expand Down
6 changes: 3 additions & 3 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,11 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
}

/*
* ColumnaSetStripeReadState
* ColumnarSetStripeReadState
*/
bool
ColumnaSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata)
ColumnarSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata)
{
if (!ColumnarReadIsCurrentStripe(readState, startStripeMetadata->firstRowNumber))
{
Expand Down
41 changes: 33 additions & 8 deletions columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
Expand Down Expand Up @@ -1200,30 +1201,52 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)
}
}

/*
* We need to clear current proces (VACUUUM) status flag here. Why?
* Current process has flag `PROC_IN_VACUUM` which is problematic because
* we write here into metadata heap tables. If concurrent process
* read page in which we inserted metadata tuples these tuples will be considered
* DEAD and will be removed (in problematic scenarion).
* Concurrent process, when assigning RecentXmin will scan all active processes in
* system but will NOT consider this process because of
* `PROC_IN_VACUUM` flag set. It looks that invalidating status flag here doesn't affect
* further execution.
*/

LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
#if PG_VERSION_NUM >= PG_VERSION_14
MyProc->statusFlags = 0;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
#else
MyPgXact->vacuumFlags = 0;
#endif
LWLockRelease(ProcArrayLock);

/* We need to re-assing RecentXmin here */
PushActiveSnapshot(GetTransactionSnapshot());

ColumnarWriteState *writeState = ColumnarBeginWrite(rel->rd_node,
columnarOptions,
tupleDesc);


/* we need all columns */
int natts = rel->rd_att->natts;
Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);

/* no quals for table rewrite */
List *scanQual = NIL;

/* use SnapshotAny when re-writing table as heapAM does */
Snapshot snapshot = SnapshotAny;

MemoryContext scanContext = CreateColumnarScanMemoryContext();
bool randomAccess = true;
ColumnarReadState *readState = init_columnar_read_state(rel, tupleDesc,
attr_needed, scanQual,
scanContext, snapshot,
scanContext, SnapshotAny,
randomAccess,
NULL);

ColumnaSetStripeReadState(readState,
list_nth(stripeMetadataList, startingStripeListPosition - 1));
ColumnarSetStripeReadState(readState,
list_nth(stripeMetadataList, startingStripeListPosition - 1));

Datum *values = palloc0(tupleDesc->natts * sizeof(Datum));
bool *nulls = palloc0(tupleDesc->natts * sizeof(bool));
Expand All @@ -1249,14 +1272,16 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)

ColumnarStorageTruncate(rel, newDataReservation);

ColumnarEndWrite(writeState);
ColumnarEndRead(readState);

for (int i = 0; i < startingStripeListPosition; i++)
{
StripeMetadata *metadata = list_nth(stripeMetadataList, i);
DeleteMetadataRowsForStripeId(rel->rd_node, metadata->id);
}

ColumnarEndWrite(writeState);
ColumnarEndRead(readState);
PopActiveSnapshot();

return true;
}
Expand Down
4 changes: 2 additions & 2 deletions columnar/src/include/columnar/columnar.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
extern bool ColumnaSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata);
extern bool ColumnarSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata);

/* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
Expand Down
47 changes: 27 additions & 20 deletions columnar/src/test/regress/expected/columnar_vacuum.out
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,10 @@ SET columnar.compression TO default;
SET columnar.stripe_row_limit TO default;
SET columnar.chunk_group_row_limit TO default;
CREATE TABLE t(a INT, b INT) USING columnar;
SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
Expand All @@ -305,20 +306,20 @@ SELECT COUNT(*) = 2 FROM columnar.stripe;
VACUUM t;
-- No change since we can't combine 2 stripe because row_number is higher
-- than maximum row number per stripe
SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

DELETE FROM t WHERE a % 2 = 0;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
Expand All @@ -327,19 +328,19 @@ SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0;
VACUUM t;
-- Stripes are merged into one stripe because total number of non-deleted rows
-- is less than maximum stripe row number
SELECT COUNT(*) = 1 FROM columnar.stripe;
SELECT COUNT(*) = 1 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
count
-------
10
(1 row)

SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;
count
-------
10
Expand All @@ -348,18 +349,22 @@ SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0;
DROP TABLE t;
-- Vacuum on single stripe
CREATE TABLE t(a INT, b INT) USING columnar;
SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask) FROM columnar.chunk_group;
SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask WHERE storage_id = :t_oid)
FROM columnar.chunk_group WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask \gset
SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group WHERE storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask WHERE storage_id = :t_oid \gset
DELETE FROM t WHERE a % 2 = 0;
SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset
SELECT :columnar_chunk_group_after_delete_rows = :columnar_chunk_group_rows;
?column?
----------
Expand All @@ -372,39 +377,41 @@ SELECT :columnar_row_mask_after_delete_rows = :columnar_row_mask_rows;
t
(1 row)

SELECT SUM(deleted_rows) FROM columnar.row_mask;
SELECT SUM(deleted_rows) FROM columnar.row_mask WHERE storage_id = :t_oid;
sum
-------
50000
(1 row)

SELECT SUM(deleted_rows) FROM columnar.chunk_group;
SELECT SUM(deleted_rows) FROM columnar.chunk_group WHERE storage_id = :t_oid;
sum
-------
50000
(1 row)

VACUUM t;
-- No more deleted_rows after vacuum
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group
WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask
WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
Expand All @@ -421,13 +428,13 @@ SELECT (:table_count_mod_7 / :table_count) < 0.2;
DELETE FROM t WHERE a % 7 = 0;
VACUUM t;
-- Vacuuming will not be done because number of deleted rows / total_rows is less than 20%
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask ;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;
?column?
----------
t
Expand Down
Loading