buffered writer: handle write errors by retrying all write IO errors indefinitely #10993
base: main
Changes from all commits
adbbec2
cbf7354
4d73357
a03f335
e935aff
80f2c26
@@ -1,9 +1,14 @@
use std::ops::ControlFlow;
use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;

use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
use crate::context::RequestContext;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
@@ -118,18 +123,22 @@ where
        buf: B,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
        span: tracing::Span,
    ) -> Self
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
    {
        // It is fine to buffer up to only 1 message. We only have 1 message in-flight at a time.
        let (front, back) = duplex::mpsc::channel(1);

        let join_handle = tokio::spawn(async move {
            FlushBackgroundTask::new(back, file, gate_guard, ctx)
                .run(buf.flush())
                .await
        });
        let join_handle = tokio::spawn(
            async move {
                FlushBackgroundTask::new(back, file, gate_guard, ctx)
                    .run(buf.flush())
                    .await
            }
            .instrument(span),
        );

        FlushHandle {
            inner: Some(FlushHandleInner {
@@ -236,6 +245,7 @@ where
    /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
    async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
        // Sends the extra buffer back to the handle.
        // TODO: can this ever await and/or fail? I think not.
        self.channel.send(slice).await.map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
        })?;
@@ -251,10 +261,47 @@
            }

            // Write slice to disk at `offset`.
            let slice = self
                .writer
                .write_all_at(request.slice, request.offset, &self.ctx)
                .await?;
            //
            // Error handling happens according to the current policy of crashing
            // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
            // (The upper layers of the Pageserver write path are not equipped to retry write errors
            // because they often deallocate the buffers that were already written.)
            //
            // TODO: cancellation sensitivity.
            // Without it, if we hit a bug where retrying is never successful,
            // then we can't shut down the timeline/tenant/pageserver cleanly because
            // layers of the Pageserver write path are holding the gate open for EphemeralFile.
            //
            // TODO: use utils::backoff::retry once async closures are actually usable
Review comment: didn't async closures just get stabilized in 1.85? (if this comment is calling out a specific outstanding issue then let's mention or link it)

Reply: I tried in this PR, they're not good yet: cbf7354 (https://neondb.slack.com/archives/C0277TKAJCA/p1740579692226909)

Reply: This is the commit where I gave up: a03f335
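For readers following this thread: the underlying friction is that the owned-buffer write API consumes the slice and hands it back together with the result, so any retry wrapper has to thread the buffer into and out of every attempt. A distilled, self-contained sketch of that hand-rolled pattern, which the loop below uses; all types and names here are stand-ins, not the PR's code:

```rust
use std::time::Duration;

// Stand-in buffer type; the real code uses FullSlice<Buf>.
struct OwnedBuf(Vec<u8>);

// Stand-in for an owned-buffer write API: it consumes the buffer and hands it
// back together with the result, which is the shape of the real write_all_at.
async fn write_all_at(buf: OwnedBuf, _offset: u64) -> (OwnedBuf, std::io::Result<()>) {
    (buf, Ok(()))
}

// Retry until the write succeeds, parking the buffer in an Option so each
// attempt can take ownership of it and then put it back.
async fn write_with_retries(buf: OwnedBuf, offset: u64) -> OwnedBuf {
    let mut slot = Some(buf);
    for attempt in 1u32.. {
        let buf = slot.take().expect("previous attempt put the buffer back");
        let (buf, res) = write_all_at(buf, offset).await;
        slot = Some(buf);
        if res.is_ok() {
            break;
        }
        // Crude stand-in for the exponential backoff used in the actual loop.
        tokio::time::sleep(Duration::from_millis(100 * u64::from(attempt))).await;
    }
    slot.expect("loop runs at least once")
}

#[tokio::main]
async fn main() {
    let buf = OwnedBuf(vec![0u8; 8192]);
    let _buf = write_with_retries(buf, 0).await;
}
```

A generic retry combinator taking an async closure would have to express exactly this move-in/move-out of the buffer across invocations, which is where the current async-closure bounds get in the way.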
            //
            let mut slice_storage = Some(request.slice);
            for attempt in 1.. {
                let result = async {
                    if attempt > 1 {
                        info!("retrying flush");
                    }
                    let slice = slice_storage.take().expect(
                        "likely previous invocation of this future didn't get polled to completion",
                    );
                    let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
                    slice_storage = Some(slice);
                    let res = res.maybe_fatal_err("owned_buffers_io flush");
                    let Err(err) = res else {
                        return ControlFlow::Break(());
                    };
                    warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
                    static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
                    utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
                    ControlFlow::Continue(())
                }
                .instrument(info_span!("flush_attempt", %attempt))
Review comment: A little concerned about extra spans this deep in the I/O stack -- I guess the idea is that in the rare event where we retry here, we want to know who/what is retrying rather than it just looking like a hang. Is there some way we can do that without constructing the spans on the happy path?

Reply: What we are really paying for on the happy path is the span lifecycle. I don't know if this would show up without benchmarking. We could only instrument if …
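One shape the trailing "we could only instrument if …" idea could take (a sketch, not something this PR implements): build the attempt future as usual and attach the flush_attempt span only when a retry is actually happening, so the first, common attempt never pays the span lifecycle. `flush_attempt_body` is a placeholder for the write-and-classify logic in the loop above.

```rust
use std::ops::ControlFlow;

use tracing::{Instrument, info_span};

// Placeholder for one flush attempt: the write_all_at call plus error
// classification from the loop above.
async fn flush_attempt_body(attempt: u32) -> ControlFlow<()> {
    let _ = attempt;
    ControlFlow::Break(())
}

// Only construct and enter the span when we are actually retrying; the first
// (common, successful) attempt runs without any span overhead.
async fn flush_attempt(attempt: u32) -> ControlFlow<()> {
    let fut = flush_attempt_body(attempt);
    if attempt > 1 {
        fut.instrument(info_span!("flush_attempt", %attempt)).await
    } else {
        fut.await
    }
}
```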
                .await;
                match result {
                    ControlFlow::Break(()) => break,
                    ControlFlow::Continue(()) => continue,
                }
            }
            let slice = slice_storage.expect("loop must have run at least once");

            #[cfg(test)]
            {
Review comment: I haven't checked the ingest code, but are these writers kind of short-lived (like one WAL record)? Just thinking about perf impact of the extra span.
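Back on the cancellation-sensitivity TODO in the hunk above: the loop currently backs off against a static, never-cancelled token, so a stuck retry can pin the gate open during shutdown. A minimal sketch of a cancellation-aware wait, assuming a real CancellationToken were threaded down to the flush task and the resulting error propagated; the delay formula here is made up rather than whatever utils::backoff uses:

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Wait out the backoff delay unless the token fires first, in which case
// report that shutdown was requested instead of retrying forever.
async fn backoff_or_cancel(attempt: u32, cancel: &CancellationToken) -> std::io::Result<()> {
    // Illustrative delay curve, capped at 10 seconds.
    let delay = Duration::from_secs_f64(1.5f64.powi(attempt as i32).min(10.0));
    tokio::select! {
        _ = tokio::time::sleep(delay) => Ok(()),
        _ = cancel.cancelled() => Err(std::io::Error::new(
            std::io::ErrorKind::Interrupted,
            "cancelled while waiting to retry flush",
        )),
    }
}
```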