Skip to content

Commit

Permalink
higher kinded lifetime error
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Feb 26, 2025
1 parent adbbec2 commit cbf7354
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
69 changes: 64 additions & 5 deletions libs/utils/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,80 @@ where
}
}

/// Async closure aware version of [`retry`].
pub async fn retry_async_closure<T, E>(
mut op: impl AsyncFnMut() -> Result<T, E>,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
cancel: &CancellationToken,
) -> Option<Result<T, E>>
where
// Not std::error::Error because anyhow::Error doesnt implement it.
// For context see /~https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug + 'static,
{
let mut attempts = 0;
loop {
if cancel.is_cancelled() {
return None;
}

let result = op().await;
match &result {
Ok(_) => {
if attempts > 0 {
tracing::info!("{description} succeeded after {attempts} retries");
}
return Some(result);
}

// These are "permanent" errors that should not be retried.
Err(e) if is_permanent(e) => {
return Some(result);
}
// Assume that any other failure might be transient, and the operation might
// succeed if we just keep trying.
Err(err) if attempts < warn_threshold => {
tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(err) if attempts < max_retries || max_retries == INFINITE_RETRIES => {
tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(err) => {
// Operation failed `max_attempts` times. Time to give up.
tracing::warn!(
"{description} still failed after {attempts} retries, giving up: {err:?}"
);
return Some(result);
}
}
// sleep and retry
exponential_backoff(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempts += 1;
}
}

/// Like [`retry`] but retries forever.
///
/// Returns `Some(T)` if operation succeeded, `None` if `cancel` was observed to have been cancelled before succeeded.
pub async fn retry_forever<T, O, F, E>(
mut op: O,
pub async fn retry_forever<T, E>(
mut op: impl AsyncFnMut() -> Result<T, E>,
warn_threshold: u32,
description: &str,
cancel: &CancellationToken,
) -> Option<T>
where
E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{
match retry(
match retry_async_closure(
op,
|_| false,
warn_threshold,
Expand Down
9 changes: 4 additions & 5 deletions pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use chrono::Duration;
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::sync::duplex;

use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
Expand Down Expand Up @@ -270,14 +271,12 @@ where
// then we can't shut down the timeline/tenant/pageserver cleanrly because
// upp layers of the Pageserver write path are holding the gate open for EphemeralFile.
let mut slice_storage = Some(request.slice);
let op = || async {
let offset = request.offset;
let op = async || {
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
let (slice, res) = self
.writer
.write_all_at(slice, request.offset, &self.ctx)
.await;
let (slice, res) = self.writer.write_all_at(slice, offset, &self.ctx).await;
slice_storage = Some(slice);
res
};
Expand Down

0 comments on commit cbf7354

Please sign in to comment.