Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

macros: make select budget-aware #7111

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ doc! {macro_rules! select {
// supplied.
let start = $start;

// Return `Pending` when the task budget is depleted since budget-aware futures
// are going to yield anyway and other futures will not cooperate.
::std::task::ready!($crate::macros::support::poll_budget_available(cx));

for i in 0..BRANCHES {
let branch;
#[allow(clippy::modulo_one)]
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/macros/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ cfg_macros! {
pub fn thread_rng_n(n: u32) -> u32 {
crate::runtime::context::thread_rng_n(n)
}

pub fn poll_budget_available(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
#[cfg(feature = "rt")]
{ crate::task::poll_budget_available(cx) }
#[cfg(not(feature = "rt"))]
{
// Use the `cx` argument to suppress unused variable warnings
let _ = cx;
std::task::Poll::Ready(())
}
}
}

pub use std::future::{Future, IntoFuture};
Expand Down
20 changes: 19 additions & 1 deletion tokio/src/task/consume_budget.rs → tokio/src/task/budget.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::task::{ready, Poll};
use std::task::{ready, Context, Poll};

/// Consumes a unit of budget and returns the execution back to the Tokio
/// runtime *if* the task's coop budget was exhausted.
Expand Down Expand Up @@ -39,3 +39,21 @@ pub async fn consume_budget() {
})
.await
}

/// Polls to see if any budget is available or not.
///
/// See also the usage example in the [task module](index.html#poll_budget_available).
///
/// This method returns:
/// - `Poll::Pending` if the budget is depleted
/// - `Poll::Ready(())` if there is still budget left
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
ready!(crate::trace::trace_leaf(cx));
if crate::runtime::coop::has_budget_remaining() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
88 changes: 86 additions & 2 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,94 @@
//! # }
//! ```
//!
//! #### `poll_budget_available`
//!
//! Futures created by the tokio library functions are budget-aware and yield when there is no more
//! budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner
//! futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures
//! were budget-aware, at some point the budget would be depleted which would cause both futures to
//! return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way
//! back to the runtime, the budget would be reset, and `B` and `C` could make progress again.
//! Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be
//! depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return
//! `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will cause the budget
//! to stay depleted and `B` not making progress.
//!
//! In these scenarios you could use [`task::poll_budget_available`] to check whether the budget has been depleted
//! or not and act accordingly:
//! ```
//! # use std::future::{poll_fn, Future};
//! # use std::task::{ready, Context, Poll};
//! # use std::pin::{pin, Pin};
//! # use std::sync::atomic::{AtomicBool, Ordering};
//! # use tokio::task::{consume_budget, poll_budget_available};
//! #
//! # #[tokio::main]
//! # async fn main() {
//! struct Greedy;
//! struct Aware;
//! struct Combined {
//! greedy: Greedy,
//! aware: Aware,
//! }
//!
//! impl Future for Greedy {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
//! Poll::Ready(())
//! }
//! }
//!
//! impl Future for Aware {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//! pin!(consume_budget()).poll(cx)
//! }
//! }
//!
//! impl Future for Combined {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//! let this = Pin::into_inner(self);
//!
//! ready!(poll_budget_available(cx));
//!
//! if Pin::new(&mut this.aware).poll(cx).is_ready() {
//! return Poll::Ready(());
//! } else {
//! return Pin::new(&mut this.greedy).poll(cx);
//! }
//! }
//! }
//!
//! let did_yield = AtomicBool::new(false);
//!
//! while !did_yield.load(Ordering::Relaxed) {
//! poll_fn(|cx| {
//! let combined = pin!(Combined {
//! greedy: Greedy,
//! aware: Aware,
//! });
//!
//! if combined.poll(cx).is_pending() {
//! did_yield.store(true, Ordering::Relaxed);
//! }
//!
//! Poll::Ready(())
//! })
//! .await;
//! }
//! # }
//!```
//!
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
//! [`task::block_in_place`]: crate::task::block_in_place
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
//! [`task::yield_now`]: crate::task::yield_now()
//! [`task::poll_budget_available`]: crate::task::poll_budget_available()
//! [`thread::yield_now`]: std::thread::yield_now
//! [`task::unconstrained`]: crate::task::unconstrained()
//! [`poll`]: method@std::future::Future::poll
Expand All @@ -337,8 +421,8 @@ cfg_rt! {
mod yield_now;
pub use yield_now::yield_now;

mod consume_budget;
pub use consume_budget::consume_budget;
mod budget;
pub use budget::{consume_budget, poll_budget_available};

mod local;
pub use local::{spawn_local, LocalSet, LocalEnterGuard};
Expand Down
28 changes: 28 additions & 0 deletions tokio/tests/coop_budget.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::future::poll_fn;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use tokio::net::UdpSocket;
use tokio::task::{consume_budget, poll_budget_available};

/// Ensure that UDP sockets have functional budgeting
///
Expand Down Expand Up @@ -76,3 +79,28 @@ async fn coop_budget_udp_send_recv() {

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_poll_budget_available() {
const BUDGET: usize = 128;

// At the begining budget should be available
poll_fn(|cx| {
assert!(poll_budget_available(cx).is_ready());

Poll::Ready(())
})
.await;

// Deplete the budget
for _ in 0..BUDGET {
consume_budget().await;
}

poll_fn(|cx| {
assert!(poll_budget_available(cx).is_pending());

Poll::Ready(())
})
.await;
}
38 changes: 37 additions & 1 deletion tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
use tokio::test as maybe_tokio_test;

use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tokio_test::{assert_ok, assert_pending, assert_ready};

use std::future::poll_fn;
Expand Down Expand Up @@ -630,6 +630,42 @@ async fn mut_ref_patterns() {
};
}

#[tokio::test]
async fn select_is_budget_aware() {
const BUDGET: usize = 128;

let message_sent = BUDGET + 1;
let (sender, mut receiver) = mpsc::channel(message_sent);

let jh = tokio::spawn(async move {
for n in 0usize..message_sent {
sender.send(n).await.unwrap();
}
});

// Wait for all messages to be sent
jh.await.unwrap();

let mut message_received = 0;
loop {
tokio::select! {
// Poll the `receiver` first in order to deplete the budget
biased;

msg = receiver.recv() => {
if msg.is_some() {
message_received += 1
} else {
break;
}
},
() = std::future::ready(()) => {}
}
}

assert_eq!(message_sent, message_received);
}

#[cfg(tokio_unstable)]
mod unstable {
use tokio::runtime::RngSeed;
Expand Down
Loading