ancestor detach: delete hardlinked layers on error (#10977)
Delete layers that we have hardlinked so far when there is an error in
`remote_copy`. This prevents a retry of the ancestor detach from
stumbling over already present layer files: the hardlink would fail with
an error.

If there is a crash, we already clean up during the timeline attach: we
loop over all layer files and purge all layers that are not referenced
by the `index_part.json`.
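Roughly, that attach-time cleanup works like the following sketch (illustrative only; `local_layer_files`, `layer_name_from_path` and `index_layer_names` are stand-in names, not the actual pageserver API):

// Purge every layer file on disk that index_part.json does not reference.
for path in local_layer_files(&timeline_dir)? {
    let name = layer_name_from_path(&path)?;
    if !index_layer_names.contains(&name) {
        std::fs::remove_file(&path)?;
    }
}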

Make sure to hold the timeline gate to prevent races between a detach & attach
of the timeline and reads from the layer file.
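Concretely, both the new cleanup helper and `remote_copy` enter the gate before touching layer files, using the pattern from the diff below:

// Hold the timeline gate while we create or delete layer files, so the timeline
// cannot shut down (and be detached and re-attached) underneath us.
let _gate = timeline.gate.enter().map_err(|e| match e {
    GateError::GateClosed => Error::ShuttingDown,
})?;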

These cleanups alone aren't sufficient, however, as there is also code after
`prepare` that can fail. To handle errors there, we add a special case for
`AlreadyExists` errors during the hardlink: we check whether the existing layer
file is an orphan, and if so, delete it from local disk. Ideally we never hit
that case, as it is less clear in that scenario where the layer came from, but
it provides good defense in depth.
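Condensed, that hardlink error handling looks like this (simplified from the diff below; error laundering and logging are elided, and `ensure_not_in_layer_map` stands in for the inline layer-map check):

match std::fs::hard_link(&adopted_path, &adoptee_path) {
    Ok(()) => {}
    Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
        // Only an orphan file (not referenced by the layer map) may be removed
        // to make way for the hardlink; anything else is an error.
        ensure_not_in_layer_map(&file_name)?;
        std::fs::remove_file(&adoptee_path)?;
        std::fs::hard_link(&adopted_path, &adoptee_path)?;
    }
    Err(e) => return Err(e.into()),
}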

Related #10729
Fixes #10970
arpad-m authored Feb 26, 2025
1 parent 86b9703 commit 1434763
Showing 1 changed file with 84 additions and 8 deletions.
92 changes: 84 additions & 8 deletions pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -12,6 +12,7 @@ use utils::completion;
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;

use super::layer_manager::LayerManager;
use super::{FlushLayerError, Timeline};
@@ -363,14 +364,25 @@ pub(super) async fn prepare(

let mut tasks = tokio::task::JoinSet::new();
let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
let cancel_eval = CancellationToken::new();

for adopted in rest_of_historic {
let limiter = limiter.clone();
let timeline = detached.clone();
let cancel_eval = cancel_eval.clone();

tasks.spawn(
async move {
let _permit = limiter.acquire().await;
let _permit = tokio::select! {
permit = limiter.acquire() => {
permit
}
// Wait for the cancellation here instead of letting the entire task be cancelled.
// Cancellations are racy in that they might leave layers on disk.
_ = cancel_eval.cancelled() => {
Err(Error::ShuttingDown)?
}
};
let (owned, did_hardlink) = remote_copy(
&adopted,
&timeline,
@@ -386,7 +398,22 @@ pub(super) async fn prepare(
);
}

fn delete_layers(timeline: &Timeline, layers: Vec<Layer>) -> Result<(), Error> {
// We are deleting layers, so we must hold the gate
let _gate = timeline.gate.enter().map_err(|e| match e {
GateError::GateClosed => Error::ShuttingDown,
})?;
{
layers.into_iter().for_each(|l: Layer| {
l.delete_on_drop();
std::mem::drop(l);
});
}
Ok(())
}

let mut should_fsync = false;
let mut first_err = None;
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok((owned, did_hardlink))) => {
@@ -395,13 +422,24 @@
}
new_layers.push(owned);
}

// Don't stop the evaluation on errors, so that we get the full set of hardlinked layers to delete.
Ok(Err(failed)) => {
return Err(failed);
cancel_eval.cancel();
first_err.get_or_insert(failed);
}
Err(je) => {
cancel_eval.cancel();
first_err.get_or_insert(Error::Prepare(je.into()));
}
Err(je) => return Err(Error::Prepare(je.into())),
}
}

if let Some(failed) = first_err {
delete_layers(detached, new_layers)?;
return Err(failed);
}

// fsync directory again if we hardlinked something
if should_fsync {
fsync_timeline_dir(detached, ctx).await;
@@ -649,6 +687,11 @@ async fn remote_copy(
let conf = adoptee.conf;
let file_name = adopted.layer_desc().layer_name();

// We don't want to shut the timeline down during this operation because we do `delete_on_drop` below
let _gate = adoptee.gate.enter().map_err(|e| match e {
GateError::GateClosed => Error::ShuttingDown,
})?;

// depending if Layer::keep_resident, do a hardlink
let did_hardlink;
let owned = if let Some(adopted_resident) = adopted.keep_resident().await {
@@ -660,21 +703,54 @@
&file_name,
&metadata.generation,
);
std::fs::hard_link(adopted_path, &adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;

match std::fs::hard_link(adopted_path, &adoptee_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
// In theory we should not get into this situation as we are doing cleanups of the layer file after errors.
// However, we don't do cleanups for errors past `prepare`, so there is the slight chance to get to this branch.

// Double check that the file is orphan (probably from an earlier attempt), then delete it
let key = file_name.clone().into();
if adoptee.layers.read().await.contains_key(&key) {
// We are supposed to filter out such cases before coming to this function
return Err(Error::Prepare(anyhow::anyhow!(
"layer file {file_name} already present and inside layer map"
)));
}
tracing::info!("Deleting orphan layer file to make way for hard linking");
// Delete orphan layer file and try again, to ensure this layer has a well understood source
std::fs::remove_file(&adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;
std::fs::hard_link(adopted_path, &adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;
}
Err(e) => {
return Err(Error::launder(e.into(), Error::Prepare));
}
};
did_hardlink = true;
Layer::for_resident(conf, adoptee, adoptee_path, file_name, metadata).drop_eviction_guard()
} else {
did_hardlink = false;
Layer::for_evicted(conf, adoptee, file_name, metadata)
};

let layer = adoptee
let layer = match adoptee
.remote_client
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
.map_err(|e| Error::launder(e, Error::Prepare))?;
{
Ok(()) => owned,
Err(e) => {
{
// Clean up the layer so that on a retry we don't get errors that the file already exists
owned.delete_on_drop();
std::mem::drop(owned);
}
return Err(Error::launder(e, Error::Prepare));
}
};

Ok((layer, did_hardlink))
}

1 comment on commit 1434763

@github-actions


7896 tests run: 7507 passed, 0 failed, 389 skipped (full report)


Flaky tests (4)

  • Postgres 17
  • Postgres 16
  • Postgres 15

Code coverage* (full report)

  • functions: 32.8% (8636 of 26350 functions)
  • lines: 48.6% (73097 of 150404 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1434763 at 2025-02-26T19:00:15.634Z :recycle:
