// Source: GitHub repository bevyengine/bevy, path blob/main/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
use alloc::{boxed::Box, vec::Vec};
use bevy_platform::cell::SyncUnsafeCell;
use bevy_platform::sync::Arc;
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use concurrent_queue::ConcurrentQueue;
use core::{any::Any, panic::AssertUnwindSafe};
use fixedbitset::FixedBitSet;
#[cfg(feature = "std")]
use std::eprintln;
use std::sync::{Mutex, MutexGuard};

#[cfg(feature = "trace")]
use tracing::{info_span, Span};

use crate::{
    error::{ErrorContext, ErrorHandler, Result},
    prelude::Resource,
    schedule::{
        is_apply_deferred, ConditionWithAccess, ExecutorKind, SystemExecutor, SystemSchedule,
        SystemWithAccess,
    },
    system::{RunSystemError, ScheduleSystem},
    world::{unsafe_world_cell::UnsafeWorldCell, World},
};
#[cfg(feature = "hotpatching")]
use crate::{prelude::DetectChanges, HotPatchChanges};

use super::__rust_begin_short_backtrace;
/// Borrowed data used by the [`MultiThreadedExecutor`].
31
struct Environment<'env, 'sys> {
32
executor: &'env MultiThreadedExecutor,
33
systems: &'sys [SyncUnsafeCell<SystemWithAccess>],
34
conditions: SyncUnsafeCell<Conditions<'sys>>,
35
world_cell: UnsafeWorldCell<'env>,
36
}
struct Conditions<'a> {
39
system_conditions: &'a mut [Vec<ConditionWithAccess>],
40
set_conditions: &'a mut [Vec<ConditionWithAccess>],
41
sets_with_conditions_of_systems: &'a [FixedBitSet],
42
systems_in_sets_with_conditions: &'a [FixedBitSet],
43
}
impl<'env, 'sys> Environment<'env, 'sys> {
46
fn new(
47
executor: &'env MultiThreadedExecutor,
48
schedule: &'sys mut SystemSchedule,
49
world: &'env mut World,
50
) -> Self {
51
Environment {
52
executor,
53
systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
54
conditions: SyncUnsafeCell::new(Conditions {
55
system_conditions: &mut schedule.system_conditions,
56
set_conditions: &mut schedule.set_conditions,
57
sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
58
systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
59
}),
60
world_cell: world.as_unsafe_world_cell(),
61
}
62
}
63
}
/// Per-system data used by the [`MultiThreadedExecutor`].
66
// Copied here because it can't be read from the system when it's running.
67
struct SystemTaskMetadata {
68
/// The set of systems whose `component_access_set()` conflicts with this one.
69
conflicting_systems: FixedBitSet,
70
/// The set of systems whose `component_access_set()` conflicts with this system's conditions.
71
/// Note that this is separate from `conflicting_systems` to handle the case where
72
/// a system is skipped by an earlier system set condition or system stepping,
73
/// and needs access to run its conditions but not for itself.
74
condition_conflicting_systems: FixedBitSet,
75
/// Indices of the systems that directly depend on the system.
76
dependents: Vec<usize>,
77
/// Is `true` if the system does not access `!Send` data.
78
is_send: bool,
79
/// Is `true` if the system is exclusive.
80
is_exclusive: bool,
81
}
/// Completion notice for a system task.
///
/// Pushed onto the executor's `system_completion` queue by the task that ran the
/// system and consumed by `ExecutorState::tick`.
struct SystemResult {
    /// Index of the system that finished.
    system_index: usize,
}
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
89
pub struct MultiThreadedExecutor {
90
/// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
91
state: Mutex<ExecutorState>,
92
/// Queue of system completion events.
93
system_completion: ConcurrentQueue<SystemResult>,
94
/// Setting when true applies deferred system buffers after all systems have run
95
apply_final_deferred: bool,
96
/// When set, tells the executor that a thread has panicked.
97
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
98
starting_systems: FixedBitSet,
99
/// Cached tracing span
100
#[cfg(feature = "trace")]
101
executor_span: Span,
102
}
/// The state of the executor while running.
105
pub struct ExecutorState {
106
/// Metadata for scheduling and running system tasks.
107
system_task_metadata: Vec<SystemTaskMetadata>,
108
/// The set of systems whose `component_access_set()` conflicts with this system set's conditions.
109
set_condition_conflicting_systems: Vec<FixedBitSet>,
110
/// Returns `true` if a system with non-`Send` access is running.
111
local_thread_running: bool,
112
/// Returns `true` if an exclusive system is running.
113
exclusive_running: bool,
114
/// The number of systems that are running.
115
num_running_systems: usize,
116
/// The number of dependencies each system has that have not completed.
117
num_dependencies_remaining: Vec<usize>,
118
/// System sets whose conditions have been evaluated.
119
evaluated_sets: FixedBitSet,
120
/// Systems that have no remaining dependencies and are waiting to run.
121
ready_systems: FixedBitSet,
122
/// copy of `ready_systems`
123
ready_systems_copy: FixedBitSet,
124
/// Systems that are running.
125
running_systems: FixedBitSet,
126
/// Systems that got skipped.
127
skipped_systems: FixedBitSet,
128
/// Systems whose conditions have been evaluated and were run or skipped.
129
completed_systems: FixedBitSet,
130
/// Systems that have run but have not had their buffers applied.
131
unapplied_systems: FixedBitSet,
132
}
/// References to data required by the executor.
135
/// This is copied to each system task so that can invoke the executor when they complete.
136
// These all need to outlive 'scope in order to be sent to new tasks,
137
// and keeping them all in a struct means we can use lifetime elision.
138
#[derive(Copy, Clone)]
139
struct Context<'scope, 'env, 'sys> {
140
environment: &'env Environment<'env, 'sys>,
141
scope: &'scope Scope<'scope, 'env, ()>,
142
error_handler: ErrorHandler,
143
}
impl Default for MultiThreadedExecutor {
146
fn default() -> Self {
147
Self::new()
148
}
149
}
impl SystemExecutor for MultiThreadedExecutor {
152
fn kind(&self) -> ExecutorKind {
153
ExecutorKind::MultiThreaded
154
}
155
156
fn init(&mut self, schedule: &SystemSchedule) {
157
let state = self.state.get_mut().unwrap();
158
// pre-allocate space
159
let sys_count = schedule.system_ids.len();
160
let set_count = schedule.set_ids.len();
161
162
self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
163
self.starting_systems = FixedBitSet::with_capacity(sys_count);
164
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
165
state.ready_systems = FixedBitSet::with_capacity(sys_count);
166
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
167
state.running_systems = FixedBitSet::with_capacity(sys_count);
168
state.completed_systems = FixedBitSet::with_capacity(sys_count);
169
state.skipped_systems = FixedBitSet::with_capacity(sys_count);
170
state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
171
172
state.system_task_metadata = Vec::with_capacity(sys_count);
173
for index in 0..sys_count {
174
state.system_task_metadata.push(SystemTaskMetadata {
175
conflicting_systems: FixedBitSet::with_capacity(sys_count),
176
condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
177
dependents: schedule.system_dependents[index].clone(),
178
is_send: schedule.systems[index].system.is_send(),
179
is_exclusive: schedule.systems[index].system.is_exclusive(),
180
});
181
if schedule.system_dependencies[index] == 0 {
182
self.starting_systems.insert(index);
183
}
184
}
185
186
{
187
#[cfg(feature = "trace")]
188
let _span = info_span!("calculate conflicting systems").entered();
189
for index1 in 0..sys_count {
190
let system1 = &schedule.systems[index1];
191
for index2 in 0..index1 {
192
let system2 = &schedule.systems[index2];
193
if !system2.access.is_compatible(&system1.access) {
194
state.system_task_metadata[index1]
195
.conflicting_systems
196
.insert(index2);
197
state.system_task_metadata[index2]
198
.conflicting_systems
199
.insert(index1);
200
}
201
}
202
203
for index2 in 0..sys_count {
204
let system2 = &schedule.systems[index2];
205
if schedule.system_conditions[index1]
206
.iter()
207
.any(|condition| !system2.access.is_compatible(&condition.access))
208
{
209
state.system_task_metadata[index1]
210
.condition_conflicting_systems
211
.insert(index2);
212
}
213
}
214
}
215
216
state.set_condition_conflicting_systems.clear();
217
state.set_condition_conflicting_systems.reserve(set_count);
218
for set_idx in 0..set_count {
219
let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
220
for sys_index in 0..sys_count {
221
let system = &schedule.systems[sys_index];
222
if schedule.set_conditions[set_idx]
223
.iter()
224
.any(|condition| !system.access.is_compatible(&condition.access))
225
{
226
conflicting_systems.insert(sys_index);
227
}
228
}
229
state
230
.set_condition_conflicting_systems
231
.push(conflicting_systems);
232
}
233
}
234
235
state.num_dependencies_remaining = Vec::with_capacity(sys_count);
236
}
237
238
fn run(
239
&mut self,
240
schedule: &mut SystemSchedule,
241
world: &mut World,
242
_skip_systems: Option<&FixedBitSet>,
243
error_handler: ErrorHandler,
244
) {
245
let state = self.state.get_mut().unwrap();
246
// reset counts
247
if schedule.systems.is_empty() {
248
return;
249
}
250
state.num_running_systems = 0;
251
state
252
.num_dependencies_remaining
253
.clone_from(&schedule.system_dependencies);
254
state.ready_systems.clone_from(&self.starting_systems);
255
256
// If stepping is enabled, make sure we skip those systems that should
257
// not be run.
258
#[cfg(feature = "bevy_debug_stepping")]
259
if let Some(skipped_systems) = _skip_systems {
260
debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
261
// mark skipped systems as completed
262
state.completed_systems |= skipped_systems;
263
264
// signal the dependencies for each of the skipped systems, as
265
// though they had run
266
for system_index in skipped_systems.ones() {
267
state.signal_dependents(system_index);
268
state.ready_systems.remove(system_index);
269
}
270
}
271
272
let thread_executor = world
273
.get_resource::<MainThreadExecutor>()
274
.map(|e| e.0.clone());
275
let thread_executor = thread_executor.as_deref();
276
277
let environment = &Environment::new(self, schedule, world);
278
279
ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
280
false,
281
thread_executor,
282
|scope| {
283
let context = Context {
284
environment,
285
scope,
286
error_handler,
287
};
288
289
// The first tick won't need to process finished systems, but we still need to run the loop in
290
// tick_executor() in case a system completes while the first tick still holds the mutex.
291
context.tick_executor();
292
},
293
);
294
295
// End the borrows of self and world in environment by copying out the reference to systems.
296
let systems = environment.systems;
297
298
let state = self.state.get_mut().unwrap();
299
if self.apply_final_deferred {
300
// Do one final apply buffers after all systems have completed
301
// Commands should be applied while on the scope's thread, not the executor's thread
302
let res = apply_deferred(&state.unapplied_systems, systems, world);
303
if let Err(payload) = res {
304
let panic_payload = self.panic_payload.get_mut().unwrap();
305
*panic_payload = Some(payload);
306
}
307
state.unapplied_systems.clear();
308
}
309
310
// check to see if there was a panic
311
let payload = self.panic_payload.get_mut().unwrap();
312
if let Some(payload) = payload.take() {
313
std::panic::resume_unwind(payload);
314
}
315
316
debug_assert!(state.ready_systems.is_clear());
317
debug_assert!(state.running_systems.is_clear());
318
state.evaluated_sets.clear();
319
state.skipped_systems.clear();
320
state.completed_systems.clear();
321
}
322
323
fn set_apply_final_deferred(&mut self, value: bool) {
324
self.apply_final_deferred = value;
325
}
326
}
impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
329
fn system_completed(
330
&self,
331
system_index: usize,
332
res: Result<(), Box<dyn Any + Send>>,
333
system: &ScheduleSystem,
334
) {
335
// tell the executor that the system finished
336
self.environment
337
.executor
338
.system_completion
339
.push(SystemResult { system_index })
340
.unwrap_or_else(|error| unreachable!("{}", error));
341
if let Err(payload) = res {
342
#[cfg(feature = "std")]
343
#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
344
{
345
eprintln!("Encountered a panic in system `{}`!", system.name());
346
}
347
// set the payload to propagate the error
348
{
349
let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
350
*panic_payload = Some(payload);
351
}
352
}
353
self.tick_executor();
354
}
355
356
#[expect(
357
clippy::mut_from_ref,
358
reason = "Field is only accessed here and is guarded by lock with a documented safety comment"
359
)]
360
fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
361
let guard = self.environment.executor.state.try_lock().ok()?;
362
// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
363
// is synchronized by the lock on the executor state.
364
let conditions = unsafe { &mut *self.environment.conditions.get() };
365
Some((conditions, guard))
366
}
367
368
fn tick_executor(&self) {
369
// Ensure that the executor handles any events pushed to the system_completion queue by this thread.
370
// If this thread acquires the lock, the executor runs after the push() and they are processed.
371
// If this thread does not acquire the lock, then the is_empty() check on the other thread runs
372
// after the lock is released, which is after try_lock() failed, which is after the push()
373
// on this thread, so the is_empty() check will see the new events and loop.
374
loop {
375
let Some((conditions, mut guard)) = self.try_lock() else {
376
return;
377
};
378
guard.tick(self, conditions);
379
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
380
drop(guard);
381
if self.environment.executor.system_completion.is_empty() {
382
return;
383
}
384
}
385
}
386
}
impl MultiThreadedExecutor {
389
/// Creates a new `multi_threaded` executor for use with a [`Schedule`].
390
///
391
/// [`Schedule`]: crate::schedule::Schedule
392
pub fn new() -> Self {
393
Self {
394
state: Mutex::new(ExecutorState::new()),
395
system_completion: ConcurrentQueue::unbounded(),
396
starting_systems: FixedBitSet::new(),
397
apply_final_deferred: true,
398
panic_payload: Mutex::new(None),
399
#[cfg(feature = "trace")]
400
executor_span: info_span!("multithreaded executor"),
401
}
402
}
403
}
impl ExecutorState {
406
fn new() -> Self {
407
Self {
408
system_task_metadata: Vec::new(),
409
set_condition_conflicting_systems: Vec::new(),
410
num_running_systems: 0,
411
num_dependencies_remaining: Vec::new(),
412
local_thread_running: false,
413
exclusive_running: false,
414
evaluated_sets: FixedBitSet::new(),
415
ready_systems: FixedBitSet::new(),
416
ready_systems_copy: FixedBitSet::new(),
417
running_systems: FixedBitSet::new(),
418
skipped_systems: FixedBitSet::new(),
419
completed_systems: FixedBitSet::new(),
420
unapplied_systems: FixedBitSet::new(),
421
}
422
}
423
424
fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
425
#[cfg(feature = "trace")]
426
let _span = context.environment.executor.executor_span.enter();
427
428
for result in context.environment.executor.system_completion.try_iter() {
429
self.finish_system_and_handle_dependents(result);
430
}
431
432
// SAFETY:
433
// - `finish_system_and_handle_dependents` has updated the currently running systems.
434
// - `rebuild_active_access` locks access for all currently running systems.
435
unsafe {
436
self.spawn_system_tasks(context, conditions);
437
}
438
}
439
440
/// # Safety
441
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
442
/// have been mutably borrowed (such as the systems currently running).
443
/// - `world_cell` must have permission to access all world data (not counting
444
/// any world data that is claimed by systems currently running on this executor).
445
unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
446
if self.exclusive_running {
447
return;
448
}
449
450
#[cfg(feature = "hotpatching")]
451
let hotpatch_tick = context
452
.environment
453
.world_cell
454
.get_resource_ref::<HotPatchChanges>()
455
.map(|r| r.last_changed())
456
.unwrap_or_default();
457
458
// can't borrow since loop mutably borrows `self`
459
let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);
460
461
// Skipping systems may cause their dependents to become ready immediately.
462
// If that happens, we need to run again immediately or we may fail to spawn those dependents.
463
let mut check_for_new_ready_systems = true;
464
while check_for_new_ready_systems {
465
check_for_new_ready_systems = false;
466
467
ready_systems.clone_from(&self.ready_systems);
468
469
for system_index in ready_systems.ones() {
470
debug_assert!(!self.running_systems.contains(system_index));
471
// SAFETY: Caller assured that these systems are not running.
472
// Therefore, no other reference to this system exists and there is no aliasing.
473
let system =
474
&mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
475
476
#[cfg(feature = "hotpatching")]
477
if hotpatch_tick.is_newer_than(
478
system.get_last_run(),
479
context.environment.world_cell.change_tick(),
480
) {
481
system.refresh_hotpatch();
482
}
483
484
if !self.can_run(system_index, conditions) {
485
// NOTE: exclusive systems with ambiguities are susceptible to
486
// being significantly displaced here (compared to single-threaded order)
487
// if systems after them in topological order can run
488
// if that becomes an issue, `break;` if exclusive system
489
continue;
490
}
491
492
self.ready_systems.remove(system_index);
493
494
// SAFETY: `can_run` returned true, which means that:
495
// - There can be no systems running whose accesses would conflict with any conditions.
496
if unsafe {
497
!self.should_run(
498
system_index,
499
system,
500
conditions,
501
context.environment.world_cell,
502
context.error_handler,
503
)
504
} {
505
self.skip_system_and_signal_dependents(system_index);
506
// signal_dependents may have set more systems to ready.
507
check_for_new_ready_systems = true;
508
continue;
509
}
510
511
self.running_systems.insert(system_index);
512
self.num_running_systems += 1;
513
514
if self.system_task_metadata[system_index].is_exclusive {
515
// SAFETY: `can_run` returned true for this system,
516
// which means no systems are currently borrowed.
517
unsafe {
518
self.spawn_exclusive_system_task(context, system_index);
519
}
520
check_for_new_ready_systems = false;
521
break;
522
}
523
524
// SAFETY:
525
// - Caller ensured no other reference to this system exists.
526
// - `system_task_metadata[system_index].is_exclusive` is `false`,
527
// so `System::is_exclusive` returned `false` when we called it.
528
// - `can_run` returned true, so no systems with conflicting world access are running.
529
unsafe {
530
self.spawn_system_task(context, system_index);
531
}
532
}
533
}
534
535
// give back
536
self.ready_systems_copy = ready_systems;
537
}
538
539
fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {
540
let system_meta = &self.system_task_metadata[system_index];
541
if system_meta.is_exclusive && self.num_running_systems > 0 {
542
return false;
543
}
544
545
if !system_meta.is_send && self.local_thread_running {
546
return false;
547
}
548
549
// TODO: an earlier out if world's archetypes did not change
550
for set_idx in conditions.sets_with_conditions_of_systems[system_index]
551
.difference(&self.evaluated_sets)
552
{
553
if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
554
return false;
555
}
556
}
557
558
if !system_meta
559
.condition_conflicting_systems
560
.is_disjoint(&self.running_systems)
561
{
562
return false;
563
}
564
565
if !self.skipped_systems.contains(system_index)
566
&& !system_meta
567
.conflicting_systems
568
.is_disjoint(&self.running_systems)
569
{
570
return false;
571
}
572
573
true
574
}
575
576
/// # Safety
577
/// * `world` must have permission to read any world data required by
578
/// the system's conditions: this includes conditions for the system
579
/// itself, and conditions for any of the system's sets.
580
unsafe fn should_run(
581
&mut self,
582
system_index: usize,
583
system: &mut ScheduleSystem,
584
conditions: &mut Conditions,
585
world: UnsafeWorldCell,
586
error_handler: ErrorHandler,
587
) -> bool {
588
let mut should_run = !self.skipped_systems.contains(system_index);
589
590
for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
591
if self.evaluated_sets.contains(set_idx) {
592
continue;
593
}
594
595
// Evaluate the system set's conditions.
596
// SAFETY:
597
// - The caller ensures that `world` has permission to read any data
598
// required by the conditions.
599
let set_conditions_met = unsafe {
600
evaluate_and_fold_conditions(
601
&mut conditions.set_conditions[set_idx],
602
world,
603
error_handler,
604
system,
605
true,
606
)
607
};
608
609
if !set_conditions_met {
610
self.skipped_systems
611
.union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
612
}
613
614
should_run &= set_conditions_met;
615
self.evaluated_sets.insert(set_idx);
616
}
617
618
// Evaluate the system's conditions.
619
// SAFETY:
620
// - The caller ensures that `world` has permission to read any data
621
// required by the conditions.
622
let system_conditions_met = unsafe {
623
evaluate_and_fold_conditions(
624
&mut conditions.system_conditions[system_index],
625
world,
626
error_handler,
627
system,
628
false,
629
)
630
};
631
632
if !system_conditions_met {
633
self.skipped_systems.insert(system_index);
634
}
635
636
should_run &= system_conditions_met;
637
638
if should_run {
639
// SAFETY:
640
// - The caller ensures that `world` has permission to read any data
641
// required by the system.
642
let valid_params = match unsafe { system.validate_param_unsafe(world) } {
643
Ok(()) => true,
644
Err(e) => {
645
if !e.skipped {
646
error_handler(
647
e.into(),
648
ErrorContext::System {
649
name: system.name(),
650
last_run: system.get_last_run(),
651
},
652
);
653
}
654
false
655
}
656
};
657
if !valid_params {
658
self.skipped_systems.insert(system_index);
659
}
660
661
should_run &= valid_params;
662
}
663
664
should_run
665
}
666
667
/// # Safety
668
/// - Caller must not alias systems that are running.
669
/// - `is_exclusive` must have returned `false` for the specified system.
670
/// - `world` must have permission to access the world data
671
/// used by the specified system.
672
unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
673
// SAFETY: this system is not running, no other reference exists
674
let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
675
// Move the full context object into the new future.
676
let context = *context;
677
678
let system_meta = &self.system_task_metadata[system_index];
679
680
let task = async move {
681
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
682
// SAFETY:
683
// - The caller ensures that we have permission to
684
// access the world data used by the system.
685
// - `is_exclusive` returned false
686
unsafe {
687
if let Err(RunSystemError::Failed(err)) =
688
__rust_begin_short_backtrace::run_unsafe(
689
system,
690
context.environment.world_cell,
691
)
692
{
693
(context.error_handler)(
694
err,
695
ErrorContext::System {
696
name: system.name(),
697
last_run: system.get_last_run(),
698
},
699
);
700
}
701
};
702
}));
703
context.system_completed(system_index, res, system);
704
};
705
706
if system_meta.is_send {
707
context.scope.spawn(task);
708
} else {
709
self.local_thread_running = true;
710
context.scope.spawn_on_external(task);
711
}
712
}
713
714
/// # Safety
715
/// Caller must ensure no systems are currently borrowed.
716
unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
717
// SAFETY: this system is not running, no other reference exists
718
let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
719
// Move the full context object into the new future.
720
let context = *context;
721
722
if is_apply_deferred(&**system) {
723
// TODO: avoid allocation
724
let unapplied_systems = self.unapplied_systems.clone();
725
self.unapplied_systems.clear();
726
let task = async move {
727
// SAFETY: `can_run` returned true for this system, which means
728
// that no other systems currently have access to the world.
729
let world = unsafe { context.environment.world_cell.world_mut() };
730
let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
731
context.system_completed(system_index, res, system);
732
};
733
734
context.scope.spawn_on_scope(task);
735
} else {
736
let task = async move {
737
// SAFETY: `can_run` returned true for this system, which means
738
// that no other systems currently have access to the world.
739
let world = unsafe { context.environment.world_cell.world_mut() };
740
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
741
if let Err(RunSystemError::Failed(err)) =
742
__rust_begin_short_backtrace::run(system, world)
743
{
744
(context.error_handler)(
745
err,
746
ErrorContext::System {
747
name: system.name(),
748
last_run: system.get_last_run(),
749
},
750
);
751
}
752
}));
753
context.system_completed(system_index, res, system);
754
};
755
756
context.scope.spawn_on_scope(task);
757
}
758
759
self.exclusive_running = true;
760
self.local_thread_running = true;
761
}
762
763
fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
764
let SystemResult { system_index, .. } = result;
765
766
if self.system_task_metadata[system_index].is_exclusive {
767
self.exclusive_running = false;
768
}
769
770
if !self.system_task_metadata[system_index].is_send {
771
self.local_thread_running = false;
772
}
773
774
debug_assert!(self.num_running_systems >= 1);
775
self.num_running_systems -= 1;
776
self.running_systems.remove(system_index);
777
self.completed_systems.insert(system_index);
778
self.unapplied_systems.insert(system_index);
779
780
self.signal_dependents(system_index);
781
}
782
783
fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
784
self.completed_systems.insert(system_index);
785
self.signal_dependents(system_index);
786
}
787
788
fn signal_dependents(&mut self, system_index: usize) {
789
for &dep_idx in &self.system_task_metadata[system_index].dependents {
790
let remaining = &mut self.num_dependencies_remaining[dep_idx];
791
debug_assert!(*remaining >= 1);
792
*remaining -= 1;
793
if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
794
self.ready_systems.insert(dep_idx);
795
}
796
}
797
}
798
}
fn apply_deferred(
801
unapplied_systems: &FixedBitSet,
802
systems: &[SyncUnsafeCell<SystemWithAccess>],
803
world: &mut World,
804
) -> Result<(), Box<dyn Any + Send>> {
805
for system_index in unapplied_systems.ones() {
806
// SAFETY: none of these systems are running, no other references exist
807
let system = &mut unsafe { &mut *systems[system_index].get() }.system;
808
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
809
system.apply_deferred(world);
810
}));
811
if let Err(payload) = res {
812
#[cfg(feature = "std")]
813
#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
814
{
815
eprintln!(
816
"Encountered a panic when applying buffers for system `{}`!",
817
system.name()
818
);
819
}
820
return Err(payload);
821
}
822
}
823
Ok(())
824
}
/// # Safety
827
/// - `world` must have permission to read any world data
828
/// required by `conditions`.
829
unsafe fn evaluate_and_fold_conditions(
830
conditions: &mut [ConditionWithAccess],
831
world: UnsafeWorldCell,
832
error_handler: ErrorHandler,
833
for_system: &ScheduleSystem,
834
on_set: bool,
835
) -> bool {
836
#[expect(
837
clippy::unnecessary_fold,
838
reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
839
)]
840
conditions
841
.iter_mut()
842
.map(|ConditionWithAccess { condition, .. }| {
843
// SAFETY:
844
// - The caller ensures that `world` has permission to read any data
845
// required by the condition.
846
unsafe { condition.validate_param_unsafe(world) }
847
.map_err(From::from)
848
.and_then(|()| {
849
// SAFETY:
850
// - The caller ensures that `world` has permission to read any data
851
// required by the condition.
852
unsafe {
853
__rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world)
854
}
855
})
856
.unwrap_or_else(|err| {
857
if let RunSystemError::Failed(err) = err {
858
error_handler(
859
err,
860
ErrorContext::RunCondition {
861
name: condition.name(),
862
last_run: condition.get_last_run(),
863
system: for_system.name(),
864
on_set,
865
},
866
);
867
};
868
false
869
})
870
})
871
.fold(true, |acc, res| acc && res)
872
}
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
875
#[derive(Resource, Clone)]
876
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
impl Default for MainThreadExecutor {
879
fn default() -> Self {
880
Self::new()
881
}
882
}
impl MainThreadExecutor {
885
/// Creates a new executor that can be used to run systems on the main thread.
886
pub fn new() -> Self {
887
MainThreadExecutor(TaskPool::get_thread_executor())
888
}
889
}
#[cfg(test)]
mod tests {
    use crate::{
        prelude::Resource,
        schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource inserted by the dependent system in the tests below.
    #[derive(Resource)]
    struct R;

    /// A system that depends on an always-skipped system must still run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }

    /// Regression test for a weird bug flagged by MIRI in
    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
    /// inside an `async` block and somehow remaining alive even after its last use.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}