buffered writer: handle write errors by retrying all write IO errors indefinitely #10993

Open · wants to merge 6 commits into main
6 changes: 5 additions & 1 deletion libs/utils/src/backoff.rs
@@ -43,6 +43,8 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
}
}

pub const INFINITE_RETRIES: u32 = u32::MAX;

/// Retries passed operation until one of the following conditions are met:
/// - encountered error is considered as permanent (non-retryable)
/// - retries have been exhausted
@@ -56,6 +58,8 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
/// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
/// for any other error type. Final failed attempt is logged with `{:?}`.
///
/// Pass [`INFINITE_RETRIES`] as `max_retries` to retry forever.
///
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
pub async fn retry<T, O, F, E>(
mut op: O,
@@ -96,7 +100,7 @@ where
Err(err) if attempts < warn_threshold => {
tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(err) if attempts < max_retries => {
Err(err) if attempts < max_retries || max_retries == INFINITE_RETRIES => {
tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(err) => {
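For context, a hedged sketch of how a call site opts into unbounded retries with the new constant; the operation, error type, and surrounding variables are illustrative, and the parameter order of `utils::backoff::retry` is assumed from the doc comment above:

// Hypothetical call site; `upload_object`, `UploadError`, `client`, `path`,
// and `cancel` are illustrative names, not part of this PR.
let result = utils::backoff::retry(
    || async { upload_object(&client, &path).await },
    |e: &UploadError| matches!(e, UploadError::Cancelled), // permanent (non-retryable) errors
    3,                                // warn_threshold
    utils::backoff::INFINITE_RETRIES, // max_retries: retry forever
    "upload object",
    &cancel,
)
.await;
// `None` means cancellation was observed during backoff; otherwise the
// terminal result of the operation is returned.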
3 changes: 2 additions & 1 deletion pageserver/src/tenant/ephemeral_file.rs
@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use tracing::{error, info_span};
use utils::id::TimelineId;

use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
@@ -76,6 +76,7 @@ impl EphemeralFile {
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})
3 changes: 2 additions & 1 deletion pageserver/src/tenant/remote_timeline_client/download.rs
@@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{info_span, warn};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
@@ -229,6 +229,7 @@ async fn download_object(
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
ctx,
info_span!(parent: None, "download_object_buffered_writer", %dst_path),
Contributor:
I haven't checked the ingest code, but are these writers kind of short-lived (like one wal record?)
Just thinking about perf impact of the extra span.

);

// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
5 changes: 2 additions & 3 deletions pageserver/src/virtual_file.rs
@@ -1344,9 +1344,8 @@ impl OwnedAsyncWriter for VirtualFile {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
res.map(|_| buf)
) -> (FullSlice<Buf>, std::io::Result<()>) {
VirtualFile::write_all_at(self, buf, offset, ctx).await
}
}

9 changes: 6 additions & 3 deletions pageserver/src/virtual_file/owned_buffers_io/write.rs
@@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
}

/// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -66,6 +66,7 @@
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
@@ -75,6 +76,7 @@
buf_new(),
gate_guard,
ctx.attached_child(),
flush_task_span,
),
bytes_submitted: 0,
}
@@ -269,12 +271,12 @@ mod tests {
buf: FullSlice<Buf>,
offset: u64,
_: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
) -> (FullSlice<Buf>, std::io::Result<()>) {
self.writes
.lock()
.unwrap()
.push((Vec::from(&buf[..]), offset));
Ok(buf)
(buf, Ok(()))
}
}

@@ -293,6 +295,7 @@
|| IoBufferMut::with_capacity(2),
gate.enter()?,
ctx,
tracing::Span::none(),
);

writer.write_buffered_borrowed(b"abc", ctx).await?;
65 changes: 56 additions & 9 deletions pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
@@ -1,9 +1,14 @@
use std::ops::ControlFlow;
use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;

use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
use crate::context::RequestContext;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;

@@ -118,18 +123,22 @@
buf: B,
gate_guard: utils::sync::gate::GateGuard,
ctx: RequestContext,
span: tracing::Span,
) -> Self
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
// It is fine to buffer up to only 1 message. We only have 1 message in-flight at a time.
let (front, back) = duplex::mpsc::channel(1);

let join_handle = tokio::spawn(async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
});
let join_handle = tokio::spawn(
async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
}
.instrument(span),
);

FlushHandle {
inner: Some(FlushHandleInner {
@@ -236,6 +245,7 @@
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
// TODO: can this ever await and/or fail? I think not.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
@@ -251,10 +261,47 @@
}

// Write slice to disk at `offset`.
let slice = self
.writer
.write_all_at(request.slice, request.offset, &self.ctx)
.await?;
//
// Error handling happens according to the current policy of crashing
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
// (The upper layers of the Pageserver write path are not equipped to retry write errors
// because they often deallocate the buffers that were already written).
//
// TODO: cancellation sensitivity.
// Without it, if we hit a bug where retrying is never successful,
// then we can't shut down the timeline/tenant/pageserver cleanly because
// layers of the Pageserver write path are holding the gate open for EphemeralFile.
//
// TODO: use utils::backoff::retry once async closures are actually usable
Contributor:
didn't async closures just get stabilized in 1.85? (if this comment is calling out a specific outstanding issue then let's mention or link it)

Contributor (author):
This is the commit where I gave up: a03f335

//
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
let result = async {
if attempt > 1 {
info!("retrying flush");
}
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = res.maybe_fatal_err("owned_buffers_io flush");
let Err(err) = res else {
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))
Contributor:
A little concerned about extra spans this deep in the I/O stack -- I guess the idea is that in the rare event where we retry here, we want to know who/what is retrying rather than it just looking like a hang. Is there some way we can do that without constructing the spans on the happy path?

Contributor:
What we are really paying for on the happy path is the span lifecycle. There are atomic operations when creating and exiting the span (to determine the right subscriber). The span allocation itself is pooled, so it should be fairly cheap.
I don't know if this would show up without benchmarking. We could instrument only if attempt > 1 and not worry about it.

.await;
match result {
ControlFlow::Break(()) => break,
ControlFlow::Continue(()) => continue,
}
}
let slice = slice_storage.expect("loop must have run at least once");

#[cfg(test)]
{
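For readers following the retry loop above, here is a minimal, self-contained sketch of the retry-in-place pattern, decoupled from the Pageserver types; `write_once`, the buffer type, and the backoff constants are illustrative stand-ins, not part of this PR:

use std::time::Duration;

// Stand-in for `VirtualFile::write_all_at` with the changed return convention:
// the buffer always comes back, even when the write fails, so the caller can
// retry with the same bytes.
async fn write_once(buf: Vec<u8>, _offset: u64) -> (Vec<u8>, std::io::Result<()>) {
    (buf, Ok(()))
}

async fn write_with_infinite_retry(buf: Vec<u8>, offset: u64) -> Vec<u8> {
    let mut buf_storage = Some(buf);
    for attempt in 1u32.. {
        // Take the buffer for this attempt; it is handed back together with the
        // result, mirroring `slice_storage` in the flush background task above.
        let buf = buf_storage.take().expect("previous attempt returned the buffer");
        let (buf, res) = write_once(buf, offset).await;
        buf_storage = Some(buf);
        match res {
            Ok(()) => break,
            Err(err) => {
                // Per the review discussion, a span could be created only on this
                // (rare) retry path, keeping the happy path span-free.
                tracing::warn!(%err, attempt, "write failed, retrying after backoff");
                let backoff_secs = (0.1 * 2f64.powi(attempt as i32)).min(10.0);
                tokio::time::sleep(Duration::from_secs_f64(backoff_secs)).await;
            }
        }
    }
    buf_storage.expect("loop ran at least once")
}

The real flush task additionally classifies fatal IO errors via maybe_fatal_err (crashing the process on those, per the policy comment above) before treating an error as retryable.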