diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 380b1b2b4fd1a..d97bfc528828b 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -18,7 +18,7 @@ use crate::{ MissingAssetLoaderForExtensionError, }; use bevy_ecs::prelude::*; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::tracing::{debug, error, trace, warn}; #[cfg(feature = "trace")] use bevy_utils::{ @@ -175,7 +175,7 @@ impl AssetProcessor { pub fn process_assets(&self) { let start_time = std::time::Instant::now(); debug!("Processing Assets"); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.initialize().await.unwrap(); for source in self.sources().iter_processed() { @@ -325,7 +325,7 @@ impl AssetProcessor { #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))] error!("AddFolder event cannot be handled in single threaded mode (or WASM) yet."); #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.process_assets_internal(scope, source, path) .await @@ -465,7 +465,7 @@ impl AssetProcessor { loop { let mut check_reprocess_queue = std::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { for path in check_reprocess_queue.drain(..) { let processor = self.clone(); let source = self.get_source(path.source()).unwrap(); diff --git a/crates/bevy_asset/src/server/loaders.rs b/crates/bevy_asset/src/server/loaders.rs index d853abd9a96e0..ae4ddcf79ab38 100644 --- a/crates/bevy_asset/src/server/loaders.rs +++ b/crates/bevy_asset/src/server/loaders.rs @@ -3,7 +3,7 @@ use crate::{ path::AssetPath, }; use async_broadcast::RecvError; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::tracing::{error, warn}; #[cfg(feature = "trace")] use bevy_utils::{ @@ -85,7 +85,7 @@ impl AssetLoaders { match maybe_loader { MaybeAssetLoader::Ready(_) => unreachable!(), MaybeAssetLoader::Pending { sender, .. } => { - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let _ = sender.broadcast(loader).await; }) diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 0e4ae6aab1f78..63231880295c6 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -18,9 +18,11 @@ use crate::{ UntypedAssetLoadFailedEvent, UntypedHandle, }; use bevy_ecs::prelude::*; -use bevy_tasks::IoTaskPool; -use bevy_utils::tracing::{error, info}; -use bevy_utils::{CowArc, HashSet}; +use bevy_tasks::ComputeTaskPool; +use bevy_utils::{ + tracing::{error, info}, + CowArc, HashSet, +}; use crossbeam_channel::{Receiver, Sender}; use futures_lite::StreamExt; use info::*; @@ -300,7 +302,7 @@ impl AssetServer { if should_load { let owned_handle = Some(handle.clone().untyped()); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { if let Err(err) = server.load_internal(owned_handle, path, false, None).await { error!("{}", err); @@ -370,7 +372,7 @@ impl AssetServer { let id = handle.id().untyped(); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let path_clone = path.clone(); match server.load_untyped_async(path).await { @@ -555,7 +557,7 @@ impl AssetServer { pub fn reload<'a>(&self, path: impl Into>) { let server = self.clone(); let path = path.into().into_owned(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let mut reloaded = false; @@ -698,7 +700,7 @@ impl AssetServer { let path = path.into_owned(); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let Ok(source) = server.get_source(path.source()) else { error!( diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index 073460b3b3796..5cbd806180b37 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -41,8 +41,7 @@ impl Plugin for TypeRegistrationPlugin { } } -/// Setup of default task pools: [`AsyncComputeTaskPool`](bevy_tasks::AsyncComputeTaskPool), -/// [`ComputeTaskPool`](bevy_tasks::ComputeTaskPool), [`IoTaskPool`](bevy_tasks::IoTaskPool). +/// Setup of default task pool: [`ComputeTaskPool`](bevy_tasks::ComputeTaskPool). #[derive(Default)] pub struct TaskPoolPlugin { /// Options for the [`TaskPool`](bevy_tasks::TaskPool) created at application start. @@ -104,20 +103,13 @@ pub fn update_frame_count(mut frame_count: ResMut) { #[cfg(test)] mod tests { use super::*; - use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; + use bevy_tasks::prelude::ComputeTaskPool; #[test] fn runs_spawn_local_tasks() { let mut app = App::new(); app.add_plugins((TaskPoolPlugin::default(), TypeRegistrationPlugin)); - let (async_tx, async_rx) = crossbeam_channel::unbounded(); - AsyncComputeTaskPool::get() - .spawn_local(async move { - async_tx.send(()).unwrap(); - }) - .detach(); - let (compute_tx, compute_rx) = crossbeam_channel::unbounded(); ComputeTaskPool::get() .spawn_local(async move { @@ -125,18 +117,9 @@ mod tests { }) .detach(); - let (io_tx, io_rx) = crossbeam_channel::unbounded(); - IoTaskPool::get() - .spawn_local(async move { - io_tx.send(()).unwrap(); - }) - .detach(); - app.run(); - async_rx.try_recv().unwrap(); compute_rx.try_recv().unwrap(); - io_rx.try_recv().unwrap(); } #[test] diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 276902fb499da..4c525f123ca98 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -1,35 +1,6 @@ -use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use bevy_utils::tracing::trace; -/// Defines a simple way to determine how many threads to use given the number of remaining cores -/// and number of total cores -#[derive(Clone, Debug)] -pub struct TaskPoolThreadAssignmentPolicy { - /// Force using at least this many threads - pub min_threads: usize, - /// Under no circumstance use more than this many threads for this pool - pub max_threads: usize, - /// Target using this percentage of total cores, clamped by `min_threads` and `max_threads`. It is - /// permitted to use 1.0 to try to use all remaining threads - pub percent: f32, -} - -impl TaskPoolThreadAssignmentPolicy { - /// Determine the number of threads to use for this task pool - fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { - assert!(self.percent >= 0.0); - let mut desired = (total_threads as f32 * self.percent).round() as usize; - - // Limit ourselves to the number of cores available - desired = desired.min(remaining_threads); - - // Clamp by min_threads, max_threads. (This may result in us using more threads than are - // available, this is intended. An example case where this might happen is a device with - // <= 2 threads. - desired.clamp(self.min_threads, self.max_threads) - } -} - /// Helper for configuring and creating the default task pools. For end-users who want full control, /// set up [`TaskPoolPlugin`](super::TaskPoolPlugin) #[derive(Clone, Debug)] @@ -40,13 +11,6 @@ pub struct TaskPoolOptions { /// If the number of physical cores is greater than `max_total_threads`, force using /// `max_total_threads` pub max_total_threads: usize, - - /// Used to determine number of IO threads to allocate - pub io: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of async compute threads to allocate - pub async_compute: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of compute threads to allocate - pub compute: TaskPoolThreadAssignmentPolicy, } impl Default for TaskPoolOptions { @@ -55,27 +19,6 @@ impl Default for TaskPoolOptions { // By default, use however many cores are available on the system min_total_threads: 1, max_total_threads: usize::MAX, - - // Use 25% of cores for IO, at least 1, no more than 4 - io: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use 25% of cores for async compute, at least 1, no more than 4 - async_compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use all remaining cores for compute (at least 1) - compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: usize::MAX, - percent: 1.0, // This 1.0 here means "whatever is left over" - }, } } } @@ -86,7 +29,6 @@ impl TaskPoolOptions { TaskPoolOptions { min_total_threads: thread_count, max_total_threads: thread_count, - ..Default::default() } } @@ -96,57 +38,11 @@ impl TaskPoolOptions { .clamp(self.min_total_threads, self.max_total_threads); trace!("Assigning {} cores to default task pools", total_threads); - let mut remaining_threads = total_threads; - - { - // Determine the number of IO threads we will use - let io_threads = self - .io - .get_number_of_threads(remaining_threads, total_threads); - - trace!("IO Threads: {}", io_threads); - remaining_threads = remaining_threads.saturating_sub(io_threads); - - IoTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of async compute threads we will use - let async_compute_threads = self - .async_compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Async Compute Threads: {}", async_compute_threads); - remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - - AsyncComputeTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of compute threads we will use - // This is intentionally last so that an end user can specify 1.0 as the percent - let compute_threads = self - .compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Compute Threads: {}", compute_threads); - - ComputeTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()) - .build() - }); - } + ComputeTaskPool::get_or_init(|| { + TaskPoolBuilder::default() + .num_threads(total_threads) + .thread_name("Compute Task Pool".to_string()) + .build() + }); } } diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index 963b747488837..20ae83f7d00de 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -34,7 +34,7 @@ use bevy_render::{ }; use bevy_scene::Scene; #[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_transform::components::Transform; use bevy_utils::tracing::{error, info_span, warn}; use bevy_utils::{HashMap, HashSet}; @@ -354,7 +354,7 @@ async fn load_gltf<'a, 'b, 'c>( } } else { #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .scope(|scope| { gltf.textures().for_each(|gltf_texture| { let parent_path = load_context.path().parent().unwrap(); diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index 822a77b2e6278..c83705ea55c0a 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -311,7 +311,7 @@ impl Plugin for RenderPlugin { }; // In wasm, spawn a task and detach it for execution #[cfg(target_arch = "wasm32")] - bevy_tasks::IoTaskPool::get() + bevy_tasks::ComputeTaskPool::get() .spawn_local(async_renderer) .detach(); // Otherwise, just block for it to complete diff --git a/crates/bevy_render/src/render_resource/pipeline_cache.rs b/crates/bevy_render/src/render_resource/pipeline_cache.rs index bd20f44e2e3e1..fdf242bf66eba 100644 --- a/crates/bevy_render/src/render_resource/pipeline_cache.rs +++ b/crates/bevy_render/src/render_resource/pipeline_cache.rs @@ -13,7 +13,6 @@ use bevy_utils::{ use naga::valid::Capabilities; use std::{ borrow::Cow, - future::Future, hash::Hash, mem, ops::Deref, @@ -702,7 +701,7 @@ impl PipelineCache { let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); create_pipeline_task( - async move { + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -801,7 +800,7 @@ impl PipelineCache { let shader_cache = self.shader_cache.clone(); let layout_cache = self.layout_cache.clone(); create_pipeline_task( - async move { + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -957,14 +956,16 @@ impl PipelineCache { feature = "multi-threaded" ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, sync: bool, ) -> CachedPipelineState { if !sync { - return CachedPipelineState::Creating(bevy_tasks::AsyncComputeTaskPool::get().spawn(task)); + return CachedPipelineState::Creating( + bevy_tasks::ComputeTaskPool::get().spawn_blocking(task), + ); } - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } @@ -976,10 +977,10 @@ fn create_pipeline_task( not(feature = "multi-threaded") ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, _sync: bool, ) -> CachedPipelineState { - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } diff --git a/crates/bevy_render/src/view/window/screenshot.rs b/crates/bevy_render/src/view/window/screenshot.rs index ebf9b3e223a30..5dca597b5d5c0 100644 --- a/crates/bevy_render/src/view/window/screenshot.rs +++ b/crates/bevy_render/src/view/window/screenshot.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, path::Path, sync::PoisonError}; use bevy_app::Plugin; use bevy_asset::{load_internal_asset, Handle}; use bevy_ecs::{entity::EntityHashMap, prelude::*}; -use bevy_tasks::AsyncComputeTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::tracing::{error, info, info_span}; use std::sync::Mutex; use thiserror::Error; @@ -42,7 +42,8 @@ pub struct ScreenshotAlreadyRequestedError; impl ScreenshotManager { /// Signals the renderer to take a screenshot of this frame. /// - /// The given callback will eventually be called on one of the [`AsyncComputeTaskPool`]s threads. + /// The given callback will eventually be called on one of the [`ComputeTaskPool`]'s + /// blocking threads. pub fn take_screenshot( &mut self, window: Entity, @@ -289,57 +290,58 @@ pub(crate) fn collect_screenshots(world: &mut World) { let pixel_size = texture_format.pixel_size(); let ScreenshotPreparedState { buffer, .. } = window.screenshot_memory.take().unwrap(); - let finish = async move { - let (tx, rx) = async_channel::bounded(1); - let buffer_slice = buffer.slice(..); - // The polling for this map call is done every frame when the command queue is submitted. - buffer_slice.map_async(wgpu::MapMode::Read, move |result| { - let err = result.err(); - if err.is_some() { - panic!("{}", err.unwrap().to_string()); + ComputeTaskPool::get() + .spawn_blocking_async(async move { + let (tx, rx) = async_channel::bounded(1); + let buffer_slice = buffer.slice(..); + // The polling for this map call is done every frame when the command queue is submitted. + buffer_slice.map_async(wgpu::MapMode::Read, move |result| { + let err = result.err(); + if err.is_some() { + panic!("{}", err.unwrap().to_string()); + } + tx.try_send(()).unwrap(); + }); + rx.recv().await.unwrap(); + let data = buffer_slice.get_mapped_range(); + // we immediately move the data to CPU memory to avoid holding the mapped view for long + let mut result = Vec::from(&*data); + drop(data); + drop(buffer); + + if result.len() != ((width * height) as usize * pixel_size) { + // Our buffer has been padded because we needed to align to a multiple of 256. + // We remove this padding here + let initial_row_bytes = width as usize * pixel_size; + let buffered_row_bytes = + align_byte_size(width * pixel_size as u32) as usize; + + let mut take_offset = buffered_row_bytes; + let mut place_offset = initial_row_bytes; + for _ in 1..height { + result.copy_within( + take_offset..take_offset + buffered_row_bytes, + place_offset, + ); + take_offset += buffered_row_bytes; + place_offset += initial_row_bytes; + } + result.truncate(initial_row_bytes * height as usize); } - tx.try_send(()).unwrap(); - }); - rx.recv().await.unwrap(); - let data = buffer_slice.get_mapped_range(); - // we immediately move the data to CPU memory to avoid holding the mapped view for long - let mut result = Vec::from(&*data); - drop(data); - drop(buffer); - - if result.len() != ((width * height) as usize * pixel_size) { - // Our buffer has been padded because we needed to align to a multiple of 256. - // We remove this padding here - let initial_row_bytes = width as usize * pixel_size; - let buffered_row_bytes = align_byte_size(width * pixel_size as u32) as usize; - - let mut take_offset = buffered_row_bytes; - let mut place_offset = initial_row_bytes; - for _ in 1..height { - result.copy_within( - take_offset..take_offset + buffered_row_bytes, - place_offset, - ); - take_offset += buffered_row_bytes; - place_offset += initial_row_bytes; - } - result.truncate(initial_row_bytes * height as usize); - } - screenshot_func(Image::new( - Extent3d { - width, - height, - depth_or_array_layers: 1, - }, - wgpu::TextureDimension::D2, - result, - texture_format, - RenderAssetUsages::RENDER_WORLD, - )); - }; - - AsyncComputeTaskPool::get().spawn(finish).detach(); + screenshot_func(Image::new( + Extent3d { + width, + height, + depth_or_array_layers: 1, + }, + wgpu::TextureDimension::D2, + result, + texture_format, + RenderAssetUsages::RENDER_WORLD, + )); + }) + .detach(); } } } diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 98c4edbb8d49d..35f560243620d 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,7 +9,12 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi-threaded = ["dep:async-channel", "dep:async-task", "dep:concurrent-queue"] +multi-threaded = [ + "dep:blocking", + "dep:concurrent-queue", + "dep:async-channel", + "dep:async-task", +] [dependencies] futures-lite = "2.0.1" @@ -18,6 +23,7 @@ async-channel = { version = "2.2.0", optional = true } async-io = { version = "2.0.0", optional = true } async-task = { version = "4.2.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } +blocking = { version = "1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 1d1a7fb90465b..8c778ae4f92a1 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -25,14 +25,9 @@ bevy provides three different thread pools via which tasks of different kinds ca This currently applies to WASM targets.) The determining factor for what kind of work should go in each pool is latency requirements: -* For CPU-intensive work (tasks that generally spin until completion) we have a standard - [`ComputeTaskPool`] and an [`AsyncComputeTaskPool`]. Work that does not need to be completed to - present the next frame should go to the [`AsyncComputeTaskPool`]. - -* For IO-intensive work (tasks that spend very little time in a "woken" state) we have an - [`IoTaskPool`] whose tasks are expected to complete very quickly. Generally speaking, they should just - await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready - for consumption. (likely via channels) +For CPU-intensive work, the standard [`ComputeTaskPool`] can be used. It can also be used for +non-blocking IO-intensive work (tasks that spend very little time in a "woken" state). +Work that does not need to be completed to present the next frame should use [`TaskPool::spawn_blocking`]. [bevy]: https://bevyengine.org [rayon]: /~https://github.com/rayon-rs/rayon diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 34011532d6b96..c405eaeca98e8 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -24,7 +24,7 @@ pub use single_threaded_task_pool::{FakeTask, Scope, TaskPool, TaskPoolBuilder, mod usages; #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; +pub use usages::ComputeTaskPool; #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] mod thread_executor; @@ -49,7 +49,7 @@ pub mod prelude { block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, + usages::ComputeTaskPool, }; } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 3a32c9e286211..92608625df80b 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -168,6 +168,28 @@ impl TaskPool { self.spawn(future) } + /// Spawns a static future on the JS event loop to be executed at a later point. + /// + /// This is potentially dangerous in browsers when running long standing computations that + /// may block, as the browser will panic if the process does not periodically yield back to + /// the browser. Consider using [`spawn_blocking_async`] instead. + /// + /// [`spawn_blocking_async`]: Self::spawn_blocking_async + pub fn spawn_blocking(&self, f: impl FnOnce() + 'static) -> FakeTask + where + T: 'static, + { + self.spawn(async { f() }) + } + + /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. + pub fn spawn_blocking_async(&self, future: impl Future + 'static) -> FakeTask + where + T: 'static, + { + self.spawn(future) + } + /// Runs a function with the local executor. Typically used to tick /// the local executor on the main thread as it needs to share time with /// other things. diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 1e58f128ca2ec..7c703ee0b9d95 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,4 +1,5 @@ use std::{ + env, future::Future, marker::PhantomData, mem, @@ -8,6 +9,7 @@ use std::{ }; use async_task::FallibleTask; +use blocking::unblock; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; @@ -34,6 +36,10 @@ pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most `num_threads` threads. /// Otherwise use the logical core count of the system num_threads: Option, + /// If set, this sets the maximum number of threads used for blocking operations. + /// Otherwise, it will default to the value set by the `BLOCKING_MAX_THREADS` environment variable, + /// or 500 if not set. + max_blocking_threads: Option, /// If set, we'll use the given stack size rather than the system default stack_size: Option, /// Allows customizing the name of the threads - helpful for debugging. If set, threads will @@ -50,13 +56,30 @@ impl TaskPoolBuilder { Self::default() } - /// Override the number of threads created for the pool. If unset, we default to the number + /// Override the number of threads created for the pool. If unset, it will default to the number /// of logical cores of the system pub fn num_threads(mut self, num_threads: usize) -> Self { self.num_threads = Some(num_threads); self } + /// The task pool contains a dynamically scaling group of threads for handling blocking tasks. + /// The pool will spin up and down threads as needed, unlike the threads allocated by + /// [`num_threads`] which are always available. By default, zero threads will be spawned at + /// initialization, and up to `num_blocking_threads` will be spun up. Upon reaching that limit, + /// calls to [`spawn_blocking`] and [`spawn_blocking_async`] will wait until one of the threads + /// becomes available. + /// + /// By default, this will use the `BLOCKING_MAX_THREADS` environment variable to determine, + /// the maximum, or 500 if that environment variable is not set. + /// + /// [`spawn_blocking`]: TaskPool::spawn_blocking + /// [`spawn_blocking_async`]: TaskPool::spawn_blocking_async + pub fn max_blocking_threads(mut self, num_blocking_threads: usize) -> Self { + self.max_blocking_threads = Some(num_blocking_threads); + self + } + /// Override the stack size of the threads created for the pool pub fn stack_size(mut self, stack_size: usize) -> Self { self.stack_size = Some(stack_size); @@ -131,6 +154,10 @@ impl TaskPool { } fn new_internal(builder: TaskPoolBuilder) -> Self { + if let Some(thread_count) = builder.max_blocking_threads { + env::set_var("BLOCKING_MAX_THREADS", thread_count.to_string().as_str()); + } + let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); let executor = Arc::new(async_executor::Executor::new()); @@ -527,8 +554,16 @@ impl TaskPool { /// any case, the pool will execute the task even without polling by the /// end-user. /// - /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should + /// If the provided future is non-`Send`, [`spawn_local`] should /// be used instead. + /// + /// If the provided future performs blocking IO or may have long lasting + /// CPU-bound operations, use [`spawn_blocking`] or [`spawn_blocking_async`] + /// instead. + /// + /// [`spawn_local`]: Self::spawn_local + /// [`spawn_blocking`]: Self::spawn_blocking + /// [`spawn_blocking_async`]: Self::spawn_blocking_async pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, @@ -554,6 +589,81 @@ impl TaskPool { Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) } + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the task pool + /// from driving other futures forward. This function runs the provided + /// closure on a thread dedicated to blocking operations. + /// + /// This call will spawn more blocking threads when they are requested + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When the + /// executor is shutdown, it will wait indefinitely for all blocking operations + /// to finish. + /// + /// ## Platform Specific Behavior + /// Long running blocking operations in browser environments will panic, so the app + /// must yield back to the browser periodically. If you're targeting web platforms, + /// consider using [`spawn_blocking_async`]. + /// + /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) + pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task + where + T: Send + 'static, + { + Task::new(unblock(f)) + } + + /// Spawns a static future onto on a thread where blocking is acceptable. + /// The returned [`Task`] is a future that can be polled for the result. + /// It can also be "detached", allowing the task to continue + /// running even if dropped. In any case, the pool will execute the task + /// even without polling by the end-user. + /// + /// This function is equivalent to calling `task_pool.spawn_blocking(|| block_on(f))`. + /// + /// If the future is expected to terminate quickly, or will not spend a + /// significant amount of time performing blocking CPU-bound or IO-bound + /// operations, [`spawn`] should be used instead. The ideal use case for + /// this function is for launching a future that may involve a combination + /// of async IO and blocking operations (i.e. loading large scenes). + /// + /// This call will spawn more blocking threads when they are requested + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. + /// + /// The returned task can be detached or cancelled; however, any long standing + /// blocking operations will continue until the future yields. + /// + /// ## Platform Specific Behavior + /// This function behaves identically to `spawn` on `wasm` targets, or if + /// the `multi-threaded` feature on the crate is not enabled. + /// + /// [`spawn`]: Self::spawn + /// [`spawn_blocking_async`]: Self::spawn_blocking_async + #[inline] + pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + self.spawn_blocking(|| block_on(f)) + } + /// Runs a function with the local executor. Typically used to tick /// the local executor on the main thread as it needs to share time with /// other things. diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index fda3092b8ebc8..6f98d3d8f250e 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,77 +1,44 @@ use super::TaskPool; use std::{ops::Deref, sync::OnceLock}; -macro_rules! taskpool { - ($(#[$attr:meta])* ($static:ident, $type:ident)) => { - static $static: OnceLock<$type> = OnceLock::new(); +static COMPUTE_TASK_POOL: OnceLock = OnceLock::new(); - $(#[$attr])* - #[derive(Debug)] - pub struct $type(TaskPool); - - impl $type { - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] - pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) - } - - #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ - or returns `None` if it is not initialized.")] - pub fn try_get() -> Option<&'static Self> { - $static.get() - } - - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance.")] - #[doc = ""] - #[doc = " # Panics"] - #[doc = " Panics if the global instance has not been initialized yet."] - pub fn get() -> &'static Self { - $static.get().expect( - concat!( - "The ", - stringify!($type), - " has not been initialized yet. Please call ", - stringify!($type), - "::get_or_init beforehand." - ) - ) - } - } - - impl Deref for $type { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - }; -} - -taskpool! { - /// A newtype for a task pool for CPU-intensive work that must be completed to - /// deliver the next frame +/// A newtype for a task pool for CPU-intensive work that must be completed to +/// deliver the next frame +/// +/// See [`TaskPool`] documentation for details on Bevy tasks. +#[derive(Debug)] +pub struct ComputeTaskPool(TaskPool); + +impl ComputeTaskPool { + /// Gets the global [`ComputeTaskPool`] instance, or initializes it with `f`. + pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { + COMPUTE_TASK_POOL.get_or_init(|| Self(f())) + } + + /// Attempts to get the global [`ComputeTaskPool`] instance, + /// or returns `None` if it is not initialized. + pub fn try_get() -> Option<&'static Self> { + COMPUTE_TASK_POOL.get() + } + + /// Gets the global [`ComputeTaskPool`] instance." /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be - /// completed before the next frame. - (COMPUTE_TASK_POOL, ComputeTaskPool) + /// # Panics + /// Panics if the global instance has not been initialized yet. + pub fn get() -> &'static Self { + COMPUTE_TASK_POOL.get().expect( + "The ComputeTaskPool has not been initialized yet. Please call ComputeTaskPool::get_or_init beforehand" + ) + } } -taskpool! { - /// A newtype for a task pool for CPU-intensive work that may span across multiple frames - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// Use [`ComputeTaskPool`] if the work must be complete before advancing to the next frame. - (ASYNC_COMPUTE_TASK_POOL, AsyncComputeTaskPool) -} +impl Deref for ComputeTaskPool { + type Target = TaskPool; -taskpool! { - /// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a - /// "woken" state) - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - (IO_TASK_POOL, IoTaskPool) + fn deref(&self) -> &Self::Target { + &self.0 + } } /// A function used by `bevy_core` to tick the global tasks pools on the main thread. @@ -86,20 +53,8 @@ pub fn tick_global_task_pools_on_main_thread() { .get() .unwrap() .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); + for _ in 0..100 { + compute_local_executor.try_tick(); + } }); } diff --git a/examples/animation/animation_graph.rs b/examples/animation/animation_graph.rs index c81a7911e5806..456aecd8d98de 100644 --- a/examples/animation/animation_graph.rs +++ b/examples/animation/animation_graph.rs @@ -20,7 +20,7 @@ use argh::FromArgs; #[cfg(not(target_arch = "wasm32"))] use bevy::asset::io::file::FileAssetReader; #[cfg(not(target_arch = "wasm32"))] -use bevy::tasks::IoTaskPool; +use bevy::tasks::ComputeTaskPool; #[cfg(not(target_arch = "wasm32"))] use ron::ser::PrettyConfig; @@ -180,8 +180,8 @@ fn setup_assets_programmatically( if _save { let animation_graph = animation_graph.clone(); - IoTaskPool::get() - .spawn(async move { + ComputeTaskPool::get() + .spawn_blocking(move || { let mut animation_graph_writer = File::create(Path::join( &FileAssetReader::get_base_path(), Path::join(Path::new("assets"), Path::new(ANIMATION_GRAPH_PATH)), diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index bf4b8a26ec807..1dbf8b8bf6333 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -1,11 +1,11 @@ -//! This example shows how to use the ECS and the [`AsyncComputeTaskPool`] +//! This example shows how to use the ECS and blocking tasks on the [`ComputeTaskPool`] //! to spawn, poll, and complete tasks across systems and system ticks. use bevy::{ ecs::system::SystemState, ecs::world::CommandQueue, prelude::*, - tasks::{block_on, futures_lite::future, AsyncComputeTaskPool, Task}, + tasks::{block_on, futures_lite::future, ComputeTaskPool, Task}, }; use rand::Rng; use std::{thread, time::Duration}; @@ -51,15 +51,15 @@ struct ComputeTransform(Task); /// system, [`handle_tasks`], will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes fn spawn_tasks(mut commands: Commands) { - let thread_pool = AsyncComputeTaskPool::get(); + let thread_pool = ComputeTaskPool::get(); for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { - // Spawn new task on the AsyncComputeTaskPool; the task will be + // Spawn a new blocking task on the ComputeTaskPool; the task will be // executed in the background, and the Task future returned by // spawn() can be used to poll for the result let entity = commands.spawn_empty().id(); - let task = thread_pool.spawn(async move { + let task = thread_pool.spawn_blocking(move || { let mut rng = rand::thread_rng(); let duration = Duration::from_secs_f32(rng.gen_range(0.05..0.2)); diff --git a/examples/scene/scene.rs b/examples/scene/scene.rs index a75eba99ff43d..4ea28f2b5a706 100644 --- a/examples/scene/scene.rs +++ b/examples/scene/scene.rs @@ -1,5 +1,5 @@ //! This example illustrates loading scenes from files. -use bevy::{prelude::*, tasks::IoTaskPool, utils::Duration}; +use bevy::{prelude::*, tasks::ComputeTaskPool, utils::Duration}; use std::{fs::File, io::Write}; fn main() { @@ -135,7 +135,7 @@ fn save_scene_system(world: &mut World) { // as they are blocking // This can't work in WASM as there is no filesystem access #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { // Write the scene RON data to file File::create(format!("assets/{NEW_SCENE_FILE_PATH}"))