diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index b41ebd3c425b..6b51b8cc1351 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -550,6 +550,10 @@ async fn handle_execute_pvf( }, ) .await?; + + // Add an execution request that will wait to run after this prepare job has + // finished. + awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); } else { let _ = result_tx.send(Err(ValidationError::from(error.clone()))); } @@ -931,6 +935,13 @@ mod tests { ValidationHost { to_host_tx } } + async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T + where + T: Send, + { + run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await + } + async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue { let to_prepare_queue_rx = &mut self.to_prepare_queue_rx; run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed()) @@ -991,7 +1002,7 @@ mod tests { futures::select! { _ = Delay::new(Duration::from_millis(500)).fuse() => (), msg = to_sweeper_rx.next().fuse() => { - panic!("the sweeper supposed to be empty, but received: {:?}", msg) + panic!("the sweeper is supposed to be empty, but received: {:?}", msg) } } } @@ -1311,12 +1322,12 @@ mod tests { // Test that multiple prechecking requests do not trigger preparation retries if the first one // failed. #[tokio::test] - async fn test_precheck_prepare_retry() { + async fn test_precheck_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); // Submit a precheck request that fails. - let (result_tx, _result_rx) = oneshot::channel(); + let (result_tx, result_rx) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); // The queue received the prepare request. @@ -1333,22 +1344,34 @@ mod tests { .await .unwrap(); + // The result should contain the error. 
+ let result = test.poll_and_recv_result(result_rx).await; + assert_matches!(result, Err(PrepareError::TimedOut)); + // Submit another precheck request. - let (result_tx_2, _result_rx_2) = oneshot::channel(); + let (result_tx_2, result_rx_2) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap(); // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!(result, Err(PrepareError::TimedOut)); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another precheck request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap(); // Assert the prepare queue is empty - we do not retry for precheck requests. test.poll_ensure_to_prepare_queue_is_empty().await; + + // The result should still contain the original error. + let result = test.poll_and_recv_result(result_rx_3).await; + assert_matches!(result, Err(PrepareError::TimedOut)); } // Test that multiple execution requests trigger preparation retries if the first one failed due @@ -1359,7 +1382,7 @@ mod tests { let mut host = test.host_handle(); // Submit a execute request that fails. - let (result_tx, _result_rx) = oneshot::channel(); + let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1384,8 +1407,12 @@ mod tests { .await .unwrap(); - // Submit another execute request. - let (result_tx_2, _result_rx_2) = oneshot::channel(); + // The result should contain the error. + let result = test.poll_and_recv_result(result_rx).await; + assert_matches!(result, Err(ValidationError::InternalError(_))); + + // Submit another execute request. 
We shouldn't try to prepare again, yet. + let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1399,11 +1426,15 @@ mod tests { // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!(result, Err(ValidationError::InternalError(_))); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1419,6 +1450,30 @@ mod tests { test.poll_and_recv_to_prepare_queue().await, prepare::ToQueue::Enqueue { .. } ); + + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Ok(Duration::default()), + }) + .await + .unwrap(); + + // Preparation should have been retried and succeeded this time. + let result_tx_3 = assert_matches!( + test.poll_and_recv_to_execute_queue().await, + execute::ToQueue::Enqueue { result_tx, .. } => result_tx + ); + + // Send an error for the execution here, just so we can check the result receiver is still + // alive. + result_tx_3 + .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))) + .unwrap(); + assert_matches!( + result_rx_3.now_or_never().unwrap().unwrap(), + Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)) + ); } // Test that multiple execution requests don't trigger preparation retries if the first one @@ -1428,8 +1483,8 @@ mod tests { let mut test = Builder::default().build(); let mut host = test.host_handle(); - // Submit a execute request that fails. 
- let (result_tx, _result_rx) = oneshot::channel(); + // Submit an execute request that fails. + let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1454,8 +1509,15 @@ mod tests { .await .unwrap(); + // The result should contain the error. + let result = test.poll_and_recv_result(result_rx).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); + // Submit another execute request. - let (result_tx_2, _result_rx_2) = oneshot::channel(); + let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1469,11 +1531,18 @@ mod tests { // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1486,6 +1555,13 @@ mod tests { // Assert the prepare queue is empty - we do not retry for prevalidation errors. test.poll_ensure_to_prepare_queue_is_empty().await; + + // The result should still contain the original error. + let result = test.poll_and_recv_result(result_rx_3).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); } // Test that multiple heads-up requests trigger preparation retries if the first one failed.