Path: blob/main/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
6849 views
use alloc::{boxed::Box, vec::Vec};1use bevy_platform::cell::SyncUnsafeCell;2use bevy_platform::sync::Arc;3use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};4use concurrent_queue::ConcurrentQueue;5use core::{any::Any, panic::AssertUnwindSafe};6use fixedbitset::FixedBitSet;7#[cfg(feature = "std")]8use std::eprintln;9use std::sync::{Mutex, MutexGuard};1011#[cfg(feature = "trace")]12use tracing::{info_span, Span};1314use crate::{15error::{ErrorContext, ErrorHandler, Result},16prelude::Resource,17schedule::{18is_apply_deferred, ConditionWithAccess, ExecutorKind, SystemExecutor, SystemSchedule,19SystemWithAccess,20},21system::{RunSystemError, ScheduleSystem},22world::{unsafe_world_cell::UnsafeWorldCell, World},23};24#[cfg(feature = "hotpatching")]25use crate::{prelude::DetectChanges, HotPatchChanges};2627use super::__rust_begin_short_backtrace;2829/// Borrowed data used by the [`MultiThreadedExecutor`].30struct Environment<'env, 'sys> {31executor: &'env MultiThreadedExecutor,32systems: &'sys [SyncUnsafeCell<SystemWithAccess>],33conditions: SyncUnsafeCell<Conditions<'sys>>,34world_cell: UnsafeWorldCell<'env>,35}3637struct Conditions<'a> {38system_conditions: &'a mut [Vec<ConditionWithAccess>],39set_conditions: &'a mut [Vec<ConditionWithAccess>],40sets_with_conditions_of_systems: &'a [FixedBitSet],41systems_in_sets_with_conditions: &'a [FixedBitSet],42}4344impl<'env, 'sys> Environment<'env, 'sys> {45fn new(46executor: &'env MultiThreadedExecutor,47schedule: &'sys mut SystemSchedule,48world: &'env mut World,49) -> Self {50Environment {51executor,52systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),53conditions: SyncUnsafeCell::new(Conditions {54system_conditions: &mut schedule.system_conditions,55set_conditions: &mut schedule.set_conditions,56sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,57systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,58}),59world_cell: world.as_unsafe_world_cell(),60}61}62}6364/// Per-system data used by the [`MultiThreadedExecutor`].65// Copied here because it can't be read from the system when it's running.66struct SystemTaskMetadata {67/// The set of systems whose `component_access_set()` conflicts with this one.68conflicting_systems: FixedBitSet,69/// The set of systems whose `component_access_set()` conflicts with this system's conditions.70/// Note that this is separate from `conflicting_systems` to handle the case where71/// a system is skipped by an earlier system set condition or system stepping,72/// and needs access to run its conditions but not for itself.73condition_conflicting_systems: FixedBitSet,74/// Indices of the systems that directly depend on the system.75dependents: Vec<usize>,76/// Is `true` if the system does not access `!Send` data.77is_send: bool,78/// Is `true` if the system is exclusive.79is_exclusive: bool,80}8182/// The result of running a system that is sent across a channel.83struct SystemResult {84system_index: usize,85}8687/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.88pub struct MultiThreadedExecutor {89/// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.90state: Mutex<ExecutorState>,91/// Queue of system completion events.92system_completion: ConcurrentQueue<SystemResult>,93/// Setting when true applies deferred system buffers after all systems have run94apply_final_deferred: bool,95/// When set, tells the executor that a thread has panicked.96panic_payload: Mutex<Option<Box<dyn Any + Send>>>,97starting_systems: FixedBitSet,98/// Cached tracing span99#[cfg(feature = "trace")]100executor_span: Span,101}102103/// The state of the executor while running.104pub struct ExecutorState {105/// Metadata for scheduling and running system tasks.106system_task_metadata: Vec<SystemTaskMetadata>,107/// The set of systems whose `component_access_set()` conflicts with this system set's conditions.108set_condition_conflicting_systems: Vec<FixedBitSet>,109/// Returns `true` if a system with non-`Send` access is running.110local_thread_running: bool,111/// Returns `true` if an exclusive system is running.112exclusive_running: bool,113/// The number of systems that are running.114num_running_systems: usize,115/// The number of dependencies each system has that have not completed.116num_dependencies_remaining: Vec<usize>,117/// System sets whose conditions have been evaluated.118evaluated_sets: FixedBitSet,119/// Systems that have no remaining dependencies and are waiting to run.120ready_systems: FixedBitSet,121/// copy of `ready_systems`122ready_systems_copy: FixedBitSet,123/// Systems that are running.124running_systems: FixedBitSet,125/// Systems that got skipped.126skipped_systems: FixedBitSet,127/// Systems whose conditions have been evaluated and were run or skipped.128completed_systems: FixedBitSet,129/// Systems that have run but have not had their buffers applied.130unapplied_systems: FixedBitSet,131}132133/// References to data required by the executor.134/// This is copied to each system task so that can invoke the executor when they complete.135// These all need to outlive 'scope in order to be sent to new tasks,136// and keeping them all in a struct means we can use lifetime elision.137#[derive(Copy, Clone)]138struct Context<'scope, 'env, 'sys> {139environment: &'env Environment<'env, 'sys>,140scope: &'scope Scope<'scope, 'env, ()>,141error_handler: ErrorHandler,142}143144impl Default for MultiThreadedExecutor {145fn default() -> Self {146Self::new()147}148}149150impl SystemExecutor for MultiThreadedExecutor {151fn kind(&self) -> ExecutorKind {152ExecutorKind::MultiThreaded153}154155fn init(&mut self, schedule: &SystemSchedule) {156let state = self.state.get_mut().unwrap();157// pre-allocate space158let sys_count = schedule.system_ids.len();159let set_count = schedule.set_ids.len();160161self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));162self.starting_systems = FixedBitSet::with_capacity(sys_count);163state.evaluated_sets = FixedBitSet::with_capacity(set_count);164state.ready_systems = FixedBitSet::with_capacity(sys_count);165state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);166state.running_systems = FixedBitSet::with_capacity(sys_count);167state.completed_systems = FixedBitSet::with_capacity(sys_count);168state.skipped_systems = FixedBitSet::with_capacity(sys_count);169state.unapplied_systems = FixedBitSet::with_capacity(sys_count);170171state.system_task_metadata = Vec::with_capacity(sys_count);172for index in 0..sys_count {173state.system_task_metadata.push(SystemTaskMetadata {174conflicting_systems: FixedBitSet::with_capacity(sys_count),175condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),176dependents: schedule.system_dependents[index].clone(),177is_send: schedule.systems[index].system.is_send(),178is_exclusive: schedule.systems[index].system.is_exclusive(),179});180if schedule.system_dependencies[index] == 0 {181self.starting_systems.insert(index);182}183}184185{186#[cfg(feature = "trace")]187let _span = info_span!("calculate conflicting systems").entered();188for index1 in 0..sys_count {189let system1 = &schedule.systems[index1];190for index2 in 0..index1 {191let system2 = &schedule.systems[index2];192if !system2.access.is_compatible(&system1.access) {193state.system_task_metadata[index1]194.conflicting_systems195.insert(index2);196state.system_task_metadata[index2]197.conflicting_systems198.insert(index1);199}200}201202for index2 in 0..sys_count {203let system2 = &schedule.systems[index2];204if schedule.system_conditions[index1]205.iter()206.any(|condition| !system2.access.is_compatible(&condition.access))207{208state.system_task_metadata[index1]209.condition_conflicting_systems210.insert(index2);211}212}213}214215state.set_condition_conflicting_systems.clear();216state.set_condition_conflicting_systems.reserve(set_count);217for set_idx in 0..set_count {218let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);219for sys_index in 0..sys_count {220let system = &schedule.systems[sys_index];221if schedule.set_conditions[set_idx]222.iter()223.any(|condition| !system.access.is_compatible(&condition.access))224{225conflicting_systems.insert(sys_index);226}227}228state229.set_condition_conflicting_systems230.push(conflicting_systems);231}232}233234state.num_dependencies_remaining = Vec::with_capacity(sys_count);235}236237fn run(238&mut self,239schedule: &mut SystemSchedule,240world: &mut World,241_skip_systems: Option<&FixedBitSet>,242error_handler: ErrorHandler,243) {244let state = self.state.get_mut().unwrap();245// reset counts246if schedule.systems.is_empty() {247return;248}249state.num_running_systems = 0;250state251.num_dependencies_remaining252.clone_from(&schedule.system_dependencies);253state.ready_systems.clone_from(&self.starting_systems);254255// If stepping is enabled, make sure we skip those systems that should256// not be run.257#[cfg(feature = "bevy_debug_stepping")]258if let Some(skipped_systems) = _skip_systems {259debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());260// mark skipped systems as completed261state.completed_systems |= skipped_systems;262263// signal the dependencies for each of the skipped systems, as264// though they had run265for system_index in skipped_systems.ones() {266state.signal_dependents(system_index);267state.ready_systems.remove(system_index);268}269}270271let thread_executor = world272.get_resource::<MainThreadExecutor>()273.map(|e| e.0.clone());274let thread_executor = thread_executor.as_deref();275276let environment = &Environment::new(self, schedule, world);277278ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(279false,280thread_executor,281|scope| {282let context = Context {283environment,284scope,285error_handler,286};287288// The first tick won't need to process finished systems, but we still need to run the loop in289// tick_executor() in case a system completes while the first tick still holds the mutex.290context.tick_executor();291},292);293294// End the borrows of self and world in environment by copying out the reference to systems.295let systems = environment.systems;296297let state = self.state.get_mut().unwrap();298if self.apply_final_deferred {299// Do one final apply buffers after all systems have completed300// Commands should be applied while on the scope's thread, not the executor's thread301let res = apply_deferred(&state.unapplied_systems, systems, world);302if let Err(payload) = res {303let panic_payload = self.panic_payload.get_mut().unwrap();304*panic_payload = Some(payload);305}306state.unapplied_systems.clear();307}308309// check to see if there was a panic310let payload = self.panic_payload.get_mut().unwrap();311if let Some(payload) = payload.take() {312std::panic::resume_unwind(payload);313}314315debug_assert!(state.ready_systems.is_clear());316debug_assert!(state.running_systems.is_clear());317state.evaluated_sets.clear();318state.skipped_systems.clear();319state.completed_systems.clear();320}321322fn set_apply_final_deferred(&mut self, value: bool) {323self.apply_final_deferred = value;324}325}326327impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {328fn system_completed(329&self,330system_index: usize,331res: Result<(), Box<dyn Any + Send>>,332system: &ScheduleSystem,333) {334// tell the executor that the system finished335self.environment336.executor337.system_completion338.push(SystemResult { system_index })339.unwrap_or_else(|error| unreachable!("{}", error));340if let Err(payload) = res {341#[cfg(feature = "std")]342#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]343{344eprintln!("Encountered a panic in system `{}`!", system.name());345}346// set the payload to propagate the error347{348let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();349*panic_payload = Some(payload);350}351}352self.tick_executor();353}354355#[expect(356clippy::mut_from_ref,357reason = "Field is only accessed here and is guarded by lock with a documented safety comment"358)]359fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {360let guard = self.environment.executor.state.try_lock().ok()?;361// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and362// is synchronized by the lock on the executor state.363let conditions = unsafe { &mut *self.environment.conditions.get() };364Some((conditions, guard))365}366367fn tick_executor(&self) {368// Ensure that the executor handles any events pushed to the system_completion queue by this thread.369// If this thread acquires the lock, the executor runs after the push() and they are processed.370// If this thread does not acquire the lock, then the is_empty() check on the other thread runs371// after the lock is released, which is after try_lock() failed, which is after the push()372// on this thread, so the is_empty() check will see the new events and loop.373loop {374let Some((conditions, mut guard)) = self.try_lock() else {375return;376};377guard.tick(self, conditions);378// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.379drop(guard);380if self.environment.executor.system_completion.is_empty() {381return;382}383}384}385}386387impl MultiThreadedExecutor {388/// Creates a new `multi_threaded` executor for use with a [`Schedule`].389///390/// [`Schedule`]: crate::schedule::Schedule391pub fn new() -> Self {392Self {393state: Mutex::new(ExecutorState::new()),394system_completion: ConcurrentQueue::unbounded(),395starting_systems: FixedBitSet::new(),396apply_final_deferred: true,397panic_payload: Mutex::new(None),398#[cfg(feature = "trace")]399executor_span: info_span!("multithreaded executor"),400}401}402}403404impl ExecutorState {405fn new() -> Self {406Self {407system_task_metadata: Vec::new(),408set_condition_conflicting_systems: Vec::new(),409num_running_systems: 0,410num_dependencies_remaining: Vec::new(),411local_thread_running: false,412exclusive_running: false,413evaluated_sets: FixedBitSet::new(),414ready_systems: FixedBitSet::new(),415ready_systems_copy: FixedBitSet::new(),416running_systems: FixedBitSet::new(),417skipped_systems: FixedBitSet::new(),418completed_systems: FixedBitSet::new(),419unapplied_systems: FixedBitSet::new(),420}421}422423fn tick(&mut self, context: &Context, conditions: &mut Conditions) {424#[cfg(feature = "trace")]425let _span = context.environment.executor.executor_span.enter();426427for result in context.environment.executor.system_completion.try_iter() {428self.finish_system_and_handle_dependents(result);429}430431// SAFETY:432// - `finish_system_and_handle_dependents` has updated the currently running systems.433// - `rebuild_active_access` locks access for all currently running systems.434unsafe {435self.spawn_system_tasks(context, conditions);436}437}438439/// # Safety440/// - Caller must ensure that `self.ready_systems` does not contain any systems that441/// have been mutably borrowed (such as the systems currently running).442/// - `world_cell` must have permission to access all world data (not counting443/// any world data that is claimed by systems currently running on this executor).444unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {445if self.exclusive_running {446return;447}448449#[cfg(feature = "hotpatching")]450let hotpatch_tick = context451.environment452.world_cell453.get_resource_ref::<HotPatchChanges>()454.map(|r| r.last_changed())455.unwrap_or_default();456457// can't borrow since loop mutably borrows `self`458let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);459460// Skipping systems may cause their dependents to become ready immediately.461// If that happens, we need to run again immediately or we may fail to spawn those dependents.462let mut check_for_new_ready_systems = true;463while check_for_new_ready_systems {464check_for_new_ready_systems = false;465466ready_systems.clone_from(&self.ready_systems);467468for system_index in ready_systems.ones() {469debug_assert!(!self.running_systems.contains(system_index));470// SAFETY: Caller assured that these systems are not running.471// Therefore, no other reference to this system exists and there is no aliasing.472let system =473&mut unsafe { &mut *context.environment.systems[system_index].get() }.system;474475#[cfg(feature = "hotpatching")]476if hotpatch_tick.is_newer_than(477system.get_last_run(),478context.environment.world_cell.change_tick(),479) {480system.refresh_hotpatch();481}482483if !self.can_run(system_index, conditions) {484// NOTE: exclusive systems with ambiguities are susceptible to485// being significantly displaced here (compared to single-threaded order)486// if systems after them in topological order can run487// if that becomes an issue, `break;` if exclusive system488continue;489}490491self.ready_systems.remove(system_index);492493// SAFETY: `can_run` returned true, which means that:494// - There can be no systems running whose accesses would conflict with any conditions.495if unsafe {496!self.should_run(497system_index,498system,499conditions,500context.environment.world_cell,501context.error_handler,502)503} {504self.skip_system_and_signal_dependents(system_index);505// signal_dependents may have set more systems to ready.506check_for_new_ready_systems = true;507continue;508}509510self.running_systems.insert(system_index);511self.num_running_systems += 1;512513if self.system_task_metadata[system_index].is_exclusive {514// SAFETY: `can_run` returned true for this system,515// which means no systems are currently borrowed.516unsafe {517self.spawn_exclusive_system_task(context, system_index);518}519check_for_new_ready_systems = false;520break;521}522523// SAFETY:524// - Caller ensured no other reference to this system exists.525// - `system_task_metadata[system_index].is_exclusive` is `false`,526// so `System::is_exclusive` returned `false` when we called it.527// - `can_run` returned true, so no systems with conflicting world access are running.528unsafe {529self.spawn_system_task(context, system_index);530}531}532}533534// give back535self.ready_systems_copy = ready_systems;536}537538fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {539let system_meta = &self.system_task_metadata[system_index];540if system_meta.is_exclusive && self.num_running_systems > 0 {541return false;542}543544if !system_meta.is_send && self.local_thread_running {545return false;546}547548// TODO: an earlier out if world's archetypes did not change549for set_idx in conditions.sets_with_conditions_of_systems[system_index]550.difference(&self.evaluated_sets)551{552if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {553return false;554}555}556557if !system_meta558.condition_conflicting_systems559.is_disjoint(&self.running_systems)560{561return false;562}563564if !self.skipped_systems.contains(system_index)565&& !system_meta566.conflicting_systems567.is_disjoint(&self.running_systems)568{569return false;570}571572true573}574575/// # Safety576/// * `world` must have permission to read any world data required by577/// the system's conditions: this includes conditions for the system578/// itself, and conditions for any of the system's sets.579unsafe fn should_run(580&mut self,581system_index: usize,582system: &mut ScheduleSystem,583conditions: &mut Conditions,584world: UnsafeWorldCell,585error_handler: ErrorHandler,586) -> bool {587let mut should_run = !self.skipped_systems.contains(system_index);588589for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {590if self.evaluated_sets.contains(set_idx) {591continue;592}593594// Evaluate the system set's conditions.595// SAFETY:596// - The caller ensures that `world` has permission to read any data597// required by the conditions.598let set_conditions_met = unsafe {599evaluate_and_fold_conditions(600&mut conditions.set_conditions[set_idx],601world,602error_handler,603system,604true,605)606};607608if !set_conditions_met {609self.skipped_systems610.union_with(&conditions.systems_in_sets_with_conditions[set_idx]);611}612613should_run &= set_conditions_met;614self.evaluated_sets.insert(set_idx);615}616617// Evaluate the system's conditions.618// SAFETY:619// - The caller ensures that `world` has permission to read any data620// required by the conditions.621let system_conditions_met = unsafe {622evaluate_and_fold_conditions(623&mut conditions.system_conditions[system_index],624world,625error_handler,626system,627false,628)629};630631if !system_conditions_met {632self.skipped_systems.insert(system_index);633}634635should_run &= system_conditions_met;636637if should_run {638// SAFETY:639// - The caller ensures that `world` has permission to read any data640// required by the system.641let valid_params = match unsafe { system.validate_param_unsafe(world) } {642Ok(()) => true,643Err(e) => {644if !e.skipped {645error_handler(646e.into(),647ErrorContext::System {648name: system.name(),649last_run: system.get_last_run(),650},651);652}653false654}655};656if !valid_params {657self.skipped_systems.insert(system_index);658}659660should_run &= valid_params;661}662663should_run664}665666/// # Safety667/// - Caller must not alias systems that are running.668/// - `is_exclusive` must have returned `false` for the specified system.669/// - `world` must have permission to access the world data670/// used by the specified system.671unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {672// SAFETY: this system is not running, no other reference exists673let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;674// Move the full context object into the new future.675let context = *context;676677let system_meta = &self.system_task_metadata[system_index];678679let task = async move {680let res = std::panic::catch_unwind(AssertUnwindSafe(|| {681// SAFETY:682// - The caller ensures that we have permission to683// access the world data used by the system.684// - `is_exclusive` returned false685unsafe {686if let Err(RunSystemError::Failed(err)) =687__rust_begin_short_backtrace::run_unsafe(688system,689context.environment.world_cell,690)691{692(context.error_handler)(693err,694ErrorContext::System {695name: system.name(),696last_run: system.get_last_run(),697},698);699}700};701}));702context.system_completed(system_index, res, system);703};704705if system_meta.is_send {706context.scope.spawn(task);707} else {708self.local_thread_running = true;709context.scope.spawn_on_external(task);710}711}712713/// # Safety714/// Caller must ensure no systems are currently borrowed.715unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {716// SAFETY: this system is not running, no other reference exists717let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;718// Move the full context object into the new future.719let context = *context;720721if is_apply_deferred(&**system) {722// TODO: avoid allocation723let unapplied_systems = self.unapplied_systems.clone();724self.unapplied_systems.clear();725let task = async move {726// SAFETY: `can_run` returned true for this system, which means727// that no other systems currently have access to the world.728let world = unsafe { context.environment.world_cell.world_mut() };729let res = apply_deferred(&unapplied_systems, context.environment.systems, world);730context.system_completed(system_index, res, system);731};732733context.scope.spawn_on_scope(task);734} else {735let task = async move {736// SAFETY: `can_run` returned true for this system, which means737// that no other systems currently have access to the world.738let world = unsafe { context.environment.world_cell.world_mut() };739let res = std::panic::catch_unwind(AssertUnwindSafe(|| {740if let Err(RunSystemError::Failed(err)) =741__rust_begin_short_backtrace::run(system, world)742{743(context.error_handler)(744err,745ErrorContext::System {746name: system.name(),747last_run: system.get_last_run(),748},749);750}751}));752context.system_completed(system_index, res, system);753};754755context.scope.spawn_on_scope(task);756}757758self.exclusive_running = true;759self.local_thread_running = true;760}761762fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {763let SystemResult { system_index, .. } = result;764765if self.system_task_metadata[system_index].is_exclusive {766self.exclusive_running = false;767}768769if !self.system_task_metadata[system_index].is_send {770self.local_thread_running = false;771}772773debug_assert!(self.num_running_systems >= 1);774self.num_running_systems -= 1;775self.running_systems.remove(system_index);776self.completed_systems.insert(system_index);777self.unapplied_systems.insert(system_index);778779self.signal_dependents(system_index);780}781782fn skip_system_and_signal_dependents(&mut self, system_index: usize) {783self.completed_systems.insert(system_index);784self.signal_dependents(system_index);785}786787fn signal_dependents(&mut self, system_index: usize) {788for &dep_idx in &self.system_task_metadata[system_index].dependents {789let remaining = &mut self.num_dependencies_remaining[dep_idx];790debug_assert!(*remaining >= 1);791*remaining -= 1;792if *remaining == 0 && !self.completed_systems.contains(dep_idx) {793self.ready_systems.insert(dep_idx);794}795}796}797}798799fn apply_deferred(800unapplied_systems: &FixedBitSet,801systems: &[SyncUnsafeCell<SystemWithAccess>],802world: &mut World,803) -> Result<(), Box<dyn Any + Send>> {804for system_index in unapplied_systems.ones() {805// SAFETY: none of these systems are running, no other references exist806let system = &mut unsafe { &mut *systems[system_index].get() }.system;807let res = std::panic::catch_unwind(AssertUnwindSafe(|| {808system.apply_deferred(world);809}));810if let Err(payload) = res {811#[cfg(feature = "std")]812#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]813{814eprintln!(815"Encountered a panic when applying buffers for system `{}`!",816system.name()817);818}819return Err(payload);820}821}822Ok(())823}824825/// # Safety826/// - `world` must have permission to read any world data827/// required by `conditions`.828unsafe fn evaluate_and_fold_conditions(829conditions: &mut [ConditionWithAccess],830world: UnsafeWorldCell,831error_handler: ErrorHandler,832for_system: &ScheduleSystem,833on_set: bool,834) -> bool {835#[expect(836clippy::unnecessary_fold,837reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."838)]839conditions840.iter_mut()841.map(|ConditionWithAccess { condition, .. }| {842// SAFETY:843// - The caller ensures that `world` has permission to read any data844// required by the condition.845unsafe { condition.validate_param_unsafe(world) }846.map_err(From::from)847.and_then(|()| {848// SAFETY:849// - The caller ensures that `world` has permission to read any data850// required by the condition.851unsafe {852__rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world)853}854})855.unwrap_or_else(|err| {856if let RunSystemError::Failed(err) = err {857error_handler(858err,859ErrorContext::RunCondition {860name: condition.name(),861last_run: condition.get_last_run(),862system: for_system.name(),863on_set,864},865);866};867false868})869})870.fold(true, |acc, res| acc && res)871}872873/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread874#[derive(Resource, Clone)]875pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);876877impl Default for MainThreadExecutor {878fn default() -> Self {879Self::new()880}881}882883impl MainThreadExecutor {884/// Creates a new executor that can be used to run systems on the main thread.885pub fn new() -> Self {886MainThreadExecutor(TaskPool::get_thread_executor())887}888}889890#[cfg(test)]891mod tests {892use crate::{893prelude::Resource,894schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},895system::Commands,896world::World,897};898899#[derive(Resource)]900struct R;901902#[test]903fn skipped_systems_notify_dependents() {904let mut world = World::new();905let mut schedule = Schedule::default();906schedule.set_executor_kind(ExecutorKind::MultiThreaded);907schedule.add_systems(908(909(|| {}).run_if(|| false),910// This system depends on a system that is always skipped.911|mut commands: Commands| {912commands.insert_resource(R);913},914)915.chain(),916);917schedule.run(&mut world);918assert!(world.get_resource::<R>().is_some());919}920921/// Regression test for a weird bug flagged by MIRI in922/// `spawn_exclusive_system_task`, related to a `&mut World` being captured923/// inside an `async` block and somehow remaining alive even after its last use.924#[test]925fn check_spawn_exclusive_system_task_miri() {926let mut world = World::new();927let mut schedule = Schedule::default();928schedule.set_executor_kind(ExecutorKind::MultiThreaded);929schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());930schedule.run(&mut world);931}932}933934935