From 6bb352cd68dba27f3024b7b671fc82fd09af4353 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 05:09:51 -0800 Subject: [PATCH 01/19] Consolidate together Bevy's TaskPools --- crates/bevy_asset/src/processor/mod.rs | 8 +- crates/bevy_asset/src/server/loaders.rs | 4 +- crates/bevy_asset/src/server/mod.rs | 10 +- crates/bevy_core/src/lib.rs | 21 +--- crates/bevy_core/src/task_pool_options.rs | 117 ++--------------- crates/bevy_gltf/src/loader.rs | 4 +- crates/bevy_render/src/lib.rs | 2 +- .../src/render_resource/pipeline_cache.rs | 17 ++- .../bevy_render/src/view/window/screenshot.rs | 14 +-- crates/bevy_tasks/Cargo.toml | 1 + crates/bevy_tasks/README.md | 11 +- crates/bevy_tasks/src/lib.rs | 4 +- crates/bevy_tasks/src/task_pool.rs | 37 +++++- crates/bevy_tasks/src/usages.rs | 119 ++++++------------ examples/async_tasks/async_compute.rs | 10 +- examples/scene/scene.rs | 4 +- 16 files changed, 123 insertions(+), 260 deletions(-) diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index b5ef275103ac3..6700532e857f0 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -18,7 +18,7 @@ use crate::{ }; use bevy_ecs::prelude::*; use bevy_log::{debug, error, trace, warn}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::{BoxedFuture, HashMap, HashSet}; use futures_io::ErrorKind; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; @@ -165,7 +165,7 @@ impl AssetProcessor { pub fn process_assets(&self) { let start_time = std::time::Instant::now(); debug!("Processing Assets"); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.initialize().await.unwrap(); for source in self.sources().iter_processed() { @@ -315,7 +315,7 @@ impl AssetProcessor { #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))] error!("AddFolder event cannot be handled in single threaded mode (or WASM) yet."); #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.process_assets_internal(scope, source, path) .await @@ -457,7 +457,7 @@ impl AssetProcessor { loop { let mut check_reprocess_queue = std::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { for path in check_reprocess_queue.drain(..) { let processor = self.clone(); let source = self.get_source(path.source()).unwrap(); diff --git a/crates/bevy_asset/src/server/loaders.rs b/crates/bevy_asset/src/server/loaders.rs index 671064b31aa81..f4a143658d3a2 100644 --- a/crates/bevy_asset/src/server/loaders.rs +++ b/crates/bevy_asset/src/server/loaders.rs @@ -4,7 +4,7 @@ use crate::{ }; use async_broadcast::RecvError; use bevy_log::{error, warn}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::{HashMap, TypeIdMap}; use std::{any::TypeId, sync::Arc}; use thiserror::Error; @@ -78,7 +78,7 @@ impl AssetLoaders { match maybe_loader { MaybeAssetLoader::Ready(_) => unreachable!(), MaybeAssetLoader::Pending { sender, .. } => { - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let _ = sender.broadcast(loader).await; }) diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 5fda4c55e5f3d..bfd90e4acdbcf 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -19,7 +19,7 @@ use crate::{ }; use bevy_ecs::prelude::*; use bevy_log::{error, info}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::{CowArc, HashSet}; use crossbeam_channel::{Receiver, Sender}; use futures_lite::StreamExt; @@ -296,7 +296,7 @@ impl AssetServer { if should_load { let owned_handle = Some(handle.clone().untyped()); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { if let Err(err) = server.load_internal(owned_handle, path, false, None).await { error!("{}", err); @@ -366,7 +366,7 @@ impl AssetServer { let id = handle.id().untyped(); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let path_clone = path.clone(); match server.load_untyped_async(path).await { @@ -551,7 +551,7 @@ impl AssetServer { pub fn reload<'a>(&self, path: impl Into>) { let server = self.clone(); let path = path.into().into_owned(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let mut reloaded = false; @@ -690,7 +690,7 @@ impl AssetServer { let path = path.into_owned(); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let Ok(source) = server.get_source(path.source()) else { error!( diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index c49f08ab1f090..20927b461ce0b 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -112,8 +112,7 @@ fn register_math_types(app: &mut App) { .register_type::>(); } -/// Setup of default task pools: [`AsyncComputeTaskPool`](bevy_tasks::AsyncComputeTaskPool), -/// [`ComputeTaskPool`](bevy_tasks::ComputeTaskPool), [`IoTaskPool`](bevy_tasks::IoTaskPool). +/// Setup of default task pool: [`ComputeTaskPool`](bevy_tasks::ComputeTaskPool). #[derive(Default)] pub struct TaskPoolPlugin { /// Options for the [`TaskPool`](bevy_tasks::TaskPool) created at application start. @@ -175,20 +174,13 @@ pub fn update_frame_count(mut frame_count: ResMut) { #[cfg(test)] mod tests { use super::*; - use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; + use bevy_tasks::prelude::ComputeTaskPool; #[test] fn runs_spawn_local_tasks() { let mut app = App::new(); app.add_plugins((TaskPoolPlugin::default(), TypeRegistrationPlugin)); - let (async_tx, async_rx) = crossbeam_channel::unbounded(); - AsyncComputeTaskPool::get() - .spawn_local(async move { - async_tx.send(()).unwrap(); - }) - .detach(); - let (compute_tx, compute_rx) = crossbeam_channel::unbounded(); ComputeTaskPool::get() .spawn_local(async move { @@ -196,18 +188,9 @@ mod tests { }) .detach(); - let (io_tx, io_rx) = crossbeam_channel::unbounded(); - IoTaskPool::get() - .spawn_local(async move { - io_tx.send(()).unwrap(); - }) - .detach(); - app.run(); - async_rx.try_recv().unwrap(); compute_rx.try_recv().unwrap(); - io_rx.try_recv().unwrap(); } #[test] diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 29f759875bb13..405a85eb07e7c 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -1,35 +1,6 @@ -use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use bevy_utils::tracing::trace; -/// Defines a simple way to determine how many threads to use given the number of remaining cores -/// and number of total cores -#[derive(Clone, Debug)] -pub struct TaskPoolThreadAssignmentPolicy { - /// Force using at least this many threads - pub min_threads: usize, - /// Under no circumstance use more than this many threads for this pool - pub max_threads: usize, - /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is - /// permitted to use 1.0 to try to use all remaining threads - pub percent: f32, -} - -impl TaskPoolThreadAssignmentPolicy { - /// Determine the number of threads to use for this task pool - fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { - assert!(self.percent >= 0.0); - let mut desired = (total_threads as f32 * self.percent).round() as usize; - - // Limit ourselves to the number of cores available - desired = desired.min(remaining_threads); - - // Clamp by min_threads, max_threads. (This may result in us using more threads than are - // available, this is intended. An example case where this might happen is a device with - // <= 2 threads. - desired.clamp(self.min_threads, self.max_threads) - } -} - /// Helper for configuring and creating the default task pools. For end-users who want full control, /// set up [`TaskPoolPlugin`](super::TaskPoolPlugin) #[derive(Clone, Debug)] @@ -40,13 +11,6 @@ pub struct TaskPoolOptions { /// If the number of physical cores is greater than max_total_threads, force using /// max_total_threads pub max_total_threads: usize, - - /// Used to determine number of IO threads to allocate - pub io: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of async compute threads to allocate - pub async_compute: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of compute threads to allocate - pub compute: TaskPoolThreadAssignmentPolicy, } impl Default for TaskPoolOptions { @@ -55,27 +19,6 @@ impl Default for TaskPoolOptions { // By default, use however many cores are available on the system min_total_threads: 1, max_total_threads: usize::MAX, - - // Use 25% of cores for IO, at least 1, no more than 4 - io: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use 25% of cores for async compute, at least 1, no more than 4 - async_compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use all remaining cores for compute (at least 1) - compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: usize::MAX, - percent: 1.0, // This 1.0 here means "whatever is left over" - }, } } } @@ -96,57 +39,11 @@ impl TaskPoolOptions { .clamp(self.min_total_threads, self.max_total_threads); trace!("Assigning {} cores to default task pools", total_threads); - let mut remaining_threads = total_threads; - - { - // Determine the number of IO threads we will use - let io_threads = self - .io - .get_number_of_threads(remaining_threads, total_threads); - - trace!("IO Threads: {}", io_threads); - remaining_threads = remaining_threads.saturating_sub(io_threads); - - IoTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of async compute threads we will use - let async_compute_threads = self - .async_compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Async Compute Threads: {}", async_compute_threads); - remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - - AsyncComputeTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of compute threads we will use - // This is intentionally last so that an end user can specify 1.0 as the percent - let compute_threads = self - .compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Compute Threads: {}", compute_threads); - - ComputeTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()) - .build() - }); - } + ComputeTaskPool::get_or_init(|| { + TaskPoolBuilder::default() + .num_threads(total_threads) + .thread_name("Compute Task Pool".to_string()) + .build() + }); } } diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index b3306ef3d0b1d..46ec970ff363c 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -34,7 +34,7 @@ use bevy_render::{ }; use bevy_scene::Scene; #[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_transform::components::Transform; use bevy_utils::{ smallvec::{smallvec, SmallVec}, @@ -348,7 +348,7 @@ async fn load_gltf<'a, 'b, 'c>( } } else { #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .scope(|scope| { gltf.textures().for_each(|gltf_texture| { let parent_path = load_context.path().parent().unwrap(); diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index 22a4efee7237d..9618cf50b2843 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -306,7 +306,7 @@ impl Plugin for RenderPlugin { }; // In wasm, spawn a task and detach it for execution #[cfg(target_arch = "wasm32")] - bevy_tasks::IoTaskPool::get() + bevy_tasks::ComputeTaskPool::get() .spawn_local(async_renderer) .detach(); // Otherwise, just block for it to complete diff --git a/crates/bevy_render/src/render_resource/pipeline_cache.rs b/crates/bevy_render/src/render_resource/pipeline_cache.rs index ab921024129a5..abbedfa3b0348 100644 --- a/crates/bevy_render/src/render_resource/pipeline_cache.rs +++ b/crates/bevy_render/src/render_resource/pipeline_cache.rs @@ -12,7 +12,6 @@ use bevy_utils::{ use naga::valid::Capabilities; use std::{ borrow::Cow, - future::Future, hash::Hash, mem, ops::Deref, @@ -697,8 +696,7 @@ impl PipelineCache { let device = self.device.clone(); let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); - create_pipeline_task( - async move { + create_pipeline_task(move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -796,8 +794,7 @@ impl PipelineCache { let device = self.device.clone(); let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); - create_pipeline_task( - async move { + create_pipeline_task(move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -953,14 +950,14 @@ impl PipelineCache { feature = "multi-threaded" ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, sync: bool, ) -> CachedPipelineState { if !sync { - return CachedPipelineState::Creating(bevy_tasks::AsyncComputeTaskPool::get().spawn(task)); + return CachedPipelineState::Creating(bevy_tasks::ComputeTaskPool::get().spawn_blocking(task)); } - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } @@ -972,10 +969,10 @@ fn create_pipeline_task( not(feature = "multi-threaded") ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, _sync: bool, ) -> CachedPipelineState { - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } diff --git a/crates/bevy_render/src/view/window/screenshot.rs b/crates/bevy_render/src/view/window/screenshot.rs index c13a60f88524c..6169a95381613 100644 --- a/crates/bevy_render/src/view/window/screenshot.rs +++ b/crates/bevy_render/src/view/window/screenshot.rs @@ -4,7 +4,7 @@ use bevy_app::Plugin; use bevy_asset::{load_internal_asset, Handle}; use bevy_ecs::{entity::EntityHashMap, prelude::*}; use bevy_log::{error, info, info_span}; -use bevy_tasks::AsyncComputeTaskPool; +use bevy_tasks::ComputeTaskPool; use std::sync::Mutex; use thiserror::Error; use wgpu::{ @@ -42,7 +42,8 @@ pub struct ScreenshotAlreadyRequestedError; impl ScreenshotManager { /// Signals the renderer to take a screenshot of this frame. /// - /// The given callback will eventually be called on one of the [`AsyncComputeTaskPool`]s threads. + /// The given callback will eventually be called on one of the [`ComputeTaskPool`]'s + /// blocking threads. pub fn take_screenshot( &mut self, window: Entity, @@ -317,7 +318,7 @@ pub(crate) fn collect_screenshots(world: &mut World) { let pixel_size = texture_format.pixel_size(); let ScreenshotPreparedState { buffer, .. } = window.screenshot_memory.take().unwrap(); - let finish = async move { + ComputeTaskPool::get().spawn_blocking(move || { let (tx, rx) = async_channel::bounded(1); let buffer_slice = buffer.slice(..); // The polling for this map call is done every frame when the command queue is submitted. @@ -328,7 +329,7 @@ pub(crate) fn collect_screenshots(world: &mut World) { } tx.try_send(()).unwrap(); }); - rx.recv().await.unwrap(); + rx.recv_blocking().unwrap(); let data = buffer_slice.get_mapped_range(); // we immediately move the data to CPU memory to avoid holding the mapped view for long let mut result = Vec::from(&*data); @@ -365,9 +366,8 @@ pub(crate) fn collect_screenshots(world: &mut World) { texture_format, RenderAssetUsages::RENDER_WORLD, )); - }; - - AsyncComputeTaskPool::get().spawn(finish).detach(); + }) + .detach(); } } } diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 3fb0e3c297acd..b2c7463c89158 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -18,6 +18,7 @@ async-channel = "2.2.0" async-io = { version = "2.0.0", optional = true } async-task = "4.2.0" concurrent-queue = "2.0.0" +blocking = "1" [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 1d1a7fb90465b..8c778ae4f92a1 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -25,14 +25,9 @@ bevy provides three different thread pools via which tasks of different kinds ca This currently applies to WASM targets.) The determining factor for what kind of work should go in each pool is latency requirements: -* For CPU-intensive work (tasks that generally spin until completion) we have a standard - [`ComputeTaskPool`] and an [`AsyncComputeTaskPool`]. Work that does not need to be completed to - present the next frame should go to the [`AsyncComputeTaskPool`]. - -* For IO-intensive work (tasks that spend very little time in a "woken" state) we have an - [`IoTaskPool`] whose tasks are expected to complete very quickly. Generally speaking, they should just - await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready - for consumption. (likely via channels) +For CPU-intensive work, the standard [`ComputeTaskPool`] can be used. It can also be used for +non-blocking IO-intensive work (tasks that spend very little time in a "woken" state). +Work that does not need to be completed to present the next frame should use [`TaskPool::spawn_blocking`]. [bevy]: https://bevyengine.org [rayon]: /~https://github.com/rayon-rs/rayon diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 60b162dbed596..cca6020955435 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -19,7 +19,7 @@ pub use single_threaded_task_pool::{FakeTask, Scope, TaskPool, TaskPoolBuilder, mod usages; #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; +pub use usages::ComputeTaskPool; #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] mod thread_executor; @@ -44,7 +44,7 @@ pub mod prelude { block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, + usages::ComputeTaskPool, }; } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 551bb06311fd2..819de0d5e2496 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -5,9 +5,11 @@ use std::{ panic::AssertUnwindSafe, sync::Arc, thread::{self, JoinHandle}, + env }; use async_task::FallibleTask; +use blocking::unblock; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; @@ -34,6 +36,7 @@ pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most `num_threads` threads. /// Otherwise use the logical core count of the system num_threads: Option, + num_blocking_threads: Option, /// If set, we'll use the given stack size rather than the system default stack_size: Option, /// Allows customizing the name of the threads - helpful for debugging. If set, threads will @@ -50,7 +53,7 @@ impl TaskPoolBuilder { Self::default() } - /// Override the number of threads created for the pool. If unset, we default to the number + /// Override the number of threads created for the pool. If unset, it will default to the number /// of logical cores of the system pub fn num_threads(mut self, num_threads: usize) -> Self { self.num_threads = Some(num_threads); @@ -135,6 +138,10 @@ impl TaskPool { } fn new_internal(builder: TaskPoolBuilder) -> Self { + if let Some(thread_count) = builder.num_blocking_threads { + env::set_var("BLOCKING_MAX_THREADS ", thread_count.to_string().as_str()); + } + let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); let executor = Arc::new(async_executor::Executor::new()); @@ -539,6 +546,34 @@ impl TaskPool { Task::new(self.executor.spawn(future)) } + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the task pool + /// from driving other futures forward. This function runs the provided + /// closure on a thread dedicated to blocking operations. + /// + /// This call will spawn more blocking threads when they are requested + /// through /// this function until the upper limit configured. This + /// limit is very large by default, because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using spawn_blocking, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut down the executor, it will wait + /// indefinitely for all blocking operations to finish. + /// + /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) + pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task + where + T: Send + 'static, + { + Task::new(unblock(f)) + } + /// Spawns a static future on the thread-local async executor for the /// current thread. The task will run entirely on the thread the task was /// spawned on. diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index fda3092b8ebc8..72d92b516ccdc 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,77 +1,44 @@ use super::TaskPool; use std::{ops::Deref, sync::OnceLock}; -macro_rules! taskpool { - ($(#[$attr:meta])* ($static:ident, $type:ident)) => { - static $static: OnceLock<$type> = OnceLock::new(); +static COMPUTE_TASK_POOL: OnceLock = OnceLock::new(); - $(#[$attr])* - #[derive(Debug)] - pub struct $type(TaskPool); - - impl $type { - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] - pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) - } - - #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ - or returns `None` if it is not initialized.")] - pub fn try_get() -> Option<&'static Self> { - $static.get() - } - - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance.")] - #[doc = ""] - #[doc = " # Panics"] - #[doc = " Panics if the global instance has not been initialized yet."] - pub fn get() -> &'static Self { - $static.get().expect( - concat!( - "The ", - stringify!($type), - " has not been initialized yet. Please call ", - stringify!($type), - "::get_or_init beforehand." - ) - ) - } - } - - impl Deref for $type { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - }; -} - -taskpool! { - /// A newtype for a task pool for CPU-intensive work that must be completed to - /// deliver the next frame - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be - /// completed before the next frame. - (COMPUTE_TASK_POOL, ComputeTaskPool) +/// A newtype for a task pool for CPU-intensive work that must be completed to +/// deliver the next frame +/// +/// See [`TaskPool`] documentation for details on Bevy tasks. +#[derive(Debug)] +pub struct ComputeTaskPool(TaskPool); + +impl ComputeTaskPool { + /// Gets the global [`ComputeTaskPool`] instance, or initializes it with `f`. + pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { + COMPUTE_TASK_POOL.get_or_init(|| Self(f())) + } + + /// Attempts to get the global [`ComputeTaskPool`] instance, + /// or returns `None` if it is not initialized. + pub fn try_get() -> Option<&'static Self> { + COMPUTE_TASK_POOL.get() + } + + /// Gets the global [`ComputeTaskPool`] instance." + /// + /// # Panics + /// Panics if the global instance has not been initialized yet. + pub fn get() -> &'static Self { + COMPUTE_TASK_POOL.get().expect( + "The ComputeTaskPool has not been initialized yet. Please call ComputeTaskPool::get_or_init beforehand" + ) + } } -taskpool! { - /// A newtype for a task pool for CPU-intensive work that may span across multiple frames - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// Use [`ComputeTaskPool`] if the work must be complete before advancing to the next frame. - (ASYNC_COMPUTE_TASK_POOL, AsyncComputeTaskPool) -} +impl Deref for ComputeTaskPool { + type Target = TaskPool; -taskpool! { - /// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a - /// "woken" state) - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - (IO_TASK_POOL, IoTaskPool) + fn deref(&self) -> &Self::Target { + &self.0 + } } /// A function used by `bevy_core` to tick the global tasks pools on the main thread. @@ -86,20 +53,8 @@ pub fn tick_global_task_pools_on_main_thread() { .get() .unwrap() .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); + for _ in 0..100 { + compute_local_executor.try_tick(); + } }); } diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index 134373bc36c97..f854a8f9c85f6 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -1,10 +1,10 @@ -//! This example shows how to use the ECS and the [`AsyncComputeTaskPool`] +//! This example shows how to use the ECS and blocking tasks on the [`ComputeTaskPool`] //! to spawn, poll, and complete tasks across systems and system ticks. use bevy::{ ecs::system::{CommandQueue, SystemState}, prelude::*, - tasks::{block_on, futures_lite::future, AsyncComputeTaskPool, Task}, + tasks::{block_on, futures_lite::future, ComputeTaskPool, Task}, }; use rand::Rng; use std::{thread, time::Duration}; @@ -50,15 +50,15 @@ struct ComputeTransform(Task); /// system, [`handle_tasks`], will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes fn spawn_tasks(mut commands: Commands) { - let thread_pool = AsyncComputeTaskPool::get(); + let thread_pool = ComputeTaskPool::get(); for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { - // Spawn new task on the AsyncComputeTaskPool; the task will be + // Spawn a new blocking task on the ComputeTaskPool; the task will be // executed in the background, and the Task future returned by // spawn() can be used to poll for the result let entity = commands.spawn_empty().id(); - let task = thread_pool.spawn(async move { + let task = thread_pool.spawn_blocking(move || { let mut rng = rand::thread_rng(); let duration = Duration::from_secs_f32(rng.gen_range(0.05..0.2)); diff --git a/examples/scene/scene.rs b/examples/scene/scene.rs index dac36e96d7fd4..2635fcdefb9fd 100644 --- a/examples/scene/scene.rs +++ b/examples/scene/scene.rs @@ -1,5 +1,5 @@ //! This example illustrates loading scenes from files. -use bevy::{prelude::*, tasks::IoTaskPool, utils::Duration}; +use bevy::{prelude::*, tasks::ComputeTaskPool, utils::Duration}; use std::{fs::File, io::Write}; fn main() { @@ -133,7 +133,7 @@ fn save_scene_system(world: &mut World) { // as they are blocking // This can't work in WASM as there is no filesystem access #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { // Write the scene RON data to file File::create(format!("assets/{NEW_SCENE_FILE_PATH}")) From 803e74ffd03b69268bb57a3ea6ec8c90f3d5ca25 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 05:36:59 -0800 Subject: [PATCH 02/19] Formatting --- .../src/render_resource/pipeline_cache.rs | 10 +- .../bevy_render/src/view/window/screenshot.rs | 100 +++++++++--------- crates/bevy_tasks/src/task_pool.rs | 40 +++---- crates/bevy_tasks/src/usages.rs | 4 +- 4 files changed, 80 insertions(+), 74 deletions(-) diff --git a/crates/bevy_render/src/render_resource/pipeline_cache.rs b/crates/bevy_render/src/render_resource/pipeline_cache.rs index abbedfa3b0348..a6b11e6eafa7a 100644 --- a/crates/bevy_render/src/render_resource/pipeline_cache.rs +++ b/crates/bevy_render/src/render_resource/pipeline_cache.rs @@ -696,7 +696,8 @@ impl PipelineCache { let device = self.device.clone(); let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); - create_pipeline_task(move || { + create_pipeline_task( + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -794,7 +795,8 @@ impl PipelineCache { let device = self.device.clone(); let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); - create_pipeline_task(move || { + create_pipeline_task( + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -954,7 +956,9 @@ fn create_pipeline_task( sync: bool, ) -> CachedPipelineState { if !sync { - return CachedPipelineState::Creating(bevy_tasks::ComputeTaskPool::get().spawn_blocking(task)); + return CachedPipelineState::Creating( + bevy_tasks::ComputeTaskPool::get().spawn_blocking(task), + ); } match task() { diff --git a/crates/bevy_render/src/view/window/screenshot.rs b/crates/bevy_render/src/view/window/screenshot.rs index 6169a95381613..c65f71144f92a 100644 --- a/crates/bevy_render/src/view/window/screenshot.rs +++ b/crates/bevy_render/src/view/window/screenshot.rs @@ -42,7 +42,7 @@ pub struct ScreenshotAlreadyRequestedError; impl ScreenshotManager { /// Signals the renderer to take a screenshot of this frame. /// - /// The given callback will eventually be called on one of the [`ComputeTaskPool`]'s + /// The given callback will eventually be called on one of the [`ComputeTaskPool`]'s /// blocking threads. pub fn take_screenshot( &mut self, @@ -318,56 +318,58 @@ pub(crate) fn collect_screenshots(world: &mut World) { let pixel_size = texture_format.pixel_size(); let ScreenshotPreparedState { buffer, .. } = window.screenshot_memory.take().unwrap(); - ComputeTaskPool::get().spawn_blocking(move || { - let (tx, rx) = async_channel::bounded(1); - let buffer_slice = buffer.slice(..); - // The polling for this map call is done every frame when the command queue is submitted. - buffer_slice.map_async(wgpu::MapMode::Read, move |result| { - let err = result.err(); - if err.is_some() { - panic!("{}", err.unwrap().to_string()); + ComputeTaskPool::get() + .spawn_blocking(move || { + let (tx, rx) = async_channel::bounded(1); + let buffer_slice = buffer.slice(..); + // The polling for this map call is done every frame when the command queue is submitted. + buffer_slice.map_async(wgpu::MapMode::Read, move |result| { + let err = result.err(); + if err.is_some() { + panic!("{}", err.unwrap().to_string()); + } + tx.try_send(()).unwrap(); + }); + rx.recv_blocking().unwrap(); + let data = buffer_slice.get_mapped_range(); + // we immediately move the data to CPU memory to avoid holding the mapped view for long + let mut result = Vec::from(&*data); + drop(data); + drop(buffer); + + if result.len() != ((width * height) as usize * pixel_size) { + // Our buffer has been padded because we needed to align to a multiple of 256. + // We remove this padding here + let initial_row_bytes = width as usize * pixel_size; + let buffered_row_bytes = + align_byte_size(width * pixel_size as u32) as usize; + + let mut take_offset = buffered_row_bytes; + let mut place_offset = initial_row_bytes; + for _ in 1..height { + result.copy_within( + take_offset..take_offset + buffered_row_bytes, + place_offset, + ); + take_offset += buffered_row_bytes; + place_offset += initial_row_bytes; + } + result.truncate(initial_row_bytes * height as usize); } - tx.try_send(()).unwrap(); - }); - rx.recv_blocking().unwrap(); - let data = buffer_slice.get_mapped_range(); - // we immediately move the data to CPU memory to avoid holding the mapped view for long - let mut result = Vec::from(&*data); - drop(data); - drop(buffer); - - if result.len() != ((width * height) as usize * pixel_size) { - // Our buffer has been padded because we needed to align to a multiple of 256. - // We remove this padding here - let initial_row_bytes = width as usize * pixel_size; - let buffered_row_bytes = align_byte_size(width * pixel_size as u32) as usize; - - let mut take_offset = buffered_row_bytes; - let mut place_offset = initial_row_bytes; - for _ in 1..height { - result.copy_within( - take_offset..take_offset + buffered_row_bytes, - place_offset, - ); - take_offset += buffered_row_bytes; - place_offset += initial_row_bytes; - } - result.truncate(initial_row_bytes * height as usize); - } - screenshot_func(Image::new( - Extent3d { - width, - height, - depth_or_array_layers: 1, - }, - wgpu::TextureDimension::D2, - result, - texture_format, - RenderAssetUsages::RENDER_WORLD, - )); - }) - .detach(); + screenshot_func(Image::new( + Extent3d { + width, + height, + depth_or_array_layers: 1, + }, + wgpu::TextureDimension::D2, + result, + texture_format, + RenderAssetUsages::RENDER_WORLD, + )); + }) + .detach(); } } } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 819de0d5e2496..776280933386a 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,11 +1,11 @@ use std::{ + env, future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe, sync::Arc, thread::{self, JoinHandle}, - env }; use async_task::FallibleTask; @@ -547,27 +547,27 @@ impl TaskPool { } /// Runs the provided closure on a thread where blocking is acceptable. - /// - /// In general, issuing a blocking call or performing a lot of compute in a - /// future without yielding is not okay, as it may prevent the task pool - /// from driving other futures forward. This function runs the provided - /// closure on a thread dedicated to blocking operations. - /// - /// This call will spawn more blocking threads when they are requested - /// through /// this function until the upper limit configured. This - /// limit is very large by default, because `spawn_blocking` is often - /// used for various kinds of IO operations that cannot be performed - /// asynchronously. When you run CPU-bound code using spawn_blocking, - /// you should keep this large upper limit in mind; to run your - /// CPU-bound computations on only a few threads. Spawning too many threads - /// will cause the OS to [thrash], which may impact the performance + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the task pool + /// from driving other futures forward. This function runs the provided + /// closure on a thread dedicated to blocking operations. + /// + /// This call will spawn more blocking threads when they are requested + /// through /// this function until the upper limit configured. This + /// limit is very large by default, because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using spawn_blocking, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance /// of the non-blocking tasks scheduled onto the `TaskPool`. - /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut down the executor, it will wait - /// indefinitely for all blocking operations to finish. - /// + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut down the executor, it will wait + /// indefinitely for all blocking operations to finish. + /// /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) - pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task + pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task where T: Send + 'static, { diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 72d92b516ccdc..6f98d3d8f250e 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -16,14 +16,14 @@ impl ComputeTaskPool { COMPUTE_TASK_POOL.get_or_init(|| Self(f())) } - /// Attempts to get the global [`ComputeTaskPool`] instance, + /// Attempts to get the global [`ComputeTaskPool`] instance, /// or returns `None` if it is not initialized. pub fn try_get() -> Option<&'static Self> { COMPUTE_TASK_POOL.get() } /// Gets the global [`ComputeTaskPool`] instance." - /// + /// /// # Panics /// Panics if the global instance has not been initialized yet. pub fn get() -> &'static Self { From 29e5fab7a938beced5a771f2a5883cf5058e3b4a Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 05:52:11 -0800 Subject: [PATCH 03/19] Backticks --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 776280933386a..6d7b31955bf0b 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -557,7 +557,7 @@ impl TaskPool { /// through /// this function until the upper limit configured. This /// limit is very large by default, because `spawn_blocking` is often /// used for various kinds of IO operations that cannot be performed - /// asynchronously. When you run CPU-bound code using spawn_blocking, + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, /// you should keep this large upper limit in mind; to run your /// CPU-bound computations on only a few threads. Spawning too many threads /// will cause the OS to [thrash], which may impact the performance From 05007b063d1e29b404fd3e1d244a61771035a891 Mon Sep 17 00:00:00 2001 From: James Liu Date: Sat, 24 Feb 2024 14:14:37 -0800 Subject: [PATCH 04/19] Apply suggestions from code review Co-authored-by: Afonso Lage --- crates/bevy_tasks/src/task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 6d7b31955bf0b..762a0446fba86 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -139,7 +139,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { if let Some(thread_count) = builder.num_blocking_threads { - env::set_var("BLOCKING_MAX_THREADS ", thread_count.to_string().as_str()); + env::set_var("BLOCKING_MAX_THREADS", thread_count.to_string().as_str()); } let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); @@ -554,7 +554,7 @@ impl TaskPool { /// closure on a thread dedicated to blocking operations. /// /// This call will spawn more blocking threads when they are requested - /// through /// this function until the upper limit configured. This + /// through until the upper limit configured. This /// limit is very large by default, because `spawn_blocking` is often /// used for various kinds of IO operations that cannot be performed /// asynchronously. When you run CPU-bound code using `spawn_blocking`, From 16c98165d2b51f45e349947005b00ef46347582d Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 19:43:01 -0800 Subject: [PATCH 05/19] Add a spawn_blocking_async --- crates/bevy_tasks/src/task_pool.rs | 77 +++++++++++++++++++++++------- 1 file changed, 61 insertions(+), 16 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 762a0446fba86..71cf6f15debb6 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -537,8 +537,16 @@ impl TaskPool { /// any case, the pool will execute the task even without polling by the /// end-user. /// - /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should + /// If the provided future is non-`Send`, [`spawn_local`] should /// be used instead. + /// + /// If the provided future performs blocking IO or may have long lasting + /// CPU-bound operations, use [`spawn_blocking`] or [`spawn_blocking_async`] + /// instead. + /// + /// [`spawn_local`]: Self::spawn_local + /// [`spawn_blocking`]: Self::spawn_blocking + /// [`spawn_blocking`]: Self::spawn_blocking_async pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, @@ -546,6 +554,24 @@ impl TaskPool { Task::new(self.executor.spawn(future)) } + /// Spawns a static future on the thread-local async executor for the + /// current thread. The task will run entirely on the thread the task was + /// spawned on. + /// + /// The returned [`Task`] is a future that can be polled for the + /// result. It can also be canceled and "detached", allowing the task to + /// continue running even if dropped. In any case, the pool will execute the + /// task even without polling by the end-user. + /// + /// Users should generally prefer to use [`TaskPool::spawn`] instead, + /// unless the provided future is not `Send`. + pub fn spawn_local(&self, future: impl Future + 'static) -> Task + where + T: 'static, + { + Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) + } + /// Runs the provided closure on a thread where blocking is acceptable. /// /// In general, issuing a blocking call or performing a lot of compute in a @@ -554,8 +580,8 @@ impl TaskPool { /// closure on a thread dedicated to blocking operations. /// /// This call will spawn more blocking threads when they are requested - /// through until the upper limit configured. This - /// limit is very large by default, because `spawn_blocking` is often + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often /// used for various kinds of IO operations that cannot be performed /// asynchronously. When you run CPU-bound code using `spawn_blocking`, /// you should keep this large upper limit in mind; to run your @@ -573,23 +599,42 @@ impl TaskPool { { Task::new(unblock(f)) } - - /// Spawns a static future on the thread-local async executor for the - /// current thread. The task will run entirely on the thread the task was - /// spawned on. + + /// Spawns a static future onto on a thread where blocking is acceptabl.e + /// The returned [`Task`] is a future that can be polled for the result. + /// It can also be canceled and "detached", allowing the task to continue + /// running even if dropped. In any case, the pool will execute the task + /// even without polling by the end-user. /// - /// The returned [`Task`] is a future that can be polled for the - /// result. It can also be canceled and "detached", allowing the task to - /// continue running even if dropped. In any case, the pool will execute the - /// task even without polling by the end-user. + /// This function is equivalent to calling `task_pool.spawn_blocking(|| block_on(f))`. + /// + /// If the future is expected to terminate quickly, or will not spend a + /// signficant amount of time performing blocking CPU-bound or IO-bound + /// operations, [`spawn`] should be used instead. The ideal use case for + /// this function is for launching a future that may involve a combination + /// of async IO and blocking operations (i.e. loading large scenes). + /// + /// This call will spawn more blocking threads when they are requested + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. /// - /// Users should generally prefer to use [`TaskPool::spawn`] instead, - /// unless the provided future is not `Send`. - pub fn spawn_local(&self, future: impl Future + 'static) -> Task + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you + /// shut down the executor, it will wait indefinitely for all blocking + /// operations to finish. + /// + /// [`spawn`]: Self::spawn + #[inline] + pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task where - T: 'static, + T: Send + 'static, { - Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) + self.spawn_blocking(|| block_on(f)) } /// Runs a function with the local executor. Typically used to tick From 332c98ba1e466bac0c70e14923002ba92a71356d Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 19:47:39 -0800 Subject: [PATCH 06/19] Add configuration for the number of blocking threads. --- crates/bevy_tasks/src/task_pool.rs | 47 ++++++++++++++++++------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 71cf6f15debb6..4c000cba90c14 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -36,7 +36,10 @@ pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most `num_threads` threads. /// Otherwise use the logical core count of the system num_threads: Option, - num_blocking_threads: Option, + /// If set, this sets the maxinum number of threads used for blocking operations. + /// Otherwise, it will default to the value set by the `BLOCKING_MAX_THREADS` environment variable, + /// or 500 if not set. + max_blocking_threads: Option, /// If set, we'll use the given stack size rather than the system default stack_size: Option, /// Allows customizing the name of the threads - helpful for debugging. If set, threads will @@ -60,6 +63,14 @@ impl TaskPoolBuilder { self } + /// Override the maximum number of blocking threads created for the pool. If unset, it will + /// default to the value set by the `BLOCKING_MAX_THREADS` environment variable, or 500 if not + /// set. + pub fn max_blocking_threads(mut self, num_threads: usize) -> Self { + self.max_blocking_threads = Some(num_threads); + self + } + /// Override the stack size of the threads created for the pool pub fn stack_size(mut self, stack_size: usize) -> Self { self.stack_size = Some(stack_size); @@ -539,11 +550,11 @@ impl TaskPool { /// /// If the provided future is non-`Send`, [`spawn_local`] should /// be used instead. - /// - /// If the provided future performs blocking IO or may have long lasting + /// + /// If the provided future performs blocking IO or may have long lasting /// CPU-bound operations, use [`spawn_blocking`] or [`spawn_blocking_async`] /// instead. - /// + /// /// [`spawn_local`]: Self::spawn_local /// [`spawn_blocking`]: Self::spawn_blocking /// [`spawn_blocking`]: Self::spawn_blocking_async @@ -580,7 +591,7 @@ impl TaskPool { /// closure on a thread dedicated to blocking operations. /// /// This call will spawn more blocking threads when they are requested - /// through this function until the upper limit configured. + /// through this function until the upper limit configured. /// This limit is very large by default (500), because `spawn_blocking` is often /// used for various kinds of IO operations that cannot be performed /// asynchronously. When you run CPU-bound code using `spawn_blocking`, @@ -599,23 +610,23 @@ impl TaskPool { { Task::new(unblock(f)) } - - /// Spawns a static future onto on a thread where blocking is acceptabl.e - /// The returned [`Task`] is a future that can be polled for the result. - /// It can also be canceled and "detached", allowing the task to continue - /// running even if dropped. In any case, the pool will execute the task + + /// Spawns a static future onto on a thread where blocking is acceptabl.e + /// The returned [`Task`] is a future that can be polled for the result. + /// It can also be canceled and "detached", allowing the task to continue + /// running even if dropped. In any case, the pool will execute the task /// even without polling by the end-user. /// /// This function is equivalent to calling `task_pool.spawn_blocking(|| block_on(f))`. - /// - /// If the future is expected to terminate quickly, or will not spend a + /// + /// If the future is expected to terminate quickly, or will not spend a /// signficant amount of time performing blocking CPU-bound or IO-bound /// operations, [`spawn`] should be used instead. The ideal use case for /// this function is for launching a future that may involve a combination /// of async IO and blocking operations (i.e. loading large scenes). - /// + /// /// This call will spawn more blocking threads when they are requested - /// through this function until the upper limit configured. + /// through this function until the upper limit configured. /// This limit is very large by default (500), because `spawn_blocking` is often /// used for various kinds of IO operations that cannot be performed /// asynchronously. When you run CPU-bound code using `spawn_blocking`, @@ -624,13 +635,13 @@ impl TaskPool { /// will cause the OS to [thrash], which may impact the performance /// of the non-blocking tasks scheduled onto the `TaskPool`. /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you - /// shut down the executor, it will wait indefinitely for all blocking + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you + /// shut down the executor, it will wait indefinitely for all blocking /// operations to finish. - /// + /// /// [`spawn`]: Self::spawn #[inline] - pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task + pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task where T: Send + 'static, { From 589cab7b9691cd9c5551e76c0d9a0c43ce47c07b Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 24 Feb 2024 19:54:52 -0800 Subject: [PATCH 07/19] Fix build --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 4c000cba90c14..879565910639d 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -149,7 +149,7 @@ impl TaskPool { } fn new_internal(builder: TaskPoolBuilder) -> Self { - if let Some(thread_count) = builder.num_blocking_threads { + if let Some(thread_count) = builder.max_blocking_threads { env::set_var("BLOCKING_MAX_THREADS", thread_count.to_string().as_str()); } From 7d85100073bfccfc0cd8492edd34ff8942f2a8a6 Mon Sep 17 00:00:00 2001 From: James Liu Date: Sun, 25 Feb 2024 14:38:25 -0800 Subject: [PATCH 08/19] Apply suggestions from code review Co-authored-by: Mike --- crates/bevy_tasks/src/task_pool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 879565910639d..675ada149d881 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -557,7 +557,7 @@ impl TaskPool { /// /// [`spawn_local`]: Self::spawn_local /// [`spawn_blocking`]: Self::spawn_blocking - /// [`spawn_blocking`]: Self::spawn_blocking_async + /// [`spawn_blocking_async`]: Self::spawn_blocking_async pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, @@ -611,9 +611,9 @@ impl TaskPool { Task::new(unblock(f)) } - /// Spawns a static future onto on a thread where blocking is acceptabl.e + /// Spawns a static future onto on a thread where blocking is acceptable. /// The returned [`Task`] is a future that can be polled for the result. - /// It can also be canceled and "detached", allowing the task to continue + /// It can also be "detached", allowing the task to continue /// running even if dropped. In any case, the pool will execute the task /// even without polling by the end-user. /// From ac13d610041f635fa7de0d12f2795d63ce7dcf20 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 25 Feb 2024 16:05:12 -0800 Subject: [PATCH 09/19] Fix Wasm and document platform specific behavior. --- .../bevy_render/src/view/window/screenshot.rs | 4 ++-- crates/bevy_tasks/Cargo.toml | 10 ++++----- .../src/single_threaded_task_pool.rs | 22 +++++++++++++++++++ crates/bevy_tasks/src/task_pool.rs | 17 +++++++++++--- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/crates/bevy_render/src/view/window/screenshot.rs b/crates/bevy_render/src/view/window/screenshot.rs index c65f71144f92a..960c1a11f9c76 100644 --- a/crates/bevy_render/src/view/window/screenshot.rs +++ b/crates/bevy_render/src/view/window/screenshot.rs @@ -319,7 +319,7 @@ pub(crate) fn collect_screenshots(world: &mut World) { let ScreenshotPreparedState { buffer, .. } = window.screenshot_memory.take().unwrap(); ComputeTaskPool::get() - .spawn_blocking(move || { + .spawn_blocking_async(async move { let (tx, rx) = async_channel::bounded(1); let buffer_slice = buffer.slice(..); // The polling for this map call is done every frame when the command queue is submitted. @@ -330,7 +330,7 @@ pub(crate) fn collect_screenshots(world: &mut World) { } tx.try_send(()).unwrap(); }); - rx.recv_blocking().unwrap(); + rx.recv().await.unwrap(); let data = buffer_slice.get_mapped_range(); // we immediately move the data to CPU memory to avoid holding the mapped view for long let mut result = Vec::from(&*data); diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index b2c7463c89158..4a82bcb182698 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,16 +9,16 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi-threaded = [] +multi-threaded = ["dep:blocking", "dep:concurrent-queue", "dep:async-channel", "dep:async-task"] [dependencies] futures-lite = "2.0.1" async-executor = "1.7.2" -async-channel = "2.2.0" +async-channel = { version = "2.2.0", optional = true } async-io = { version = "2.0.0", optional = true } -async-task = "4.2.0" -concurrent-queue = "2.0.0" -blocking = "1" +async-task = { version = "4.2.0", optional = true } +concurrent-queue = { version = "2.0.0", optional = true } +blocking = { version = "1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index f3837c4766fae..6671e557ecfee 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -167,6 +167,28 @@ impl TaskPool { self.spawn(future) } + /// Spawns a static future on the JS event loop to be executed at a later point. + /// + /// This is potentially dangerous in browsers when running long standing computations that + /// may block, as the browser will panic if the process does not periodically yield back to + /// the browser. Consider using [`spawn_blocking_async`] instead. + /// + /// [`spawn_blocking_async`]: Self::spawn_blocking_async + pub fn spawn_blocking(&self, f: impl FnOnce() + 'static) -> FakeTask + where + T: 'static, + { + self.spawn(async { f() }) + } + + /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. + pub fn spawn_blocking_async(&self, future: impl Future + 'static) -> FakeTask + where + T: 'static, + { + self.spawn(future) + } + /// Runs a function with the local executor. Typically used to tick /// the local executor on the main thread as it needs to share time with /// other things. diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 675ada149d881..becef561b2be1 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -600,8 +600,14 @@ impl TaskPool { /// will cause the OS to [thrash], which may impact the performance /// of the non-blocking tasks scheduled onto the `TaskPool`. /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut down the executor, it will wait - /// indefinitely for all blocking operations to finish. + /// Closures spawned using `spawn_blocking` cannot be cancelled. When the + /// executor is shutdown, it will wait indefinitely for all blocking operations + /// to finish. + /// + /// ## Platform Specific Behavior + /// Long running blocking operations in browser environments will panic, so the app + /// must yield back to the browser periodically. If you're targetting web platforms, + /// consider using [`spawn_blocking_async`]. /// /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task @@ -638,8 +644,13 @@ impl TaskPool { /// Closures spawned using `spawn_blocking` cannot be cancelled. When you /// shut down the executor, it will wait indefinitely for all blocking /// operations to finish. - /// + /// + /// ## Platform Specific Behavior + /// This function behaves identically to `apawn` on `wasm` targets, or if + /// the `multi-threaded` feature on the crate is not enabled. + /// /// [`spawn`]: Self::spawn + /// [`spawn_blocking_async`]: Self::spawn_blocking_async #[inline] pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task where From e08a7fc3c7e7d364a991a4cb4ceffd38ddaed9d7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 25 Feb 2024 16:21:45 -0800 Subject: [PATCH 10/19] Formatting --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 6 +++--- crates/bevy_tasks/src/task_pool.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 6671e557ecfee..c0b44f588d571 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -168,11 +168,11 @@ impl TaskPool { } /// Spawns a static future on the JS event loop to be executed at a later point. - /// - /// This is potentially dangerous in browsers when running long standing computations that + /// + /// This is potentially dangerous in browsers when running long standing computations that /// may block, as the browser will panic if the process does not periodically yield back to /// the browser. Consider using [`spawn_blocking_async`] instead. - /// + /// /// [`spawn_blocking_async`]: Self::spawn_blocking_async pub fn spawn_blocking(&self, f: impl FnOnce() + 'static) -> FakeTask where diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index becef561b2be1..acf6e120f884d 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -601,12 +601,12 @@ impl TaskPool { /// of the non-blocking tasks scheduled onto the `TaskPool`. /// /// Closures spawned using `spawn_blocking` cannot be cancelled. When the - /// executor is shutdown, it will wait indefinitely for all blocking operations + /// executor is shutdown, it will wait indefinitely for all blocking operations /// to finish. - /// + /// /// ## Platform Specific Behavior /// Long running blocking operations in browser environments will panic, so the app - /// must yield back to the browser periodically. If you're targetting web platforms, + /// must yield back to the browser periodically. If you're targetting web platforms, /// consider using [`spawn_blocking_async`]. /// /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) @@ -644,11 +644,11 @@ impl TaskPool { /// Closures spawned using `spawn_blocking` cannot be cancelled. When you /// shut down the executor, it will wait indefinitely for all blocking /// operations to finish. - /// + /// /// ## Platform Specific Behavior /// This function behaves identically to `apawn` on `wasm` targets, or if /// the `multi-threaded` feature on the crate is not enabled. - /// + /// /// [`spawn`]: Self::spawn /// [`spawn_blocking_async`]: Self::spawn_blocking_async #[inline] From b0cb7c61cc03bb752567faf9866bbc37ed6076db Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 25 Feb 2024 16:25:26 -0800 Subject: [PATCH 11/19] Fix warning --- crates/bevy_core/src/task_pool_options.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 405a85eb07e7c..7be5e1f0cdf1d 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -29,7 +29,6 @@ impl TaskPoolOptions { TaskPoolOptions { min_total_threads: thread_count, max_total_threads: thread_count, - ..Default::default() } } From 25706c74545d1c5caaf9d503ad6b027cfa1473c3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 16:57:26 -0800 Subject: [PATCH 12/19] Fix toml formatting --- crates/bevy_tasks/Cargo.toml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 4a82bcb182698..9e87eaa544e02 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,7 +9,12 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi-threaded = ["dep:blocking", "dep:concurrent-queue", "dep:async-channel", "dep:async-task"] +multi-threaded = [ + "dep:blocking", + "dep:concurrent-queue", + "dep:async-channel", + "dep:async-task" +] [dependencies] futures-lite = "2.0.1" From f3ef65c344dd9fe59ae0c39cd8e85a5a9bfa2416 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 23:08:41 -0800 Subject: [PATCH 13/19] Provide more disambiguation for num_blocking_threads. --- crates/bevy_tasks/src/task_pool.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index acf6e120f884d..30e34feec8a04 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -63,11 +63,20 @@ impl TaskPoolBuilder { self } - /// Override the maximum number of blocking threads created for the pool. If unset, it will - /// default to the value set by the `BLOCKING_MAX_THREADS` environment variable, or 500 if not - /// set. - pub fn max_blocking_threads(mut self, num_threads: usize) -> Self { - self.max_blocking_threads = Some(num_threads); + /// The task pool contains a dynamically scaling group of threads for handling blocking tasks. + /// The pool will spin up and down threads as needed, unlike the threads allocated by + /// [`num_threads`] which are always available. By default, zero threads will be spawned at + /// initialization, and up to `num_blocking_threads` will be spun up. Upon reaching that limit, + /// calls to [`spawn_blocking`] and [`spawn_blocking_async`] will wait until one of the threads + /// becomes available. + /// + /// By default, this will use the `BLOCKING_MAX_THREADS` environment variable to determine, + /// the maximum, or 500 if that environment variable is not set. + /// + /// [`spawn_blocking`]: TaskPool::spawn_blocking + /// [`spawn_blocking_async`]: TaskPool::spawn_blocking_async + pub fn max_blocking_threads(mut self, num_blocking_threads: usize) -> Self { + self.max_blocking_threads = Some(num_blocking_threads); self } From 9bf32a5d2de92640ff07361f67626fae1fa7fa6e Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 23:15:14 -0800 Subject: [PATCH 14/19] Correct documentation on spawn_blocking_async --- crates/bevy_tasks/src/task_pool.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 30e34feec8a04..ab64f3c5cbef7 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -650,12 +650,11 @@ impl TaskPool { /// will cause the OS to [thrash], which may impact the performance /// of the non-blocking tasks scheduled onto the `TaskPool`. /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you - /// shut down the executor, it will wait indefinitely for all blocking - /// operations to finish. + /// The returned task can be detached or cancelled; however, any long standing + /// blocking operations will continue until the future yields. /// /// ## Platform Specific Behavior - /// This function behaves identically to `apawn` on `wasm` targets, or if + /// This function behaves identically to `spawn` on `wasm` targets, or if /// the `multi-threaded` feature on the crate is not enabled. /// /// [`spawn`]: Self::spawn From 4896e199581e25a33eab48d2eab058a8307a627a Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 23:16:07 -0800 Subject: [PATCH 15/19] Formatting --- crates/bevy_tasks/src/task_pool.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ab64f3c5cbef7..f00588307a311 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -64,15 +64,15 @@ impl TaskPoolBuilder { } /// The task pool contains a dynamically scaling group of threads for handling blocking tasks. - /// The pool will spin up and down threads as needed, unlike the threads allocated by + /// The pool will spin up and down threads as needed, unlike the threads allocated by /// [`num_threads`] which are always available. By default, zero threads will be spawned at /// initialization, and up to `num_blocking_threads` will be spun up. Upon reaching that limit, /// calls to [`spawn_blocking`] and [`spawn_blocking_async`] will wait until one of the threads /// becomes available. - /// - /// By default, this will use the `BLOCKING_MAX_THREADS` environment variable to determine, + /// + /// By default, this will use the `BLOCKING_MAX_THREADS` environment variable to determine, /// the maximum, or 500 if that environment variable is not set. - /// + /// /// [`spawn_blocking`]: TaskPool::spawn_blocking /// [`spawn_blocking_async`]: TaskPool::spawn_blocking_async pub fn max_blocking_threads(mut self, num_blocking_threads: usize) -> Self { @@ -651,7 +651,7 @@ impl TaskPool { /// of the non-blocking tasks scheduled onto the `TaskPool`. /// /// The returned task can be detached or cancelled; however, any long standing - /// blocking operations will continue until the future yields. + /// blocking operations will continue until the future yields. /// /// ## Platform Specific Behavior /// This function behaves identically to `spawn` on `wasm` targets, or if From 95b7435d27e88e5145b6096388feb14702a25e67 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 23:19:25 -0800 Subject: [PATCH 16/19] Fix typos --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index f00588307a311..fec7a93c7f468 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -36,7 +36,7 @@ pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most `num_threads` threads. /// Otherwise use the logical core count of the system num_threads: Option, - /// If set, this sets the maxinum number of threads used for blocking operations. + /// If set, this sets the maximum number of threads used for blocking operations. /// Otherwise, it will default to the value set by the `BLOCKING_MAX_THREADS` environment variable, /// or 500 if not set. max_blocking_threads: Option, From 274fe31b2142c2151be668b0906fb8cc51979398 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 26 Feb 2024 23:24:31 -0800 Subject: [PATCH 17/19] Fix more typos --- crates/bevy_tasks/src/task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index fec7a93c7f468..130710c5914a4 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -615,7 +615,7 @@ impl TaskPool { /// /// ## Platform Specific Behavior /// Long running blocking operations in browser environments will panic, so the app - /// must yield back to the browser periodically. If you're targetting web platforms, + /// must yield back to the browser periodically. If you're targeting web platforms, /// consider using [`spawn_blocking_async`]. /// /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) @@ -635,7 +635,7 @@ impl TaskPool { /// This function is equivalent to calling `task_pool.spawn_blocking(|| block_on(f))`. /// /// If the future is expected to terminate quickly, or will not spend a - /// signficant amount of time performing blocking CPU-bound or IO-bound + /// significant amount of time performing blocking CPU-bound or IO-bound /// operations, [`spawn`] should be used instead. The ideal use case for /// this function is for launching a future that may involve a combination /// of async IO and blocking operations (i.e. loading large scenes). From 9a646171b6150fb5b3e0042a6513ce742dce7960 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 16 Apr 2024 20:42:30 -0700 Subject: [PATCH 18/19] Remove reference to IO task pool. --- examples/animation/animation_graph.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/animation/animation_graph.rs b/examples/animation/animation_graph.rs index c81a7911e5806..456aecd8d98de 100644 --- a/examples/animation/animation_graph.rs +++ b/examples/animation/animation_graph.rs @@ -20,7 +20,7 @@ use argh::FromArgs; #[cfg(not(target_arch = "wasm32"))] use bevy::asset::io::file::FileAssetReader; #[cfg(not(target_arch = "wasm32"))] -use bevy::tasks::IoTaskPool; +use bevy::tasks::ComputeTaskPool; #[cfg(not(target_arch = "wasm32"))] use ron::ser::PrettyConfig; @@ -180,8 +180,8 @@ fn setup_assets_programmatically( if _save { let animation_graph = animation_graph.clone(); - IoTaskPool::get() - .spawn(async move { + ComputeTaskPool::get() + .spawn_blocking(move || { let mut animation_graph_writer = File::create(Path::join( &FileAssetReader::get_base_path(), Path::join(Path::new("assets"), Path::new(ANIMATION_GRAPH_PATH)), From 1bc4ff443b2d16cac6495e7844b7d9ed6cc60c5e Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 16 Apr 2024 21:02:21 -0700 Subject: [PATCH 19/19] Toml formatting --- crates/bevy_tasks/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 4b16195c2f256..35f560243620d 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -10,10 +10,10 @@ keywords = ["bevy"] [features] multi-threaded = [ - "dep:blocking", - "dep:concurrent-queue", - "dep:async-channel", - "dep:async-task" + "dep:blocking", + "dep:concurrent-queue", + "dep:async-channel", + "dep:async-task", ] [dependencies]