Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(turborepo): --continue=dependencies-successful #10023

Merged
merged 10 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions crates/turborepo-graph-utils/src/walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use tracing::log::trace;
pub struct Walker<N, S> {
marker: std::marker::PhantomData<S>,
cancel: watch::Sender<bool>,
node_events: Option<mpsc::Receiver<(N, oneshot::Sender<()>)>>,
node_events: Option<mpsc::Receiver<(N, oneshot::Sender<bool>)>>,
join_handles: FuturesUnordered<JoinHandle<()>>,
}

pub struct Start;
pub struct Walking;

pub type WalkMessage<N> = (N, oneshot::Sender<()>);
pub type WalkMessage<N> = (N, oneshot::Sender<bool>);

// These constraint might look very stiff, but since all of the petgraph graph
// types use integers as node ids and GraphBase already constraints these types
Expand All @@ -37,7 +37,7 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
let mut rxs = HashMap::new();
for node in graph.node_identifiers() {
// Each node can finish at most once so we set the capacity to 1
let (tx, rx) = broadcast::channel::<()>(1);
let (tx, rx) = broadcast::channel::<bool>(1);
txs.insert(node, tx);
rxs.insert(node, rx);
}
Expand Down Expand Up @@ -76,8 +76,14 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
results = deps_fut => {
for res in results {
match res {
// No errors from reading dependency channels
Ok(()) => (),
// Dependency channel signaled this subgraph is terminal;
// let our dependents know too (if any)
Ok(false) => {
tx.send(false).ok();
return;
}
// Otherwise continue
Ok(true) => (),
// A dependency finished without sending a finish
// Could happen if a cancel is sent and is racing with deps
// so we interpret this as a cancel.
Expand All @@ -95,7 +101,7 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
}
}

let (callback_tx, callback_rx) = oneshot::channel::<()>();
let (callback_tx, callback_rx) = oneshot::channel::<bool>();
// do some err handling with the send failure?
if node_tx.send((node, callback_tx)).await.is_err() {
// Receiving end of node channel has been closed/dropped
Expand All @@ -104,14 +110,15 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
trace!("Receiver was dropped before walk finished without calling cancel");
return;
}
if callback_rx.await.is_err() {
let Ok(callback_result) = callback_rx.await else {
// If the caller drops the callback sender without signaling
// that the node processing is finished we assume that it is finished.
trace!("Callback sender was dropped without sending a finish signal")
}
trace!("Callback sender was dropped without sending a finish signal");
return;
};
// Send errors indicate that there are no receivers which
// happens when this node has no dependents
tx.send(()).ok();
tx.send(callback_result).ok();
}
}
}));
Expand Down Expand Up @@ -204,7 +211,7 @@ mod test {
let (walker, mut node_emitter) = walker.walk();
while let Some((index, done)) = node_emitter.recv().await {
visited.push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
walker.wait().await.unwrap();
assert_eq!(visited, vec![c, b, a]);
Expand All @@ -228,7 +235,7 @@ mod test {
walker.cancel().unwrap();

visited.push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
assert_eq!(visited, vec![c]);
let Walker { join_handles, .. } = walker;
Expand Down Expand Up @@ -272,16 +279,16 @@ mod test {
tokio::spawn(async move {
is_b_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == b {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
b_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
Expand Down Expand Up @@ -322,12 +329,12 @@ mod test {
tokio::spawn(async move {
is_b_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == b {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
b_done.take().unwrap().send(()).unwrap();
} else if index == a {
// don't mark as done until d finishes
Expand All @@ -336,19 +343,69 @@ mod test {
tokio::spawn(async move {
is_d_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == d {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
d_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
assert_eq!(visited.lock().unwrap().as_slice(), &[c, b, e, d, a]);
}

#[tokio::test]
async fn test_dependent_cancellation() {
// a -- b -- c -- f
// \ /
// - d -- e -
let mut g = Graph::new();
let a = g.add_node("a");
let b = g.add_node("b");
let c = g.add_node("c");
let d = g.add_node("d");
let e = g.add_node("e");
let f = g.add_node("f");
g.add_edge(a, b, ());
g.add_edge(b, c, ());
g.add_edge(a, d, ());
g.add_edge(d, e, ());
g.add_edge(c, f, ());
g.add_edge(e, f, ());

// We intentionally wait to mark c as finished until e has been finished
let walker = Walker::new(&g);
let visited = Arc::new(Mutex::new(Vec::new()));
let (walker, mut node_emitter) = walker.walk();
let (e_done, is_e_done) = oneshot::channel::<()>();
let mut e_done = Some(e_done);
let mut is_e_done = Some(is_e_done);
while let Some((index, done)) = node_emitter.recv().await {
if index == c {
// don't mark as done until we get the signal that e is finished
let is_e_done = is_e_done.take().unwrap();
let visited = visited.clone();
tokio::spawn(async move {
is_e_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(true).unwrap();
});
} else if index == e {
// send the signal that e is finished, and cancel its dependents
visited.lock().unwrap().push(index);
done.send(false).unwrap();
e_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
assert_eq!(visited.lock().unwrap().as_slice(), &[f, e, c, b]);
}
}
62 changes: 54 additions & 8 deletions crates/turborepo-lib/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ impl fmt::Display for EnvMode {
}
}

#[derive(Copy, Clone, Debug, Default, PartialEq, ValueEnum, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ContinueMode {
#[default]
Never,
DependenciesSuccessful,
Always,
}

impl fmt::Display for ContinueMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
ContinueMode::Never => "never",
ContinueMode::DependenciesSuccessful => "dependencies-successful",
ContinueMode::Always => "always",
})
}
}

/// The parsed arguments from the command line. In general we should avoid using
/// or mutating this directly, and instead use the fully canonicalized `Opts`
/// struct.
Expand Down Expand Up @@ -825,10 +844,13 @@ pub struct ExecutionArgs {
/// one-at-a-time) execution.
#[clap(long)]
pub concurrency: Option<String>,
/// Continue execution even if a task exits with an error or non-zero
/// exit code. The default behavior is to bail
#[clap(long = "continue")]
pub continue_execution: bool,
/// Specify how task execution should proceed when an error occurs.
/// Use "never" to cancel all tasks. Use "dependencies-successful" to
/// continue running tasks whose dependencies have succeeded. Use "always"
/// to continue running all tasks, even those whose dependencies have
/// failed.
#[clap(long = "continue", value_name = "CONTINUE", num_args = 0..=1, default_value = "never", default_missing_value = "always")]
pub continue_execution: ContinueMode,
/// Run turbo in single-package mode
#[clap(long)]
pub single_package: bool,
Expand Down Expand Up @@ -896,7 +918,16 @@ impl ExecutionArgs {
// default to false
track_usage!(telemetry, self.framework_inference, |val: bool| !val);

track_usage!(telemetry, self.continue_execution, |val| val);
track_usage!(telemetry, self.continue_execution, |val| matches!(
val,
ContinueMode::Always | ContinueMode::DependenciesSuccessful
));
telemetry.track_arg_value(
"continue-execution-strategy",
self.continue_execution,
EventType::NonSensitive,
);

track_usage!(telemetry, self.single_package, |val| val);
track_usage!(telemetry, self.only, |val| val);
track_usage!(telemetry, &self.cache_dir, Option::is_some);
Expand Down Expand Up @@ -1653,7 +1684,7 @@ mod test {
use itertools::Itertools;
use pretty_assertions::assert_eq;

use crate::cli::{ExecutionArgs, LinkTarget, RunArgs};
use crate::cli::{ContinueMode, ExecutionArgs, LinkTarget, RunArgs};

struct CommandTestCase {
command: &'static str,
Expand Down Expand Up @@ -1898,14 +1929,29 @@ mod test {
command: Some(Command::Run {
execution_args: Box::new(ExecutionArgs {
tasks: vec!["build".to_string()],
continue_execution: true,
continue_execution: ContinueMode::Always,
..get_default_execution_args()
}),
run_args: Box::new(get_default_run_args())
}),
..Args::default()
} ;
"continue option with no value"
)]
#[test_case::test_case(
&["turbo", "run", "build", "--continue=dependencies-successful"],
Args {
command: Some(Command::Run {
execution_args: Box::new(ExecutionArgs {
tasks: vec!["build".to_string()],
continue_execution: ContinueMode::DependenciesSuccessful,
..get_default_execution_args()
}),
run_args: Box::new(get_default_run_args())
}),
..Args::default()
} ;
"continue flag"
"continue option with explicit value"
)]
#[test_case::test_case(
&["turbo", "run", "build", "--dry-run"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ error: unexpected argument '--no-daemon' found
tip: a similar argument exists: '--no-update-notifier'
tip: to pass '--no-daemon' as a value, use '-- --no-daemon'

Usage: turbo watch --no-update-notifier <--cache-dir <CACHE_DIR>|--concurrency <CONCURRENCY>|--continue|--single-package|--framework-inference [<BOOL>]|--global-deps <GLOBAL_DEPS>|--env-mode [<ENV_MODE>]|--filter <FILTER>|--affected|--output-logs <OUTPUT_LOGS>|--log-order <LOG_ORDER>|--only|--pkg-inference-root <PKG_INFERENCE_ROOT>|--log-prefix <LOG_PREFIX>|TASKS|PASS_THROUGH_ARGS>
Usage: turbo watch --no-update-notifier <--cache-dir <CACHE_DIR>|--concurrency <CONCURRENCY>|--continue [<CONTINUE>]|--single-package|--framework-inference [<BOOL>]|--global-deps <GLOBAL_DEPS>|--env-mode [<ENV_MODE>]|--filter <FILTER>|--affected|--output-logs <OUTPUT_LOGS>|--log-order <LOG_ORDER>|--only|--pkg-inference-root <PKG_INFERENCE_ROOT>|--log-prefix <LOG_PREFIX>|TASKS|PASS_THROUGH_ARGS>

For more information, try '--help'.
34 changes: 22 additions & 12 deletions crates/turborepo-lib/src/engine/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ impl From<mpsc::error::SendError<Message<VisitorData, VisitorResult>>> for Execu
}

#[derive(Debug, Clone, Copy)]
pub struct StopExecution;
pub enum StopExecution {
AllTasks,
DependentTasks,
}

impl Engine {
/// Execute a task graph by sending task ids to the visitor
Expand Down Expand Up @@ -93,7 +96,7 @@ impl Engine {
.expect("node id should be present")
else {
// Root task has nothing to do so we don't emit any event for it
if done.send(()).is_err() {
if done.send(true).is_err() {
debug!(
"Graph walker done callback receiver was closed before done signal \
could be sent"
Expand All @@ -114,23 +117,30 @@ impl Engine {
let (message, result) = Message::new(task_id.clone());
visitor.send(message).await?;

if let Err(StopExecution) = result.await.unwrap_or_else(|_| {
let mut continue_walking_subgraph = true;
match result.await.unwrap_or_else(|_| {
// If the visitor doesn't send a callback, then we assume the task finished
tracing::trace!(
"Engine visitor dropped callback sender without sending result"
);
Ok(())
}) {
if walker
.lock()
.expect("Walker mutex poisoned")
.cancel()
.is_err()
{
debug!("Unable to cancel graph walk");
Err(StopExecution::AllTasks) => {
if walker
.lock()
.expect("Walker mutex poisoned")
.cancel()
.is_err()
{
debug!("Unable to cancel graph walk");
}
}
}
if done.send(()).is_err() {
Err(StopExecution::DependentTasks) => {
continue_walking_subgraph = false;
}
_ => (),
};
if done.send(continue_walking_subgraph).is_err() {
debug!("Graph walk done receiver closed before node was finished processing");
}
Ok(())
Expand Down
Loading