diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index 32f487cfc062..038d8e803299 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -103,10 +103,22 @@ pub enum ArtifactState {
 		last_time_needed: SystemTime,
 	},
 	/// A task to prepare this artifact is scheduled.
-	Preparing { waiting_for_response: Vec<PrepareResultSender> },
+	Preparing {
+		/// List of result senders that are waiting for a response.
+		waiting_for_response: Vec<PrepareResultSender>,
+		/// The number of times this artifact has failed to prepare.
+		num_failures: u32,
+	},
 	/// The code couldn't be compiled due to an error. Such artifacts
 	/// never reach the executor and stay in the host's memory.
-	FailedToProcess(PrepareError),
+	FailedToProcess {
+		/// Keep track of the last time that processing this artifact failed.
+		last_time_failed: SystemTime,
+		/// The number of times this artifact has failed to prepare.
+		num_failures: u32,
+		/// The last error encountered for preparation.
+		error: PrepareError,
+	},
 }
 
 /// A container of all known artifact ids and their states.
@@ -150,7 +162,7 @@ impl Artifacts {
 		// See the precondition.
 		always!(self
 			.artifacts
-			.insert(artifact_id, ArtifactState::Preparing { waiting_for_response })
+			.insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 })
 			.is_none());
 	}
 
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index 69f2e07b56cc..5c29072da1c3 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -22,6 +22,7 @@
 use crate::{
 	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
+	error::PrepareError,
 	execute,
 	metrics::Metrics,
 	prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
@@ -49,6 +50,16 @@ pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);
 // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
 pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);
 
+/// The time period after which a failed preparation artifact is considered ready to be retried.
+/// Note that we will only retry if another request comes in after this cooldown has passed.
+#[cfg(not(test))]
+pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
+#[cfg(test)]
+pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
+
+/// The number of times we will retry failed prepare jobs.
+pub const NUM_PREPARE_RETRIES: u32 = 5;
+
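Taken together with the per-artifact failure bookkeeping added to `ArtifactState`, these two constants fully determine the retry policy. Below is a minimal, self-contained sketch of that gate; the `PrepareError` variants are trimmed to the ones named in this diff, and the function (name shortened here) mirrors `can_retry_prepare_after_failure` defined further down in `host.rs`:

```rust
use std::time::{Duration, SystemTime};

const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
const NUM_PREPARE_RETRIES: u32 = 5;

// Trimmed stand-in for the host's `PrepareError`.
enum PrepareError {
    Prevalidation(String),
    Preparation(String),
    Panic(String),
    TimedOut,
    DidNotMakeIt,
}

// Deterministic failures are never retried; transient ones are retried only after the
// cooldown has elapsed and while the failure count is still within the retry budget.
fn can_retry(last_time_failed: SystemTime, num_failures: u32, error: &PrepareError) -> bool {
    match error {
        PrepareError::Prevalidation(_) | PrepareError::Preparation(_) => false,
        PrepareError::Panic(_) | PrepareError::TimedOut | PrepareError::DidNotMakeIt =>
            SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
                num_failures <= NUM_PREPARE_RETRIES,
    }
}
```

In other words, a transient failure is attempted at most `NUM_PREPARE_RETRIES + 1` times in total, each retry at least `PREPARE_FAILURE_COOLDOWN` after the previous failure, and only when a new execute or heads-up request arrives for that PVF.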
 /// An alias to not spell the type for the oneshot sender for the PVF execution result.
 pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
@@ -97,7 +108,13 @@ impl ValidationHost {
 		result_tx: ResultSender,
 	) -> Result<(), String> {
 		self.to_host_tx
-			.send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx })
+			.send(ToHost::ExecutePvf(ExecutePvfInputs {
+				pvf,
+				execution_timeout,
+				params,
+				priority,
+				result_tx,
+			}))
 			.await
 			.map_err(|_| "the inner loop hung up".to_string())
 	}
@@ -117,20 +134,17 @@ impl ValidationHost {
 }
 
 enum ToHost {
-	PrecheckPvf {
-		pvf: Pvf,
-		result_tx: PrepareResultSender,
-	},
-	ExecutePvf {
-		pvf: Pvf,
-		execution_timeout: Duration,
-		params: Vec<u8>,
-		priority: Priority,
-		result_tx: ResultSender,
-	},
-	HeadsUp {
-		active_pvfs: Vec<Pvf>,
-	},
+	PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender },
+	ExecutePvf(ExecutePvfInputs),
+	HeadsUp { active_pvfs: Vec<Pvf> },
+}
+
+struct ExecutePvfInputs {
+	pvf: Pvf,
+	execution_timeout: Duration,
+	params: Vec<u8>,
+	priority: Priority,
+	result_tx: ResultSender,
 }
 
 /// Configuration for the validation host.
@@ -361,6 +375,8 @@ async fn run(
 			Some(to_host) => to_host,
 		};
 
+		// If the artifact failed before, it could be re-scheduled for preparation here if
+		// the preparation failure cooldown has elapsed.
 		break_if_fatal!(handle_to_host(
 			&cache_path,
 			&mut artifacts,
@@ -377,9 +393,9 @@ async fn run(
 			// Note that preparation always succeeds.
 			//
 			// That's because the error conditions are written into the artifact and will be
-			// reported at the time of the execution. It potentially, but not necessarily,
-			// can be scheduled as a result of this function call, in case there are pending
-			// executions.
+			// reported at the time of the execution. It potentially, but not necessarily, can
+			// be scheduled for execution as a result of this function call, in case there are
+			// pending executions.
 			//
 			// We could be eager in terms of reporting and plumb the result from the preparation
 			// worker but we don't for the sake of simplicity.
@@ -407,24 +423,19 @@ async fn handle_to_host(
 		ToHost::PrecheckPvf { pvf, result_tx } => {
 			handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
 		},
-		ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
+		ToHost::ExecutePvf(inputs) => {
 			handle_execute_pvf(
 				cache_path,
 				artifacts,
 				prepare_queue,
 				execute_queue,
 				awaiting_prepare,
-				pvf,
-				execution_timeout,
-				params,
-				priority,
-				result_tx,
+				inputs,
 			)
 			.await?;
 		},
-		ToHost::HeadsUp { active_pvfs } => {
-			handle_heads_up(artifacts, prepare_queue, active_pvfs).await?;
-		},
+		ToHost::HeadsUp { active_pvfs } =>
+			handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
 	}
 
 	Ok(())
 }
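For orientation, here is a hedged caller-side sketch of how an execution request enters this dispatch. `ValidationHost`, `Pvf`, and `Priority` come from this crate; the timeout value and the surrounding setup are assumptions made for the example, not part of this diff:

```rust
use std::time::Duration;
use futures::channel::oneshot;

// Hypothetical caller: submit one execution request through the host handle and await
// the verdict on the oneshot channel registered for it.
async fn validate_once(host: &mut ValidationHost, pvf: Pvf, params: Vec<u8>) {
    let (result_tx, result_rx) = oneshot::channel();
    host.execute_pvf(pvf, Duration::from_secs(2), params, Priority::Normal, result_tx)
        .await
        .expect("the host event loop is running");

    // If this PVF previously failed to prepare with a transient error and the cooldown has
    // elapsed, the host re-queues preparation before this answer eventually arrives.
    match result_rx.await {
        Ok(Ok(_validation_result)) => { /* candidate deemed valid */ },
        Ok(Err(_validation_error)) => { /* execution or preparation error */ },
        Err(_) => { /* the host dropped the result sender */ },
    }
}
```

Bundling the five arguments into `ExecutePvfInputs` keeps this public signature unchanged while simplifying the internal `ToHost` plumbing.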
@@ -432,8 +443,9 @@ async fn handle_to_host(
 
 /// Handles PVF prechecking requests.
 ///
 /// This tries to prepare the PVF by compiling the WASM blob within a given timeout
 /// ([`PRECHECK_PREPARATION_TIMEOUT`]).
+///
+/// If the preparation failed previously, the cached error is returned right away; pre-check
+/// requests never trigger a retry.
 async fn handle_precheck_pvf(
 	artifacts: &mut Artifacts,
 	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
@@ -448,10 +460,12 @@ async fn handle_precheck_pvf(
 				*last_time_needed = SystemTime::now();
 				let _ = result_sender.send(Ok(()));
 			},
-			ArtifactState::Preparing { waiting_for_response } =>
+			ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
 				waiting_for_response.push(result_sender),
-			ArtifactState::FailedToProcess(result) => {
-				let _ = result_sender.send(PrepareResult::Err(result.clone()));
+			ArtifactState::FailedToProcess { error, .. } => {
+				// Do not retry a failed preparation when another pre-check request comes in;
+				// we do not retry pre-checking anyway.
+				let _ = result_sender.send(PrepareResult::Err(error.clone()));
 			},
 		}
 	} else {
@@ -471,22 +485,22 @@ async fn handle_precheck_pvf(
 /// Handles PVF execution.
 ///
-/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there
-/// is already a preparation job, we coalesce the two preparation jobs. When preparing for
-/// execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`]) than when
-/// prechecking.
+/// This will try to prepare the PVF if a prepared artifact does not already exist. If there is
+/// already a preparation job, we coalesce the two preparation jobs.
+///
+/// If the prepare job failed previously, we may retry it under certain conditions.
+///
+/// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`])
+/// than when prechecking.
 async fn handle_execute_pvf(
 	cache_path: &Path,
 	artifacts: &mut Artifacts,
 	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
 	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
 	awaiting_prepare: &mut AwaitingPrepare,
-	pvf: Pvf,
-	execution_timeout: Duration,
-	params: Vec<u8>,
-	priority: Priority,
-	result_tx: ResultSender,
+	inputs: ExecutePvfInputs,
 ) -> Result<(), Fatal> {
+	let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs;
 	let artifact_id = pvf.as_artifact_id();
 
 	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
@@ -494,6 +508,7 @@ async fn handle_execute_pvf(
 			ArtifactState::Prepared { last_time_needed } => {
 				*last_time_needed = SystemTime::now();
 
+				// This artifact has already been prepared; send it to the execute queue.
 				send_execute(
 					execute_queue,
 					execute::ToQueue::Enqueue {
@@ -505,11 +520,29 @@ async fn handle_execute_pvf(
 				)
 				.await?;
 			},
-			ArtifactState::Preparing { waiting_for_response: _ } => {
+			ArtifactState::Preparing { .. } => {
 				awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
 			},
-			ArtifactState::FailedToProcess(error) => {
-				let _ = result_tx.send(Err(ValidationError::from(error.clone())));
+			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
+				if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
+					// If we are allowed to retry the failed prepare job, change the state to
+					// Preparing and re-queue this job.
+					*state = ArtifactState::Preparing {
+						waiting_for_response: Vec::new(),
+						num_failures: *num_failures,
+					};
+					send_prepare(
+						prepare_queue,
+						prepare::ToQueue::Enqueue {
+							priority,
+							pvf,
+							preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
+						},
+					)
+					.await?;
+				} else {
+					let _ = result_tx.send(Err(ValidationError::from(error.clone())));
+				}
 			},
 		}
 	} else {
@@ -526,6 +559,7 @@ async fn handle_execute_pvf(
 		)
 		.await?;
 
+		// Add an execution request that will wait to run after this prepare job has finished.
 		awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
 	}
 
@@ -546,10 +580,28 @@ async fn handle_heads_up(
 			ArtifactState::Prepared { last_time_needed, .. } => {
 				*last_time_needed = now;
 			},
-			ArtifactState::Preparing { waiting_for_response: _ } => {
+			ArtifactState::Preparing { .. } => {
 				// The artifact is already being prepared, so we don't need to do anything.
 			},
-			ArtifactState::FailedToProcess(_) => {},
+			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
+				if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
+					// If we are allowed to retry the failed prepare job, change the state to
+					// Preparing and re-queue this job.
+					*state = ArtifactState::Preparing {
+						waiting_for_response: vec![],
+						num_failures: *num_failures,
+					};
+					send_prepare(
+						prepare_queue,
+						prepare::ToQueue::Enqueue {
+							priority: Priority::Normal,
+							pvf: active_pvf,
+							preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
+						},
+					)
+					.await?;
+				}
+			},
 		}
 	} else {
 		// It's not in the artifacts, so we need to enqueue a job to prepare it.
@@ -599,20 +651,26 @@ async fn handle_prepare_done(
 			never!("the artifact is already prepared: {:?}", artifact_id);
 			return Ok(())
 		},
-		Some(ArtifactState::FailedToProcess(_)) => {
+		Some(ArtifactState::FailedToProcess { .. }) => {
 			// The reasoning is similar to the above, the artifact cannot be
 			// processed at this point.
 			never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
 			return Ok(())
 		},
-		Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state,
+		Some(state @ ArtifactState::Preparing { .. }) => state,
 	};
 
-	if let ArtifactState::Preparing { waiting_for_response } = state {
+	let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
+		state
+	{
 		for result_sender in waiting_for_response.drain(..) {
 			let _ = result_sender.send(result.clone());
 		}
-	}
+		num_failures
+	} else {
+		never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
+		return Ok(())
+	};
 
 	// It's finally time to dispatch all the execution requests that were waiting for this artifact
 	// to be prepared.
@@ -644,7 +702,11 @@ async fn handle_prepare_done(
 
 	*state = match result {
 		Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
-		Err(error) => ArtifactState::FailedToProcess(error.clone()),
+		Err(error) => ArtifactState::FailedToProcess {
+			last_time_failed: SystemTime::now(),
+			num_failures: *num_failures + 1,
+			error: error.clone(),
+		},
 	};
 
 	Ok(())
@@ -707,6 +769,24 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
 	}
 }
 
+/// Check if the conditions to retry a prepare job have been met.
+fn can_retry_prepare_after_failure(
+	last_time_failed: SystemTime,
+	num_failures: u32,
+	error: &PrepareError,
+) -> bool {
+	use PrepareError::*;
+	match error {
+		// Gracefully returned an error, so it will probably be reproducible. Don't retry.
+		Prevalidation(_) | Preparation(_) => false,
+		// Retry if the cooldown has elapsed and we have retried fewer than
+		// `NUM_PREPARE_RETRIES` times.
+		Panic(_) | TimedOut | DidNotMakeIt =>
+			SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
+				num_failures <= NUM_PREPARE_RETRIES,
+	}
+}
+
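As a hedged aside, this is roughly how the pulse stream declared just below gets consumed; the loop body and interval here are illustrative assumptions, not part of this diff:

```rust
use futures::StreamExt;
use std::time::Duration;

// Illustrative consumer: wake up once per interval and do periodic housekeeping.
async fn housekeeping_loop() {
    let pulse = pulse_every(Duration::from_secs(3));
    futures::pin_mut!(pulse);
    loop {
        // The stream never terminates, so `next()` always yields `Some(())`.
        let () = pulse.next().await.expect("pulse stream never ends; qed");
        // ... e.g. sweep stale artifacts here ...
    }
}
```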
 /// A stream that yields a pulse continuously at a given interval.
 fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
 	futures::stream::unfold(interval, {
@@ -834,6 +914,25 @@ mod tests {
 			.await
 		}
 
+		async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
+			use futures_timer::Delay;
+
+			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
+			run_until(
+				&mut self.run,
+				async {
+					futures::select! {
+						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
+						_ = to_prepare_queue_rx.next().fuse() => {
+							panic!("the prepare queue is supposed to be empty")
+						}
+					}
+				}
+				.boxed(),
+			)
+			.await
+		}
+
 		async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
 			use futures_timer::Delay;
 
@@ -844,7 +943,7 @@ mod tests {
 				futures::select! {
 					_ = Delay::new(Duration::from_millis(500)).fuse() => (),
 					_ = to_execute_queue_rx.next().fuse() => {
-						panic!("the execute queue supposed to be empty")
+						panic!("the execute queue is supposed to be empty")
 					}
 				}
 			}
@@ -1168,6 +1267,228 @@ mod tests {
 		}
 	}
 
+	// Test that multiple prechecking requests do not trigger preparation retries if the first one
+	// failed.
+	#[async_std::test]
+	async fn test_precheck_prepare_retry() {
+		let mut test = Builder::default().build();
+		let mut host = test.host_handle();
+
+		// Submit a precheck request that fails.
+		let (result_tx, _result_rx) = oneshot::channel();
+		host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
+
+		// The queue received the prepare request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+		// Send a PrepareError.
+		test.from_prepare_queue_tx
+			.send(prepare::FromQueue {
+				artifact_id: artifact_id(1),
+				result: Err(PrepareError::TimedOut),
+			})
+			.await
+			.unwrap();
+
+		// Submit another precheck request.
+		let (result_tx_2, _result_rx_2) = oneshot::channel();
+		host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();
+
+		// Assert the prepare queue is empty.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+
+		// Pause for enough time to reset the cooldown for this failed prepare request.
+		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
+
+		// Submit another precheck request.
+		let (result_tx_3, _result_rx_3) = oneshot::channel();
+		host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();
+
+		// Assert the prepare queue is empty - we do not retry for precheck requests.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+	}
+
+	// Test that multiple execution requests trigger preparation retries if the first one failed
+	// due to a potentially non-reproducible error.
+	#[async_std::test]
+	async fn test_execute_prepare_retry() {
+		let mut test = Builder::default().build();
+		let mut host = test.host_handle();
+
+		// Submit an execute request that fails.
+		let (result_tx, _result_rx) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx,
+		)
+		.await
+		.unwrap();
+
+		// The queue received the prepare request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+		// Send a PrepareError.
+		test.from_prepare_queue_tx
+			.send(prepare::FromQueue {
+				artifact_id: artifact_id(1),
+				result: Err(PrepareError::TimedOut),
+			})
+			.await
+			.unwrap();
+
+		// Submit another execute request.
+		let (result_tx_2, _result_rx_2) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx_2,
+		)
+		.await
+		.unwrap();
+
+		// Assert the prepare queue is empty.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+
+		// Pause for enough time to reset the cooldown for this failed prepare request.
+		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
+
+		// Submit another execute request.
+		let (result_tx_3, _result_rx_3) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx_3,
+		)
+		.await
+		.unwrap();
+
+		// Assert the prepare queue contains the request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+	}
+
+	// Test that multiple execution requests don't trigger preparation retries if the first one
+	// failed due to a reproducible error (e.g. Prevalidation).
+	#[async_std::test]
+	async fn test_execute_prepare_no_retry() {
+		let mut test = Builder::default().build();
+		let mut host = test.host_handle();
+
+		// Submit an execute request that fails.
+		let (result_tx, _result_rx) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx,
+		)
+		.await
+		.unwrap();
+
+		// The queue received the prepare request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+		// Send a PrepareError.
+		test.from_prepare_queue_tx
+			.send(prepare::FromQueue {
+				artifact_id: artifact_id(1),
+				result: Err(PrepareError::Prevalidation("reproducible error".into())),
+			})
+			.await
+			.unwrap();
+
+		// Submit another execute request.
+		let (result_tx_2, _result_rx_2) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx_2,
+		)
+		.await
+		.unwrap();
+
+		// Assert the prepare queue is empty.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+
+		// Pause for enough time to reset the cooldown for this failed prepare request.
+		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
+
+		// Submit another execute request.
+		let (result_tx_3, _result_rx_3) = oneshot::channel();
+		host.execute_pvf(
+			Pvf::from_discriminator(1),
+			TEST_EXECUTION_TIMEOUT,
+			b"pvf".to_vec(),
+			Priority::Critical,
+			result_tx_3,
+		)
+		.await
+		.unwrap();
+
+		// Assert the prepare queue is empty - we do not retry for prevalidation errors.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+	}
+
+	// Test that multiple heads-up requests trigger preparation retries if the first one failed.
+	#[async_std::test]
+	async fn test_heads_up_prepare_retry() {
+		let mut test = Builder::default().build();
+		let mut host = test.host_handle();
+
+		// Submit a heads-up request that fails.
+		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+
+		// The queue received the prepare request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+		// Send a PrepareError.
+		test.from_prepare_queue_tx
+			.send(prepare::FromQueue {
+				artifact_id: artifact_id(1),
+				result: Err(PrepareError::TimedOut),
+			})
+			.await
+			.unwrap();
+
+		// Submit another heads-up request.
+		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+
+		// Assert the prepare queue is empty.
+		test.poll_ensure_to_prepare_queue_is_empty().await;
+
+		// Pause for enough time to reset the cooldown for this failed prepare request.
+		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
+
+		// Submit another heads-up request.
+		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+
+		// Assert the prepare queue contains the request.
+		assert_matches!(
+			test.poll_and_recv_to_prepare_queue().await,
+			prepare::ToQueue::Enqueue { .. }
+		);
+	}
+
 	#[async_std::test]
 	async fn cancellation() {
 		let mut test = Builder::default().build();