Skip to content

Commit

Permalink
alternative approach to cancellation where we wait for interest in th…
Browse files Browse the repository at this point in the history
…e result to go away
  • Loading branch information
problame committed Feb 28, 2025
1 parent 9040f2f commit a53987c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
9 changes: 8 additions & 1 deletion libs/utils/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub async fn exponential_backoff(
max_seconds: f64,
cancel: &CancellationToken,
) {
exponential_backoff2(n, base_increment, max_seconds, cancel.cancelled()).await
}

pub async fn exponential_backoff2<F>(n: u32, base_increment: f64, max_seconds: f64, cancel: F)
where
F: Future<Output = ()>,
{
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
Expand All @@ -23,7 +30,7 @@ pub async fn exponential_backoff(
drop(
tokio::time::timeout(
std::time::Duration::from_secs_f64(backoff_duration_seconds),
cancel.cancelled(),
cancel,
)
.await,
)
Expand Down
50 changes: 48 additions & 2 deletions libs/utils/src/sync/duplex/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use tokio::sync::mpsc;

/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
tx: mpsc::Sender<S>,
rx: mpsc::Receiver<R>,
}

/// Creates a bi-directional channel.
Expand Down Expand Up @@ -33,4 +33,50 @@ impl<S: Send, R: Send> Duplex<S, R> {
pub async fn recv(&mut self) -> Option<R> {
self.rx.recv().await
}

pub fn close(self) {
let Self { tx: _, rx } = { self };
drop(rx); // this makes the other Duplex's tx resolve on tx.closed()
}

/// Future
pub async fn closed(&self) {
self.tx.closed().await
}

pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

const FOREVER: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

#[tokio::test(start_paused = true)]
async fn test_await_close() {
let (a, mut b) = super::channel::<i32, i32>(1);

let mut recv_fut = Box::pin(b.recv());

tokio::select! {
_ = &mut recv_fut => unreachable!("nothing was sent"),
_ = tokio::time::sleep(FOREVER) => (),
}

a.close();

tokio::select! {
res = &mut recv_fut => {
assert!(res.is_none());
},
_ = tokio::time::sleep(FOREVER) => (),
}

drop(recv_fut);

assert!(b.is_closed());
}
}
13 changes: 10 additions & 3 deletions pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ where
.inner
.take()
.expect("must not use after we returned an error");
drop(handle.channel.tx);
handle.channel.close();
handle.join_handle.await.unwrap()
}

Expand Down Expand Up @@ -276,6 +276,14 @@ where
//
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
if self.channel.is_closed() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"flush handle shutting down or dropped during flush attempt number {attempt}"
),
));
}
let result = async {
if attempt > 1 {
info!("retrying flush");
Expand All @@ -290,8 +298,7 @@ where
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
utils::backoff::exponential_backoff2(attempt, 1.0, 10.0, self.channel.closed()).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))
Expand Down

0 comments on commit a53987c

Please sign in to comment.