Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(turborepo): --continue=dependencies-successful #10023

Merged
merged 10 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 75 additions & 18 deletions crates/turborepo-graph-utils/src/walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use tracing::log::trace;
pub struct Walker<N, S> {
marker: std::marker::PhantomData<S>,
cancel: watch::Sender<bool>,
node_events: Option<mpsc::Receiver<(N, oneshot::Sender<()>)>>,
node_events: Option<mpsc::Receiver<(N, oneshot::Sender<bool>)>>,
join_handles: FuturesUnordered<JoinHandle<()>>,
}

pub struct Start;
pub struct Walking;

pub type WalkMessage<N> = (N, oneshot::Sender<()>);
pub type WalkMessage<N> = (N, oneshot::Sender<bool>);

// These constraint might look very stiff, but since all of the petgraph graph
// types use integers as node ids and GraphBase already constraints these types
Expand All @@ -37,7 +37,7 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
let mut rxs = HashMap::new();
for node in graph.node_identifiers() {
// Each node can finish at most once so we set the capacity to 1
let (tx, rx) = broadcast::channel::<()>(1);
let (tx, rx) = broadcast::channel::<bool>(1);
txs.insert(node, tx);
rxs.insert(node, rx);
}
Expand Down Expand Up @@ -76,8 +76,14 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
results = deps_fut => {
for res in results {
match res {
// No errors from reading dependency channels
Ok(()) => (),
// Dependency channel signaled this subgraph is terminal;
// let our dependents know too (if any)
Ok(false) => {
tx.send(false).ok();
return;
}
// Otherwise continue
Ok(true) => (),
// A dependency finished without sending a finish
// Could happen if a cancel is sent and is racing with deps
// so we interpret this as a cancel.
Expand All @@ -95,7 +101,7 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
}
}

let (callback_tx, callback_rx) = oneshot::channel::<()>();
let (callback_tx, callback_rx) = oneshot::channel::<bool>();
// do some err handling with the send failure?
if node_tx.send((node, callback_tx)).await.is_err() {
// Receiving end of node channel has been closed/dropped
Expand All @@ -104,14 +110,15 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
trace!("Receiver was dropped before walk finished without calling cancel");
return;
}
if callback_rx.await.is_err() {
let callback_result = callback_rx.await;
if callback_result.is_err() {
// If the caller drops the callback sender without signaling
// that the node processing is finished we assume that it is finished.
trace!("Callback sender was dropped without sending a finish signal")
}
// Send errors indicate that there are no receivers which
// happens when this node has no dependents
tx.send(()).ok();
tx.send(matches!(callback_result, Ok(true))).ok();
}
}
}));
Expand Down Expand Up @@ -204,7 +211,7 @@ mod test {
let (walker, mut node_emitter) = walker.walk();
while let Some((index, done)) = node_emitter.recv().await {
visited.push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
walker.wait().await.unwrap();
assert_eq!(visited, vec![c, b, a]);
Expand All @@ -228,7 +235,7 @@ mod test {
walker.cancel().unwrap();

visited.push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
assert_eq!(visited, vec![c]);
let Walker { join_handles, .. } = walker;
Expand Down Expand Up @@ -272,16 +279,16 @@ mod test {
tokio::spawn(async move {
is_b_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == b {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
b_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
Expand Down Expand Up @@ -322,12 +329,12 @@ mod test {
tokio::spawn(async move {
is_b_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == b {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
b_done.take().unwrap().send(()).unwrap();
} else if index == a {
// don't mark as done until d finishes
Expand All @@ -336,19 +343,69 @@ mod test {
tokio::spawn(async move {
is_d_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
});
} else if index == d {
// send the signal that b is finished
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
d_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(()).unwrap();
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
assert_eq!(visited.lock().unwrap().as_slice(), &[c, b, e, d, a]);
}

#[tokio::test]
async fn test_dependent_cancellation() {
// a -- b -- c -- f
// \ /
// - d -- e -
let mut g = Graph::new();
let a = g.add_node("a");
let b = g.add_node("b");
let c = g.add_node("c");
let d = g.add_node("d");
let e = g.add_node("e");
let f = g.add_node("f");
g.add_edge(a, b, ());
g.add_edge(b, c, ());
g.add_edge(a, d, ());
g.add_edge(d, e, ());
g.add_edge(c, f, ());
g.add_edge(e, f, ());

// We intentionally wait to mark c as finished until e has been finished
let walker = Walker::new(&g);
let visited = Arc::new(Mutex::new(Vec::new()));
let (walker, mut node_emitter) = walker.walk();
let (e_done, is_e_done) = oneshot::channel::<()>();
let mut e_done = Some(e_done);
let mut is_e_done = Some(is_e_done);
while let Some((index, done)) = node_emitter.recv().await {
if index == c {
// don't mark as done until we get the signal that e is finished
let is_e_done = is_e_done.take().unwrap();
let visited = visited.clone();
tokio::spawn(async move {
is_e_done.await.unwrap();
visited.lock().unwrap().push(index);
done.send(true).unwrap();
});
} else if index == e {
// send the signal that e is finished, and cancel its dependents
visited.lock().unwrap().push(index);
done.send(false).unwrap();
e_done.take().unwrap().send(()).unwrap();
} else {
visited.lock().unwrap().push(index);
done.send(true).unwrap();
}
}
walker.wait().await.unwrap();
assert_eq!(visited.lock().unwrap().as_slice(), &[f, e, c, b]);
}
}
132 changes: 122 additions & 10 deletions crates/turborepo-lib/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{
use biome_deserialize_macros::Deserializable;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{
builder::NonEmptyStringValueParser, ArgAction, ArgGroup, CommandFactory, Parser, Subcommand,
ValueEnum,
builder::{EnumValueParser, NonEmptyStringValueParser, PossibleValue, TypedValueParser},
ArgAction, ArgGroup, CommandFactory, Parser, Subcommand, ValueEnum,
};
use clap_complete::{generate, Shell};
pub use error::Error;
Expand Down Expand Up @@ -160,6 +160,25 @@ impl fmt::Display for EnvMode {
}
}

#[derive(Copy, Clone, Debug, Default, PartialEq, ValueEnum, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ContinueMode {
#[default]
None,
IndependentTasksOnly,
All,
}

impl fmt::Display for ContinueMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
ContinueMode::None => "none",
ContinueMode::IndependentTasksOnly => "independent-tasks-only",
ContinueMode::All => "all",
})
}
}

/// The parsed arguments from the command line. In general we should avoid using
/// or mutating this directly, and instead use the fully canonicalized `Opts`
/// struct.
Expand Down Expand Up @@ -811,6 +830,63 @@ fn path_non_empty(s: &str) -> Result<Utf8PathBuf, String> {
}
}

#[derive(Copy, Clone, Debug, PartialEq)]
struct ContinueModeParser;

impl ContinueModeParser {
pub fn new() -> Self {
Self {}
}
}

// slightly redundant, but this allows us to transform deprecated values
// /before/ enum conversion, and also hide them from the help message
impl TypedValueParser for ContinueModeParser {
type Value = ContinueMode;

fn parse_ref(
&self,
cmd: &clap::Command,
arg: Option<&clap::Arg>,
value: &std::ffi::OsStr,
) -> Result<Self::Value, clap::Error> {
let result = EnumValueParser::<ContinueMode>::new().parse_ref(cmd, arg, value);
if result.is_err() {
match value.to_str() {
Some("false") => {
warn!(
"--continue=false is deprecated and will be removed in a future major \
version. Use --continue=none instead."
);
return Ok(ContinueMode::None);
}
Some("true") => {
warn!(
"--continue=true is deprecated and will be removed in a future major \
version. Use --continue=all instead."
);
return Ok(ContinueMode::All);
}
_ => return Err(result.unwrap_err()),
}
} else {
return Ok(result.unwrap());
}
}

// don't advertise deprecated values
fn possible_values(&self) -> Option<Box<dyn Iterator<Item = PossibleValue> + '_>> {
Some(Box::new(
vec![
PossibleValue::new("none"),
PossibleValue::new("independent-tasks-only"),
PossibleValue::new("all"),
]
.into_iter(),
))
}
}

/// Arguments used in run and watch
#[derive(Parser, Clone, Debug, Default, PartialEq)]
#[command(groups = [
Expand All @@ -824,10 +900,12 @@ pub struct ExecutionArgs {
/// one-at-a-time) execution.
#[clap(long)]
pub concurrency: Option<String>,
/// Continue execution even if a task exits with an error or non-zero
/// exit code. The default behavior is to bail
#[clap(long = "continue")]
pub continue_execution: bool,
/// Specify which tasks should continue running when an error occurs.
/// Use "none" to cancel all tasks.
/// Use "independent-tasks-only" to continue running independent tasks and
/// cancel dependent ones. Use "all" to continue running all tasks.
#[clap(long = "continue", value_name = "CONTINUE", num_args = 0..=1, default_value = "none", default_missing_value = "all", value_parser = ContinueModeParser::new())]
pub continue_execution: ContinueMode,
/// Run turbo in single-package mode
#[clap(long)]
pub single_package: bool,
Expand Down Expand Up @@ -895,7 +973,11 @@ impl ExecutionArgs {
// default to false
track_usage!(telemetry, self.framework_inference, |val: bool| !val);

track_usage!(telemetry, self.continue_execution, |val| val);
telemetry.track_arg_value(
"continue-execution",
self.continue_execution,
EventType::NonSensitive,
);
track_usage!(telemetry, self.single_package, |val| val);
track_usage!(telemetry, self.only, |val| val);
track_usage!(telemetry, &self.cache_dir, Option::is_some);
Expand Down Expand Up @@ -1644,7 +1726,7 @@ mod test {
use itertools::Itertools;
use pretty_assertions::assert_eq;

use crate::cli::{ExecutionArgs, LinkTarget, RunArgs};
use crate::cli::{ContinueMode, ExecutionArgs, LinkTarget, RunArgs};

struct CommandTestCase {
command: &'static str,
Expand Down Expand Up @@ -1889,14 +1971,44 @@ mod test {
command: Some(Command::Run {
execution_args: Box::new(ExecutionArgs {
tasks: vec!["build".to_string()],
continue_execution: true,
continue_execution: ContinueMode::All,
..get_default_execution_args()
}),
run_args: Box::new(get_default_run_args())
}),
..Args::default()
} ;
"continue option with no value"
)]
#[test_case::test_case(
&["turbo", "run", "build", "--continue=independent-tasks-only"],
Args {
command: Some(Command::Run {
execution_args: Box::new(ExecutionArgs {
tasks: vec!["build".to_string()],
continue_execution: ContinueMode::IndependentTasksOnly,
..get_default_execution_args()
}),
run_args: Box::new(get_default_run_args())
}),
..Args::default()
} ;
"continue option with explicit value"
)]
#[test_case::test_case(
&["turbo", "run", "build", "--continue=true"],
Args {
command: Some(Command::Run {
execution_args: Box::new(ExecutionArgs {
tasks: vec!["build".to_string()],
continue_execution: ContinueMode::All,
..get_default_execution_args()
}),
run_args: Box::new(get_default_run_args())
}),
..Args::default()
} ;
"continue flag"
"continue option with deprecated value"
)]
#[test_case::test_case(
&["turbo", "run", "build", "--dry-run"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ error: unexpected argument '--no-daemon' found
tip: a similar argument exists: '--no-update-notifier'
tip: to pass '--no-daemon' as a value, use '-- --no-daemon'

Usage: turbo watch --no-update-notifier <--cache-dir <CACHE_DIR>|--concurrency <CONCURRENCY>|--continue|--single-package|--framework-inference [<BOOL>]|--global-deps <GLOBAL_DEPS>|--env-mode [<ENV_MODE>]|--filter <FILTER>|--affected|--output-logs <OUTPUT_LOGS>|--log-order <LOG_ORDER>|--only|--pkg-inference-root <PKG_INFERENCE_ROOT>|--log-prefix <LOG_PREFIX>|TASKS|PASS_THROUGH_ARGS>
Usage: turbo watch --no-update-notifier <--cache-dir <CACHE_DIR>|--concurrency <CONCURRENCY>|--continue [<CONTINUE>]|--single-package|--framework-inference [<BOOL>]|--global-deps <GLOBAL_DEPS>|--env-mode [<ENV_MODE>]|--filter <FILTER>|--affected|--output-logs <OUTPUT_LOGS>|--log-order <LOG_ORDER>|--only|--pkg-inference-root <PKG_INFERENCE_ROOT>|--log-prefix <LOG_PREFIX>|TASKS|PASS_THROUGH_ARGS>

For more information, try '--help'.
Loading