Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/schedule/executor/mod.rs
6849 views
1
#[cfg(feature = "std")]
2
mod multi_threaded;
3
mod single_threaded;
4
5
use alloc::{vec, vec::Vec};
6
use bevy_utils::prelude::DebugName;
7
use core::any::TypeId;
8
9
pub use self::single_threaded::SingleThreadedExecutor;
10
11
#[cfg(feature = "std")]
12
pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor};
13
14
use fixedbitset::FixedBitSet;
15
16
use crate::{
17
component::{CheckChangeTicks, Tick},
18
error::{BevyError, ErrorContext, Result},
19
prelude::{IntoSystemSet, SystemSet},
20
query::FilteredAccessSet,
21
schedule::{
22
ConditionWithAccess, InternedSystemSet, SystemKey, SystemSetKey, SystemTypeSet,
23
SystemWithAccess,
24
},
25
system::{RunSystemError, System, SystemIn, SystemParamValidationError, SystemStateFlags},
26
world::{unsafe_world_cell::UnsafeWorldCell, DeferredWorld, World},
27
};
28
29
/// Types that can run a [`SystemSchedule`] on a [`World`].
30
pub(super) trait SystemExecutor: Send + Sync {
31
fn kind(&self) -> ExecutorKind;
32
fn init(&mut self, schedule: &SystemSchedule);
33
fn run(
34
&mut self,
35
schedule: &mut SystemSchedule,
36
world: &mut World,
37
skip_systems: Option<&FixedBitSet>,
38
error_handler: fn(BevyError, ErrorContext),
39
);
40
fn set_apply_final_deferred(&mut self, value: bool);
41
}
42
43
/// Specifies how a [`Schedule`](super::Schedule) will be run.
///
/// The default depends on the target platform and the enabled crate features:
/// - [`SingleThreaded`](ExecutorKind::SingleThreaded) on Wasm, or when either the
///   `std` or the `multi_threaded` feature is disabled.
/// - [`MultiThreaded`](ExecutorKind::MultiThreaded) everywhere else.
#[derive(PartialEq, Eq, Default, Debug, Copy, Clone)]
pub enum ExecutorKind {
    /// Runs the schedule using a single thread.
    ///
    /// Useful if you're dealing with a single-threaded environment, saving your threads for
    /// other things, or just trying to minimize overhead.
    // This condition is the exact complement of the one that makes `MultiThreaded`
    // the default, so exactly one variant carries `#[default]` in every configuration.
    #[cfg_attr(
        any(
            target_arch = "wasm32",
            not(feature = "std"),
            not(feature = "multi_threaded")
        ),
        default
    )]
    SingleThreaded,
    /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
    #[cfg(feature = "std")]
    #[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi_threaded"), default)]
    MultiThreaded,
}
68
69
/// Holds systems and conditions of a [`Schedule`](super::Schedule) sorted in topological order
70
/// (along with dependency information for `multi_threaded` execution).
71
///
72
/// Since the arrays are sorted in the same order, elements are referenced by their index.
73
/// [`FixedBitSet`] is used as a smaller, more efficient substitute of `HashSet<usize>`.
74
#[derive(Default)]
75
pub struct SystemSchedule {
76
/// List of system node ids.
77
pub(super) system_ids: Vec<SystemKey>,
78
/// Indexed by system node id.
79
pub(super) systems: Vec<SystemWithAccess>,
80
/// Indexed by system node id.
81
pub(super) system_conditions: Vec<Vec<ConditionWithAccess>>,
82
/// Indexed by system node id.
83
/// Number of systems that the system immediately depends on.
84
#[cfg_attr(
85
not(feature = "std"),
86
expect(dead_code, reason = "currently only used with the std feature")
87
)]
88
pub(super) system_dependencies: Vec<usize>,
89
/// Indexed by system node id.
90
/// List of systems that immediately depend on the system.
91
#[cfg_attr(
92
not(feature = "std"),
93
expect(dead_code, reason = "currently only used with the std feature")
94
)]
95
pub(super) system_dependents: Vec<Vec<usize>>,
96
/// Indexed by system node id.
97
/// List of sets containing the system that have conditions
98
pub(super) sets_with_conditions_of_systems: Vec<FixedBitSet>,
99
/// List of system set node ids.
100
pub(super) set_ids: Vec<SystemSetKey>,
101
/// Indexed by system set node id.
102
pub(super) set_conditions: Vec<Vec<ConditionWithAccess>>,
103
/// Indexed by system set node id.
104
/// List of systems that are in sets that have conditions.
105
///
106
/// If a set doesn't run because of its conditions, this is used to skip all systems in it.
107
pub(super) systems_in_sets_with_conditions: Vec<FixedBitSet>,
108
}
109
110
impl SystemSchedule {
111
/// Creates an empty [`SystemSchedule`].
112
pub const fn new() -> Self {
113
Self {
114
systems: Vec::new(),
115
system_conditions: Vec::new(),
116
set_conditions: Vec::new(),
117
system_ids: Vec::new(),
118
set_ids: Vec::new(),
119
system_dependencies: Vec::new(),
120
system_dependents: Vec::new(),
121
sets_with_conditions_of_systems: Vec::new(),
122
systems_in_sets_with_conditions: Vec::new(),
123
}
124
}
125
}
126
127
/// A special [`System`] that instructs the executor to call
/// [`System::apply_deferred`] on the systems that have run but not applied
/// their [`Deferred`] system parameters (like [`Commands`]) or other system buffers.
///
/// ## Scheduling
///
/// `ApplyDeferred` systems are scheduled *by default*
/// - later in the same schedule run (for example, if a system with `Commands` param
///   is scheduled in `Update`, all the changes will be visible in `PostUpdate`)
/// - between systems with dependencies if the dependency [has deferred buffers]
///   (if system `bar` directly or indirectly depends on `foo`, and `foo` uses
///   `Commands` param, changes to the world in `foo` will be visible in `bar`)
///
/// ## Notes
/// - This system (currently) does nothing if it's called manually or wrapped
///   inside a [`PipeSystem`].
/// - Modifying a [`Schedule`] may change the order buffers are applied.
///
/// [`System::apply_deferred`]: crate::system::System::apply_deferred
/// [`Deferred`]: crate::system::Deferred
/// [`Commands`]: crate::prelude::Commands
/// [has deferred buffers]: crate::system::System::has_deferred
/// [`PipeSystem`]: crate::system::PipeSystem
/// [`Schedule`]: super::Schedule
#[doc(alias = "apply_system_buffers")]
pub struct ApplyDeferred;
153
154
/// Returns `true` if the [`System`] is an instance of [`ApplyDeferred`].
155
pub(super) fn is_apply_deferred(system: &dyn System<In = (), Out = ()>) -> bool {
156
system.type_id() == TypeId::of::<ApplyDeferred>()
157
}
158
159
impl System for ApplyDeferred {
160
type In = ();
161
type Out = ();
162
163
fn name(&self) -> DebugName {
164
DebugName::borrowed("bevy_ecs::apply_deferred")
165
}
166
167
fn flags(&self) -> SystemStateFlags {
168
// non-send , exclusive , no deferred
169
SystemStateFlags::NON_SEND | SystemStateFlags::EXCLUSIVE
170
}
171
172
unsafe fn run_unsafe(
173
&mut self,
174
_input: SystemIn<'_, Self>,
175
_world: UnsafeWorldCell,
176
) -> Result<Self::Out, RunSystemError> {
177
// This system does nothing on its own. The executor will apply deferred
178
// commands from other systems instead of running this system.
179
Ok(())
180
}
181
182
#[cfg(feature = "hotpatching")]
183
#[inline]
184
fn refresh_hotpatch(&mut self) {}
185
186
fn run(
187
&mut self,
188
_input: SystemIn<'_, Self>,
189
_world: &mut World,
190
) -> Result<Self::Out, RunSystemError> {
191
// This system does nothing on its own. The executor will apply deferred
192
// commands from other systems instead of running this system.
193
Ok(())
194
}
195
196
fn apply_deferred(&mut self, _world: &mut World) {}
197
198
fn queue_deferred(&mut self, _world: DeferredWorld) {}
199
200
unsafe fn validate_param_unsafe(
201
&mut self,
202
_world: UnsafeWorldCell,
203
) -> Result<(), SystemParamValidationError> {
204
// This system is always valid to run because it doesn't do anything,
205
// and only used as a marker for the executor.
206
Ok(())
207
}
208
209
fn initialize(&mut self, _world: &mut World) -> FilteredAccessSet {
210
FilteredAccessSet::new()
211
}
212
213
fn check_change_tick(&mut self, _check: CheckChangeTicks) {}
214
215
fn default_system_sets(&self) -> Vec<InternedSystemSet> {
216
vec![SystemTypeSet::<Self>::new().intern()]
217
}
218
219
fn get_last_run(&self) -> Tick {
220
// This system is never run, so it has no last run tick.
221
Tick::MAX
222
}
223
224
fn set_last_run(&mut self, _last_run: Tick) {}
225
}
226
227
impl IntoSystemSet<()> for ApplyDeferred {
228
type Set = SystemTypeSet<Self>;
229
230
fn into_system_set(self) -> Self::Set {
231
SystemTypeSet::<Self>::new()
232
}
233
}
234
235
/// These functions hide the bottom of the callstack from `RUST_BACKTRACE=1` (assuming the default panic handler is used).
236
///
237
/// The full callstack will still be visible with `RUST_BACKTRACE=full`.
238
/// They are specialized for `System::run` & co instead of being generic over closures because this avoids an
239
/// extra frame in the backtrace.
240
///
241
/// This is reliant on undocumented behavior in Rust's default panic handler, which checks the call stack for symbols
242
/// containing the string `__rust_begin_short_backtrace` in their mangled name.
243
mod __rust_begin_short_backtrace {
244
use core::hint::black_box;
245
246
#[cfg(feature = "std")]
247
use crate::world::unsafe_world_cell::UnsafeWorldCell;
248
use crate::{
249
error::Result,
250
system::{ReadOnlySystem, RunSystemError, ScheduleSystem},
251
world::World,
252
};
253
254
/// # Safety
255
/// See `System::run_unsafe`.
256
// This is only used by `MultiThreadedExecutor`, and would be dead code without `std`.
257
#[cfg(feature = "std")]
258
#[inline(never)]
259
pub(super) unsafe fn run_unsafe(
260
system: &mut ScheduleSystem,
261
world: UnsafeWorldCell,
262
) -> Result<(), RunSystemError> {
263
let result = system.run_unsafe((), world);
264
// Call `black_box` to prevent this frame from being tail-call optimized away
265
black_box(());
266
result
267
}
268
269
/// # Safety
270
/// See `ReadOnlySystem::run_unsafe`.
271
// This is only used by `MultiThreadedExecutor`, and would be dead code without `std`.
272
#[cfg(feature = "std")]
273
#[inline(never)]
274
pub(super) unsafe fn readonly_run_unsafe<O: 'static>(
275
system: &mut dyn ReadOnlySystem<In = (), Out = O>,
276
world: UnsafeWorldCell,
277
) -> Result<O, RunSystemError> {
278
// Call `black_box` to prevent this frame from being tail-call optimized away
279
black_box(system.run_unsafe((), world))
280
}
281
282
#[cfg(feature = "std")]
283
#[inline(never)]
284
pub(super) fn run(
285
system: &mut ScheduleSystem,
286
world: &mut World,
287
) -> Result<(), RunSystemError> {
288
let result = system.run((), world);
289
// Call `black_box` to prevent this frame from being tail-call optimized away
290
black_box(());
291
result
292
}
293
294
#[inline(never)]
295
pub(super) fn run_without_applying_deferred(
296
system: &mut ScheduleSystem,
297
world: &mut World,
298
) -> Result<(), RunSystemError> {
299
let result = system.run_without_applying_deferred((), world);
300
// Call `black_box` to prevent this frame from being tail-call optimized away
301
black_box(());
302
result
303
}
304
305
#[inline(never)]
306
pub(super) fn readonly_run<O: 'static>(
307
system: &mut dyn ReadOnlySystem<In = (), Out = O>,
308
world: &mut World,
309
) -> Result<O, RunSystemError> {
310
// Call `black_box` to prevent this frame from being tail-call optimized away
311
black_box(system.run((), world))
312
}
313
}
314
315
#[cfg(test)]
mod tests {
    use crate::{
        prelude::{Component, In, IntoSystem, Resource, Schedule},
        schedule::ExecutorKind,
        system::{Populated, Res, ResMut, Single},
        world::World,
    };

    /// Marker component used to make `Single` / `Populated` parameters valid.
    #[derive(Component)]
    struct TestComponent;

    /// Executor kinds exercised by the executor-agnostic tests.
    const EXECUTORS: [ExecutorKind; 2] =
        [ExecutorKind::SingleThreaded, ExecutorKind::MultiThreaded];

    /// Records which of the validated systems actually ran.
    #[derive(Resource, Default)]
    struct TestState {
        populated_ran: bool,
        single_ran: bool,
    }

    /// Counts how many piped systems ran.
    #[derive(Resource, Default)]
    struct Counter(u8);

    fn set_single_state(mut _single: Single<&TestComponent>, mut state: ResMut<TestState>) {
        state.single_ran = true;
    }

    fn set_populated_state(
        mut _populated: Populated<&TestComponent>,
        mut state: ResMut<TestState>,
    ) {
        state.populated_ran = true;
    }

    #[test]
    #[expect(clippy::print_stdout, reason = "std and println are allowed in tests")]
    fn single_and_populated_skipped_and_run() {
        for executor in EXECUTORS {
            std::println!("Testing executor: {executor:?}");

            let mut world = World::new();
            world.init_resource::<TestState>();

            let mut schedule = Schedule::default();
            schedule.set_executor_kind(executor);
            schedule.add_systems((set_single_state, set_populated_state));

            // No `TestComponent` entity exists yet, so both systems are skipped.
            schedule.run(&mut world);

            let state = world.get_resource::<TestState>().unwrap();
            assert!(!state.single_ran);
            assert!(!state.populated_ran);

            // With one matching entity both systems run.
            world.spawn(TestComponent);

            schedule.run(&mut world);
            let state = world.get_resource::<TestState>().unwrap();
            assert!(state.single_ran);
            assert!(state.populated_ran);
        }
    }

    fn look_for_missing_resource(_res: Res<TestState>) {}

    #[test]
    #[should_panic]
    fn missing_resource_panics_single_threaded() {
        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.set_executor_kind(ExecutorKind::SingleThreaded);
        schedule.add_systems(look_for_missing_resource);
        schedule.run(&mut world);
    }

    #[test]
    #[should_panic]
    fn missing_resource_panics_multi_threaded() {
        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(look_for_missing_resource);
        schedule.run(&mut world);
    }

    #[test]
    fn piped_systems_first_system_skipped() {
        // This system should be skipped when run due to no matching entity
        fn pipe_out(_single: Single<&TestComponent>) -> u8 {
            42
        }

        fn pipe_in(_input: In<u8>, mut counter: ResMut<Counter>) {
            counter.0 += 1;
        }

        let mut world = World::new();
        world.init_resource::<Counter>();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);

        let counter = world.resource::<Counter>();
        assert_eq!(counter.0, 0);
    }

    #[test]
    fn piped_system_second_system_skipped() {
        // This system will be run before the second system is validated
        fn pipe_out(mut counter: ResMut<Counter>) -> u8 {
            counter.0 += 1;
            42
        }

        // This system should be skipped when run due to no matching entity
        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>, mut counter: ResMut<Counter>) {
            counter.0 += 1;
        }

        let mut world = World::new();
        world.init_resource::<Counter>();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
        let counter = world.resource::<Counter>();
        assert_eq!(counter.0, 1);
    }

    #[test]
    #[should_panic]
    fn piped_system_first_system_panics() {
        // This system should panic when run because the resource is missing
        fn pipe_out(_res: Res<TestState>) -> u8 {
            42
        }

        fn pipe_in(_input: In<u8>) {}

        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
    }

    #[test]
    #[should_panic]
    fn piped_system_second_system_panics() {
        fn pipe_out() -> u8 {
            42
        }

        // This system should panic when run because the resource is missing
        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}

        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
    }

    // This test runs without panicking because we've
    // decided to use early-out behavior for piped systems
    #[test]
    fn piped_system_skip_and_panic() {
        // This system should be skipped when run due to no matching entity
        fn pipe_out(_single: Single<&TestComponent>) -> u8 {
            42
        }

        // This system should panic when run because the resource is missing
        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}

        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
    }

    #[test]
    #[should_panic]
    fn piped_system_panic_and_skip() {
        // This system should panic when run because the resource is missing
        fn pipe_out(_res: Res<TestState>) -> u8 {
            42
        }

        // This system should be skipped when run due to no matching entity
        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>) {}

        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
    }

    #[test]
    #[should_panic]
    fn piped_system_panic_and_panic() {
        // This system should panic when run because the resource is missing
        fn pipe_out(_res: Res<TestState>) -> u8 {
            42
        }

        // This system should panic when run because the resource is missing
        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}

        let mut world = World::new();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);
    }

    #[test]
    fn piped_system_skip_and_skip() {
        // This system should be skipped when run due to no matching entity
        fn pipe_out(_single: Single<&TestComponent>, mut counter: ResMut<Counter>) -> u8 {
            counter.0 += 1;
            42
        }

        // This system should be skipped when run due to no matching entity
        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>, mut counter: ResMut<Counter>) {
            counter.0 += 1;
        }

        let mut world = World::new();
        world.init_resource::<Counter>();
        let mut schedule = Schedule::default();

        schedule.add_systems(pipe_out.pipe(pipe_in));
        schedule.run(&mut world);

        let counter = world.resource::<Counter>();
        assert_eq!(counter.0, 0);
    }
}
562
563