From a753349cb0d8a76b7b5884c216236f6e57357d30 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 15 Jan 2025 17:04:06 -0500 Subject: [PATCH] feat(pageserver): validate data integrity during gc-compaction (#10131) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem part of /~https://github.com/neondatabase/neon/issues/9114 part of investigation of /~https://github.com/neondatabase/neon/issues/10049 ## Summary of changes * If `cfg!(test) or cfg!(feature = testing)`, then we will always try generating an image to ensure the history is replayable, but not put the image layer into the final layer results, therefore discovering wrong key history before we hit a read error. * I suspect it's easier to trigger some races if gc-compaction is continuously run on a timeline, so I increased the frequency to twice per 10 churns. * Also, create branches in gc-compaction smoke tests to get more test coverage. --------- Signed-off-by: Alex Chi Z Co-authored-by: Arpad Müller --- pageserver/src/tenant/timeline/compaction.rs | 77 +++++++++++++++----- test_runner/fixtures/workload.py | 16 ++++ test_runner/regress/test_compaction.py | 20 ++++- 3 files changed, 89 insertions(+), 24 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 05f8d476f93d..2042a18e96e1 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1776,7 +1776,10 @@ impl Timeline { base_img_from_ancestor: Option<(Key, Lsn, Bytes)>, ) -> anyhow::Result { // Pre-checks for the invariants - if cfg!(debug_assertions) { + + let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing"); + + if debug_mode { for (log_key, _, _) in full_history { assert_eq!(log_key, &key, "mismatched key"); } @@ -1922,15 +1925,19 @@ impl Timeline { output } + let mut key_exists = false; for (i, split_for_lsn) in split_history.into_iter().enumerate() { // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly. records_since_last_image += split_for_lsn.len(); - let generate_image = if i == 0 && !has_ancestor { + // Whether to produce an image into the final layer files + let produce_image = if i == 0 && !has_ancestor { // We always generate images for the first batch (below horizon / lowest retain_lsn) true } else if i == batch_cnt - 1 { // Do not generate images for the last batch (above horizon) false + } else if records_since_last_image == 0 { + false } else if records_since_last_image >= delta_threshold_cnt { // Generate images when there are too many records true @@ -1945,29 +1952,45 @@ impl Timeline { break; } } - if let Some((_, _, val)) = replay_history.first() { - if !val.will_init() { - return Err(anyhow::anyhow!("invalid history, no base image")).with_context( - || { - generate_debug_trace( - Some(&replay_history), - full_history, - retain_lsn_below_horizon, - horizon, - ) - }, - ); - } + if replay_history.is_empty() && !key_exists { + // The key does not exist at earlier LSN, we can skip this iteration. + retention.push(Vec::new()); + continue; + } else { + key_exists = true; } - if generate_image && records_since_last_image > 0 { + let Some((_, _, val)) = replay_history.first() else { + unreachable!("replay history should not be empty once it exists") + }; + if !val.will_init() { + return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| { + generate_debug_trace( + Some(&replay_history), + full_history, + retain_lsn_below_horizon, + horizon, + ) + }); + } + // Whether to reconstruct the image. In debug mode, we will generate an image + // at every retain_lsn to ensure data is not corrupted, but we won't put the + // image into the final layer. + let generate_image = produce_image || debug_mode; + if produce_image { records_since_last_image = 0; - let replay_history_for_debug = if cfg!(debug_assertions) { + } + let img_and_lsn = if generate_image { + let replay_history_for_debug = if debug_mode { Some(replay_history.clone()) } else { None }; let replay_history_for_debug_ref = replay_history_for_debug.as_deref(); - let history = std::mem::take(&mut replay_history); + let history = if produce_image { + std::mem::take(&mut replay_history) + } else { + replay_history.clone() + }; let mut img = None; let mut records = Vec::with_capacity(history.len()); if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() { @@ -2004,8 +2027,20 @@ impl Timeline { } records.reverse(); let state = ValueReconstructState { img, records }; - let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range + // last batch does not generate image so i is always in range, unless we force generate + // an image during testing + let request_lsn = if i >= lsn_split_points.len() { + Lsn::MAX + } else { + lsn_split_points[i] + }; let img = self.reconstruct_value(key, request_lsn, state).await?; + Some((request_lsn, img)) + } else { + None + }; + if produce_image { + let (request_lsn, img) = img_and_lsn.unwrap(); replay_history.push((key, request_lsn, Value::Image(img.clone()))); retention.push(vec![(request_lsn, Value::Image(img))]); } else { @@ -2273,6 +2308,8 @@ impl Timeline { let compact_key_range = job.compact_key_range; let compact_lsn_range = job.compact_lsn_range; + let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing"); + info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end); scopeguard::defer! { @@ -2398,7 +2435,7 @@ impl Timeline { .first() .copied() .unwrap_or(job_desc.gc_cutoff); - if cfg!(debug_assertions) { + if debug_mode { assert_eq!( res, job_desc diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 1b8c9fef4444..eea0ec2b95c3 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -53,6 +53,22 @@ def __init__( self._endpoint: Endpoint | None = None self._endpoint_opts = endpoint_opts or {} + def branch( + self, + timeline_id: TimelineId, + branch_name: str | None = None, + endpoint_opts: dict[str, Any] | None = None, + ) -> Workload: + """ + Checkpoint the current status of the workload in case of branching + """ + branch_workload = Workload( + self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts + ) + branch_workload.expect_rows = self.expect_rows + branch_workload.churn_cursor = self.churn_cursor + return branch_workload + def reconfigure(self) -> None: """ Request the endpoint to reconfigure based on location reported by storage controller diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index fe0422088a1a..d0a2349ccf0c 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -112,7 +112,11 @@ def test_pageserver_compaction_smoke( @skip_in_debug_build("only run with release build") -def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize( + "with_branches", + ["with_branches", "no_branches"], +) +def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str): SMOKE_CONF = { # Run both gc and gc-compaction. "gc_period": "5s", @@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Writing initial data ...") workload.write_rows(row_count, env.pageserver.id) + child_workloads: list[Workload] = [] + for i in range(1, churn_rounds + 1): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") - - if (i - 1) % 10 == 0: - # Run gc-compaction every 10 rounds to ensure the test doesn't take too long time. + if i % 10 == 5 and with_branches == "with_branches": + branch_name = f"child-{i}" + branch_timeline_id = env.create_branch(branch_name) + child_workloads.append(workload.branch(branch_timeline_id, branch_name)) + if (i - 1) % 10 == 0 or (i - 1) % 10 == 1: + # Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time. ps_http.timeline_compact( tenant_id, timeline_id, @@ -179,6 +188,9 @@ def compaction_finished(): log.info("Validating at workload end ...") workload.validate(env.pageserver.id) + for child_workload in child_workloads: + log.info(f"Validating at branch {child_workload.branch_name}") + child_workload.validate(env.pageserver.id) # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction. ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)