# pageserver: add page_trace API for debugging (#10293)
## Problem

When a pageserver is receiving a high rate of requests, we have no
efficient way to discover what the client's access pattern is.

Closes: #10275

## Summary of changes

- Add a `/v1/tenant/x/timeline/y/page_trace?size_limit_bytes=...&time_limit_secs=...` API, which returns a binary buffer.
- Add a `pagectl page-trace` tool to decode and analyze the output (see the usage sketch below).
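
For illustration, here is a minimal capture sketch (not part of this change). It assumes `reqwest` with the `blocking` feature, a pageserver management API on `127.0.0.1:9898`, and placeholder IDs and output path:

```rust
// Hypothetical helper: GET the page_trace endpoint and save the binary buffer
// for offline analysis. Host, port, and the defaults below are assumptions.
fn capture_trace(tenant_shard_id: &str, timeline_id: &str) -> anyhow::Result<()> {
    let url = format!(
        "http://127.0.0.1:9898/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}\
         /page_trace?size_limit_bytes=1048576&time_limit_secs=10"
    );
    let body = reqwest::blocking::get(&url)?.error_for_status()?.bytes()?;
    std::fs::write("trace.bin", &body)?; // then: pagectl page-trace trace.bin
    Ok(())
}
```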

---------

Co-authored-by: Erik Grinaker <erik@neon.tech>
jcsp and erikgrinaker authored Jan 15, 2025
1 parent efaec6c commit fb0e2ac
Showing 10 changed files with 199 additions and 3 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 3 additions & 1 deletion libs/pageserver_api/src/key.rs
@@ -24,7 +24,9 @@ pub struct Key {
 
 /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as
 /// a struct of fields.
-#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, Debug)]
+#[derive(
+    Clone, Copy, Default, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, Debug,
+)]
 pub struct CompactKey(i128);
 
 /// The storage key size.
19 changes: 18 additions & 1 deletion libs/pageserver_api/src/models.rs
@@ -29,7 +29,7 @@ use utils::{
 };
 
 use crate::{
-    key::Key,
+    key::{CompactKey, Key},
     reltag::RelTag,
     shard::{ShardCount, ShardStripeSize, TenantShardId},
 };
@@ -1981,6 +1981,23 @@ impl PagestreamBeMessage {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PageTraceEvent {
+    pub key: CompactKey,
+    pub effective_lsn: Lsn,
+    pub time: SystemTime,
+}
+
+impl Default for PageTraceEvent {
+    fn default() -> Self {
+        Self {
+            key: Default::default(),
+            effective_lsn: Default::default(),
+            time: std::time::UNIX_EPOCH,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use serde_json::json;
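
Aside: the manual `Default` impl is needed because `SystemTime` does not implement `Default`, so the struct cannot simply derive it (`CompactKey` gains a `Default` derive in `key.rs` above for the same reason). The default value exists only so the HTTP handler later in this diff can measure the fixed serialized event size. A standalone sketch of the pattern:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

struct Timestamped {
    time: SystemTime, // SystemTime has no Default impl, so #[derive(Default)] won't compile
}

impl Default for Timestamped {
    fn default() -> Self {
        // UNIX_EPOCH is a deterministic default, unlike SystemTime::now().
        Self { time: UNIX_EPOCH }
    }
}
```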
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
@@ -16,6 +16,7 @@ arc-swap.workspace = true
 async-compression.workspace = true
 async-stream.workspace = true
 bit_field.workspace = true
+bincode.workspace = true
 byteorder.workspace = true
 bytes.workspace = true
 camino.workspace = true
2 changes: 2 additions & 0 deletions pageserver/ctl/Cargo.toml
@@ -8,9 +8,11 @@ license.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+bincode.workspace = true
 camino.workspace = true
 clap = { workspace = true, features = ["string"] }
 humantime.workspace = true
+itertools.workspace = true
 pageserver = { path = ".." }
 pageserver_api.workspace = true
 remote_storage = { path = "../../libs/remote_storage" }
4 changes: 4 additions & 0 deletions pageserver/ctl/src/main.rs
@@ -9,7 +9,9 @@ mod index_part;
 mod key;
 mod layer_map_analyzer;
 mod layers;
+mod page_trace;
 
+use page_trace::PageTraceCmd;
 use std::{
     str::FromStr,
     time::{Duration, SystemTime},
@@ -64,6 +66,7 @@ enum Commands {
     Layer(LayerCmd),
     /// Debug print a hex key found from logs
     Key(key::DescribeKeyCommand),
+    PageTrace(PageTraceCmd),
 }
 
 /// Read and update pageserver metadata file
@@ -183,6 +186,7 @@ async fn main() -> anyhow::Result<()> {
             .await?;
         }
         Commands::Key(dkc) => dkc.execute(),
+        Commands::PageTrace(cmd) => page_trace::main(&cmd)?,
     };
     Ok(())
 }
73 changes: 73 additions & 0 deletions pageserver/ctl/src/page_trace.rs
@@ -0,0 +1,73 @@
+use std::collections::HashMap;
+use std::io::BufReader;
+
+use camino::Utf8PathBuf;
+use clap::Parser;
+use itertools::Itertools as _;
+use pageserver_api::key::{CompactKey, Key};
+use pageserver_api::models::PageTraceEvent;
+use pageserver_api::reltag::RelTag;
+
+/// Parses a page trace (as emitted by the `page_trace` timeline API), and outputs stats.
+#[derive(Parser)]
+pub(crate) struct PageTraceCmd {
+    /// Trace input file.
+    path: Utf8PathBuf,
+}
+
+pub(crate) fn main(cmd: &PageTraceCmd) -> anyhow::Result<()> {
+    let mut file = BufReader::new(std::fs::OpenOptions::new().read(true).open(&cmd.path)?);
+    let mut events: Vec<PageTraceEvent> = Vec::new();
+    loop {
+        match bincode::deserialize_from(&mut file) {
+            Ok(event) => events.push(event),
+            Err(err) => {
+                if let bincode::ErrorKind::Io(ref err) = *err {
+                    if err.kind() == std::io::ErrorKind::UnexpectedEof {
+                        break;
+                    }
+                }
+                return Err(err.into());
+            }
+        }
+    }
+
+    let mut reads_by_relation: HashMap<RelTag, i64> = HashMap::new();
+    let mut reads_by_key: HashMap<CompactKey, i64> = HashMap::new();
+
+    for event in events {
+        let key = Key::from_compact(event.key);
+        let reltag = RelTag {
+            spcnode: key.field2,
+            dbnode: key.field3,
+            relnode: key.field4,
+            forknum: key.field5,
+        };
+
+        *reads_by_relation.entry(reltag).or_default() += 1;
+        *reads_by_key.entry(event.key).or_default() += 1;
+    }
+
+    let multi_read_keys = reads_by_key
+        .into_iter()
+        .filter(|(_, count)| *count > 1)
+        .sorted_by_key(|(key, count)| (-*count, *key))
+        .collect_vec();
+
+    println!("Multi-read keys: {}", multi_read_keys.len());
+    for (key, count) in multi_read_keys {
+        println!("  {key}: {count}");
+    }
+
+    let reads_by_relation = reads_by_relation
+        .into_iter()
+        .sorted_by_key(|(rel, count)| (-*count, *rel))
+        .collect_vec();
+
+    println!("Reads by relation:");
+    for (reltag, count) in reads_by_relation {
+        println!("  {reltag}: {count}");
+    }
+
+    Ok(())
+}
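
The EOF-terminated decode loop generalizes to any bincode record stream; a sketch of the same technique as a standalone helper (assuming the bincode 1.x API used throughout this PR):

```rust
use serde::de::DeserializeOwned;
use std::io::Read;

/// Reads consecutive bincode-serialized records until a clean end of stream;
/// any error other than an UnexpectedEof I/O error is fatal.
fn read_records<T: DeserializeOwned>(mut reader: impl Read) -> anyhow::Result<Vec<T>> {
    let mut records = Vec::new();
    loop {
        match bincode::deserialize_from(&mut reader) {
            Ok(record) => records.push(record),
            Err(err) => match *err {
                bincode::ErrorKind::Io(ref io)
                    if io.kind() == std::io::ErrorKind::UnexpectedEof => break,
                _ => return Err(err.into()),
            },
        }
    }
    Ok(records)
}
```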
72 changes: 72 additions & 0 deletions pageserver/src/http/routes.rs
@@ -27,6 +27,7 @@ use pageserver_api::models::LocationConfigMode;
 use pageserver_api::models::LsnLease;
 use pageserver_api::models::LsnLeaseRequest;
 use pageserver_api::models::OffloadedTimelineInfo;
+use pageserver_api::models::PageTraceEvent;
 use pageserver_api::models::ShardParameters;
 use pageserver_api::models::TenantConfigPatchRequest;
 use pageserver_api::models::TenantDetails;
@@ -51,7 +52,9 @@ use pageserver_api::shard::TenantShardId;
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
 use remote_storage::TimeTravelError;
+use scopeguard::defer;
 use tenant_size_model::{svg::SvgBranchKind, SizeResult, StorageModel};
+use tokio::time::Instant;
 use tokio_util::io::StreamReader;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -1521,6 +1524,71 @@ async fn timeline_gc_unblocking_handler(
     block_or_unblock_gc(request, false).await
 }
 
+/// Traces GetPage@LSN requests for a timeline, and emits metadata in an efficient binary encoding.
+/// Use the `pagectl page-trace` command to decode and analyze the output.
+async fn timeline_page_trace_handler(
+    request: Request<Body>,
+    cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let state = get_state(&request);
+    check_permission(&request, None)?;
+
+    let size_limit: usize = parse_query_param(&request, "size_limit_bytes")?.unwrap_or(1024 * 1024);
+    let time_limit_secs: u64 = parse_query_param(&request, "time_limit_secs")?.unwrap_or(5);
+
+    // Convert size limit to event limit based on the serialized size of an event. The event size is
+    // fixed, as the default bincode serializer uses fixed-width integer encoding.
+    let event_size = bincode::serialize(&PageTraceEvent::default())
+        .map_err(|err| ApiError::InternalServerError(err.into()))?
+        .len();
+    let event_limit = size_limit / event_size;
+
+    let timeline =
+        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+            .await?;
+
+    // Install a page trace, unless one is already in progress. We just use a buffered channel,
+    // which may 2x the memory usage in the worst case, but it's still bounded.
+    let (trace_tx, mut trace_rx) = tokio::sync::mpsc::channel(event_limit);
+    let cur = timeline.page_trace.load();
+    let installed = cur.is_none()
+        && timeline
+            .page_trace
+            .compare_and_swap(cur, Some(Arc::new(trace_tx)))
+            .is_none();
+    if !installed {
+        return Err(ApiError::Conflict("page trace already active".to_string()));
+    }
+    defer!(timeline.page_trace.store(None)); // uninstall on return
+
+    // Collect the trace and return it to the client. We could stream the response, but this is
+    // simple and fine.
+    let mut body = Vec::with_capacity(size_limit);
+    let deadline = Instant::now() + Duration::from_secs(time_limit_secs);
+
+    while body.len() < size_limit {
+        tokio::select! {
+            event = trace_rx.recv() => {
+                let Some(event) = event else {
+                    break; // shouldn't happen (sender doesn't close, unless timeline dropped)
+                };
+                bincode::serialize_into(&mut body, &event)
+                    .map_err(|err| ApiError::InternalServerError(err.into()))?;
+            }
+            _ = tokio::time::sleep_until(deadline) => break, // time limit reached
+            _ = cancel.cancelled() => return Err(ApiError::Cancelled),
+        }
+    }
+
+    Ok(Response::builder()
+        .status(StatusCode::OK)
+        .header(header::CONTENT_TYPE, "application/octet-stream")
+        .body(hyper::Body::from(body))
+        .unwrap())
+}
+
 /// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
 ///
 /// Both are technically unsafe because they might fire off index uploads, thus they are POST.
@@ -3479,6 +3547,10 @@ pub fn make_router(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
             |r| api_handler(r, timeline_gc_unblocking_handler),
         )
+        .get(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/page_trace",
+            |r| api_handler(r, timeline_page_trace_handler),
+        )
        .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
            api_handler(r, secondary_upload_handler)
        })
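
The `size_limit / event_size` conversion above is exact because bincode's default options use fixed-width integer encoding: every serialized event has the same byte length. A self-contained sketch with a stand-in struct mirroring `PageTraceEvent`'s layout (the field widths are inferred from the types, not stated in the PR):

```rust
use serde::Serialize;
use std::time::SystemTime;

#[derive(Serialize)]
struct Event {
    key: i128,          // CompactKey wraps an i128
    effective_lsn: u64, // Lsn wraps a u64
    time: SystemTime,   // serde encodes this as u64 seconds + u32 nanos
}

fn main() -> anyhow::Result<()> {
    let event = Event {
        key: 0,
        effective_lsn: 0,
        time: SystemTime::UNIX_EPOCH,
    };
    // With fixed-width encoding this should be 16 + 8 + 8 + 4 = 36 bytes,
    // regardless of the field values.
    let event_size = bincode::serialize(&event)?.len();
    let size_limit = 1024 * 1024; // the handler's default size_limit_bytes
    println!("event size: {event_size}, event limit: {}", size_limit / event_size);
    Ok(())
}
```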
15 changes: 15 additions & 0 deletions pageserver/src/page_service.rs
@@ -67,6 +67,7 @@ use crate::tenant::PageReconstructError;
 use crate::tenant::Timeline;
 use crate::{basebackup, timed_after_cancellation};
 use pageserver_api::key::rel_block_to_key;
+use pageserver_api::models::PageTraceEvent;
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
 use postgres_ffi::BLCKSZ;
@@ -1718,6 +1719,20 @@ impl PageServerHandler {
             .query_metrics
             .observe_getpage_batch_start(requests.len());
 
+        // If a page trace is running, submit an event for this request.
+        if let Some(page_trace) = timeline.page_trace.load().as_ref() {
+            let time = SystemTime::now();
+            for batch in &requests {
+                let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
+                // Ignore error (trace buffer may be full or tracer may have disconnected).
+                _ = page_trace.try_send(PageTraceEvent {
+                    key,
+                    effective_lsn,
+                    time,
+                });
+            }
+        }
+
         let results = timeline
             .get_rel_page_at_lsn_batched(
                 requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
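
The hot-path cost of this hook is one lock-free `ArcSwapOption::load()` plus a non-blocking `try_send()`, so GetPage processing never blocks on the tracer. A reduced sketch of the pattern, with the event type simplified to `u64`:

```rust
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use tokio::sync::mpsc;

/// Stand-in for the `Timeline::page_trace` field: an atomically swappable sender.
struct Trace(ArcSwapOption<mpsc::Sender<u64>>);

impl Trace {
    fn record(&self, event: u64) {
        // Lock-free load; a no-op when no trace is installed.
        if let Some(tx) = self.0.load().as_ref() {
            // Non-blocking: drop the event if the buffer is full or closed.
            _ = tx.try_send(event);
        }
    }
}

#[tokio::main]
async fn main() {
    let trace = Trace(ArcSwapOption::empty());
    let (tx, mut rx) = mpsc::channel(1024);
    trace.0.store(Some(Arc::new(tx))); // install (the handler uses compare_and_swap)
    trace.record(42);
    assert_eq!(rx.recv().await, Some(42));
    trace.0.store(None); // uninstall, as the handler's defer! does
}
```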
9 changes: 8 additions & 1 deletion pageserver/src/tenant/timeline.rs
@@ -14,7 +14,7 @@ pub mod uninit;
 mod walreceiver;
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use arc_swap::ArcSwap;
+use arc_swap::{ArcSwap, ArcSwapOption};
 use bytes::Bytes;
 use camino::Utf8Path;
 use chrono::{DateTime, Utc};
@@ -23,6 +23,7 @@ use fail::fail_point;
 use handle::ShardTimelineId;
 use offload::OffloadError;
 use once_cell::sync::Lazy;
+use pageserver_api::models::PageTraceEvent;
 use pageserver_api::{
     config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
     key::{
@@ -42,6 +43,7 @@ use rand::Rng;
 use remote_storage::DownloadError;
 use serde_with::serde_as;
 use storage_broker::BrokerClientChannel;
+use tokio::sync::mpsc::Sender;
 use tokio::{
     runtime::Handle,
     sync::{oneshot, watch},
@@ -433,6 +435,9 @@ pub struct Timeline {
 
     /// Cf. [`crate::tenant::CreateTimelineIdempotency`].
     pub(crate) create_idempotency: crate::tenant::CreateTimelineIdempotency,
+
+    /// If Some, collects GetPage metadata for an ongoing PageTrace.
+    pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
 }
 
 pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
@@ -2380,6 +2385,8 @@ impl Timeline {
             attach_wal_lag_cooldown,
 
             create_idempotency,
+
+            page_trace: Default::default(),
         };
 
         result.repartition_threshold =

1 comment on commit fb0e2ac

@github-actions

7315 tests run: 6938 passed, 1 failed, 376 skipped (full report)


Failures on Postgres 16

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_physical_replication_config_mismatch_max_locks_per_transaction[release-pg16]"

Flaky tests (5)

Postgres 15

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-x86-64

(Flaky-test entries for Postgres 17 and 14 were collapsed and are not shown here.)

Test coverage report is not available

The comment gets automatically updated with the latest test results
fb0e2ac at 2025-01-15T20:02:57.639Z ♻️
