Skip to content

Commit

Permalink
Correctly check generationb in LFC
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Knizhnik committed Feb 21, 2025
1 parent de812fa commit a7ed5d3
Showing 1 changed file with 57 additions and 50 deletions.
107 changes: 57 additions & 50 deletions pgxn/neon/file_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ typedef struct FileCacheControl
bool lfc_store_prefetch_result;

static HTAB *lfc_hash;
static int lfc_desc = 0;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
Expand All @@ -170,6 +171,20 @@ static shmem_request_hook_type prev_shmem_request_hook;

#define LFC_ENABLED() (lfc_ctl->limit != 0)

/*
* Close LFC file if opened.
* All backends should close their LFC files once LFC is disabled.
*/
static void
lfc_close_file(void)
{
if (lfc_desc >= 0)
{
close(lfc_desc);
lfc_desc = -1;
}
}

/*
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
Expand All @@ -186,6 +201,7 @@ lfc_switch_off(void)
HASH_SEQ_STATUS status;
FileCacheEntry *entry;

/* Invalidate hash */
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
Expand All @@ -198,50 +214,30 @@ lfc_switch_off(void)
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);

if (lfc_desc > 0)
{
int rc;
/*
* We need to use unlink to to avoid races in LFC write, because it is not
* protected by lock
*/
unlink(lfc_path);

/*
* If the reason of error is ENOSPC, then truncation of file may
* help to reclaim some space
*/
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_TRUNCATE);
rc = ftruncate(lfc_desc, 0);
pgstat_report_wait_end();
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);

if (rc < 0)
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
}
/* Wakeup waiting backends */
for (int i = 0; i < N_COND_VARS; i++)
ConditionVariableBroadcast(&lfc_ctl->cv[i]);
}

/*
* We need to use unlink to to avoid races in LFC write, because it is not
* protected by lock
*/
unlink(lfc_path);

fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);

if (lfc_desc > 0)
close(lfc_desc);

lfc_desc = -1;
lfc_close_file();
}

static void
lfc_disable(char const *op)
{
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);

/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_switch_off();
LWLockRelease(lfc_lock);
Expand All @@ -256,11 +252,20 @@ lfc_maybe_disabled(void)
return !lfc_ctl || !LFC_ENABLED();
}

/*
* Open LFC file if not opened yet or generation is changed.
* Should be called under LFC lock.
*/
static bool
lfc_ensure_opened(void)
{
if (lfc_generation != lfc_ctl->generation)
{
lfc_close_file();
lfc_generation = lfc_ctl->generation;
}
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc < 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);

Expand Down Expand Up @@ -376,17 +381,22 @@ lfc_change_limit_hook(int newval, void *extra)
if (!lfc_ctl || !is_normal_backend())
return;

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

/* Open LFC file only if LFC was enabled or we are going to reenable it */
if ((newval == 0 && !LFC_ENABLED()) || !lfc_ensure_opened())
if (newval == 0 && !LFC_ENABLED())
{
LWLockRelease(lfc_lock);
/* File should be reopened if LFC is reenabled */
if (lfc_desc > 0)
close(lfc_desc);
lfc_desc = -1;
lfc_close_file();
return;
}

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!lfc_ensure_opened())
{
LWLockRelease(lfc_lock);
return;
}

if (lfc_ctl->limit != new_size)
{
Expand Down Expand Up @@ -665,9 +675,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return -1;

if (!lfc_ensure_opened())
return -1;

CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;

Expand Down Expand Up @@ -718,7 +725,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

/* We can return the blocks we've read before LFC got disabled;
* assuming we read any. */
if (!LFC_ENABLED())
if (!LFC_ENABLED() || !lfc_ensure_opened())
{
LWLockRelease(lfc_lock);
return blocks_read;
Expand Down Expand Up @@ -829,6 +836,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
else
{
/* generation mismatch, assume error condition */
lfc_close_file();
LWLockRelease(lfc_lock);
return -1;
}
Expand Down Expand Up @@ -975,9 +983,6 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;

if (!lfc_ensure_opened())
return false;

CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;

Expand All @@ -989,7 +994,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!LFC_ENABLED())
if (!LFC_ENABLED() || !lfc_ensure_opened())
{
LWLockRelease(lfc_lock);
return false;
Expand Down Expand Up @@ -1081,6 +1086,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
SET_STATE(entry, chunk_offs, AVAILABLE);
}
}
else
{
lfc_close_file();
}
LWLockRelease(lfc_lock);
}
return true;
Expand All @@ -1106,17 +1115,14 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;

if (!lfc_ensure_opened())
return;

CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;

CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!LFC_ENABLED())
if (!LFC_ENABLED() || !lfc_ensure_opened())
{
LWLockRelease(lfc_lock);
return;
Expand Down Expand Up @@ -1258,6 +1264,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
else
{
/* stop iteration if LFC was disabled */
lfc_close_file();
break;
}
}
Expand Down

0 comments on commit a7ed5d3

Please sign in to comment.