Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pvf: Fix missing execution request when retrying preparation #6537

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 89 additions & 13 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,10 @@ async fn handle_execute_pvf(
},
)
.await?;

// Add an execution request that will wait to run after this prepare job has
// finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
Expand Down Expand Up @@ -931,6 +935,13 @@ mod tests {
ValidationHost { to_host_tx }
}

/// Awaits `result_rx` while driving `self.run` through `run_until`, and returns the value that
/// was received.
///
/// Panics (via `unwrap`) if awaiting `result_rx` yields an error instead of a value.
async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
where
    T: Send,
{
    // Build the receiving future first so the call to `run_until` reads as a single step.
    let recv_result = async { result_rx.await.unwrap() };
    run_until(&mut self.run, recv_result.boxed()).await
}

async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
Expand Down Expand Up @@ -991,7 +1002,7 @@ mod tests {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
msg = to_sweeper_rx.next().fuse() => {
panic!("the sweeper supposed to be empty, but received: {:?}", msg)
panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
}
}
}
Expand Down Expand Up @@ -1311,12 +1322,12 @@ mod tests {
// Test that multiple prechecking requests do not trigger preparation retries if the first one
// failed.
#[tokio::test]
async fn test_precheck_prepare_retry() {
async fn test_precheck_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a precheck request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

// The queue received the prepare request.
Expand All @@ -1333,22 +1344,34 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Submit another precheck request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();

// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another precheck request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();

// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(PrepareError::TimedOut));
}

// Test that multiple execution requests trigger preparation retries if the first one failed due
Expand All @@ -1359,7 +1382,7 @@ mod tests {
let mut host = test.host_handle();

// Submit an execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1384,8 +1407,12 @@ mod tests {
.await
.unwrap();

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1399,11 +1426,15 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1419,6 +1450,30 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);

test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();

// Preparation should have been retried and succeeded this time.
let result_tx_3 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
);

// Send an error for the execution here, just so we can check the result receiver is still
// alive.
result_tx_3
.send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_3.now_or_never().unwrap().unwrap(),
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
}

// Test that multiple execution requests don't trigger preparation retries if the first one
Expand All @@ -1428,8 +1483,8 @@ mod tests {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1454,8 +1509,15 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1469,11 +1531,18 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1486,6 +1555,13 @@ mod tests {

// Assert the prepare queue is empty - we do not retry for prevalidation errors.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
}

// Test that multiple heads-up requests trigger preparation retries if the first one failed.
Expand Down