Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/world/command_queue.rs
6849 views
1
#[cfg(feature = "track_location")]
2
use crate::change_detection::MaybeLocation;
3
use crate::{
4
system::{Command, SystemBuffer, SystemMeta},
5
world::{DeferredWorld, World},
6
};
7
8
use alloc::{boxed::Box, vec::Vec};
9
use bevy_ptr::{OwningPtr, Unaligned};
10
use core::{
11
fmt::Debug,
12
mem::{size_of, MaybeUninit},
13
panic::AssertUnwindSafe,
14
ptr::{addr_of_mut, NonNull},
15
};
16
use log::warn;
17
18
struct CommandMeta {
19
/// SAFETY: The `value` must point to a value of type `T: Command`,
20
/// where `T` is some specific type that was used to produce this metadata.
21
///
22
/// `world` is optional to allow this one function pointer to perform double-duty as a drop.
23
///
24
/// Advances `cursor` by the size of `T` in bytes.
25
consume_command_and_get_size:
26
unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
27
}
28
29
/// Densely and efficiently stores a queue of heterogeneous types implementing [`Command`].
// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
// as an optimization. Since commands are used frequently in systems as a way to spawn
// entities/components/resources, and it's not currently possible to parallelize these
// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
// preferred to simplicity of implementation.
pub struct CommandQueue {
    // This buffer densely stores all queued commands.
    //
    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
    // to store the command itself. To interpret these bytes, a pointer must
    // be passed to the corresponding `CommandMeta.consume_command_and_get_size` fn pointer.
    pub(crate) bytes: Vec<MaybeUninit<u8>>,
    // Byte offset into `bytes` separating commands already claimed by an in-progress
    // application (before the offset) from commands that are still pending (at or after it).
    // The queue is considered empty when this offset reaches `bytes.len()`.
    pub(crate) cursor: usize,
    // Scratch buffer that receives the not-yet-applied command bytes when a command panics
    // while the queue is being applied, so they can be put back into `bytes` afterwards.
    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
    // Source location captured when the queue was created via `Default::default`;
    // reported in the `Debug` output and in the warning emitted when un-applied commands are dropped.
    #[cfg(feature = "track_location")]
    pub(crate) caller: MaybeLocation,
}
47
48
impl Default for CommandQueue {
49
#[track_caller]
50
fn default() -> Self {
51
Self {
52
bytes: Default::default(),
53
cursor: Default::default(),
54
panic_recovery: Default::default(),
55
#[cfg(feature = "track_location")]
56
caller: MaybeLocation::caller(),
57
}
58
}
59
}
60
61
/// Wraps pointers to the fields of a [`CommandQueue`], used internally to avoid running afoul of
/// stacked borrow rules when partially applying the world's command queue recursively.
///
/// Cloning copies the pointers only; every clone refers to the same underlying queue.
#[derive(Clone)]
pub(crate) struct RawCommandQueue {
    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
    pub(crate) cursor: NonNull<usize>,
    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
}
69
70
// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
71
// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
72
// which gets extremely verbose very quickly, while also providing no useful information.
73
// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
74
// So instead, the manual impl just prints the length of vec.
75
impl Debug for CommandQueue {
76
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77
let mut binding = f.debug_struct("CommandQueue");
78
binding.field("len_bytes", &self.bytes.len());
79
80
#[cfg(feature = "track_location")]
81
binding.field("caller", &self.caller.into_option());
82
83
binding.finish_non_exhaustive()
84
}
85
}
86
87
// SAFETY: All commands [`Command`] implement [`Send`]
88
unsafe impl Send for CommandQueue {}
89
90
// SAFETY: `&CommandQueue` never gives access to the inner commands.
91
unsafe impl Sync for CommandQueue {}
92
93
impl CommandQueue {
94
/// Push a [`Command`] onto the queue.
95
#[inline]
96
pub fn push(&mut self, command: impl Command) {
97
// SAFETY: self is guaranteed to live for the lifetime of this method
98
unsafe {
99
self.get_raw().push(command);
100
}
101
}
102
103
/// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
104
/// This clears the queue.
105
#[inline]
106
pub fn apply(&mut self, world: &mut World) {
107
// flush the previously queued entities
108
world.flush_entities();
109
110
// flush the world's internal queue
111
world.flush_commands();
112
113
// SAFETY: A reference is always a valid pointer
114
unsafe {
115
self.get_raw().apply_or_drop_queued(Some(world.into()));
116
}
117
}
118
119
/// Take all commands from `other` and append them to `self`, leaving `other` empty
120
pub fn append(&mut self, other: &mut CommandQueue) {
121
self.bytes.append(&mut other.bytes);
122
}
123
124
/// Returns false if there are any commands in the queue
125
#[inline]
126
pub fn is_empty(&self) -> bool {
127
self.cursor >= self.bytes.len()
128
}
129
130
/// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
131
pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
132
// SAFETY: self is always valid memory
133
unsafe {
134
RawCommandQueue {
135
bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
136
cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
137
panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
138
}
139
}
140
}
141
}
142
143
impl RawCommandQueue {
144
/// Returns a new `RawCommandQueue` instance, this must be manually dropped.
145
pub(crate) fn new() -> Self {
146
// SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
147
unsafe {
148
Self {
149
bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
150
cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
151
panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
152
}
153
}
154
}
155
156
/// Returns true if the queue is empty.
157
///
158
/// # Safety
159
///
160
/// * Caller ensures that `bytes` and `cursor` point to valid memory
161
pub unsafe fn is_empty(&self) -> bool {
162
// SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
163
(unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
164
}
165
166
/// Push a [`Command`] onto the queue.
167
///
168
/// # Safety
169
///
170
/// * Caller ensures that `self` has not outlived the underlying queue
171
#[inline]
172
pub unsafe fn push<C: Command>(&mut self, command: C) {
173
// Stores a command alongside its metadata.
174
// `repr(C)` prevents the compiler from reordering the fields,
175
// while `repr(packed)` prevents the compiler from inserting padding bytes.
176
#[repr(C, packed)]
177
struct Packed<C: Command> {
178
meta: CommandMeta,
179
command: C,
180
}
181
182
let meta = CommandMeta {
183
consume_command_and_get_size: |command, world, cursor| {
184
*cursor += size_of::<C>();
185
186
// SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
187
// `command` must point to a value of type `C`.
188
let command: C = unsafe { command.read_unaligned() };
189
match world {
190
// Apply command to the provided world...
191
Some(mut world) => {
192
// SAFETY: Caller ensures pointer is not null
193
let world = unsafe { world.as_mut() };
194
command.apply(world);
195
// The command may have queued up world commands, which we flush here to ensure they are also picked up.
196
// If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
197
// is still at the current `stop`, ensuring only the newly queued Commands will be applied.
198
world.flush();
199
}
200
// ...or discard it.
201
None => drop(command),
202
}
203
},
204
};
205
206
// SAFETY: There are no outstanding references to self.bytes
207
let bytes = unsafe { self.bytes.as_mut() };
208
209
let old_len = bytes.len();
210
211
// Reserve enough bytes for both the metadata and the command itself.
212
bytes.reserve(size_of::<Packed<C>>());
213
214
// Pointer to the bytes at the end of the buffer.
215
// SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
216
let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
217
218
// Write the metadata into the buffer, followed by the command.
219
// We are using a packed struct to write them both as one operation.
220
// SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
221
// The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
222
// and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
223
unsafe {
224
ptr.cast::<Packed<C>>()
225
.write_unaligned(Packed { meta, command });
226
}
227
228
// Extend the length of the buffer to include the data we just wrote.
229
// SAFETY: The new length is guaranteed to fit in the vector's capacity,
230
// due to the call to `.reserve()` above.
231
unsafe {
232
bytes.set_len(old_len + size_of::<Packed<C>>());
233
}
234
}
235
236
/// If `world` is [`Some`], this will apply the queued [commands](`Command`).
237
/// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
238
/// This clears the queue.
239
///
240
/// # Safety
241
///
242
/// * Caller ensures that `self` has not outlived the underlying queue
243
#[inline]
244
pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
245
// SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
246
// If this is not the command queue on world we have exclusive ownership and self will not be mutated
247
let start = *self.cursor.as_ref();
248
let stop = self.bytes.as_ref().len();
249
let mut local_cursor = start;
250
// SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
251
// the remaining commands currently in this list. This is safe.
252
*self.cursor.as_mut() = stop;
253
254
while local_cursor < stop {
255
// SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
256
// Since we know that the cursor is in bounds, it must point to the start of a new command.
257
let meta = unsafe {
258
self.bytes
259
.as_mut()
260
.as_mut_ptr()
261
.add(local_cursor)
262
.cast::<CommandMeta>()
263
.read_unaligned()
264
};
265
266
// Advance to the bytes just after `meta`, which represent a type-erased command.
267
local_cursor += size_of::<CommandMeta>();
268
// Construct an owned pointer to the command.
269
// SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
270
// guarantees that nothing stored in the buffer will get observed after this function ends.
271
// `cmd` points to a valid address of a stored command, so it must be non-null.
272
let cmd = unsafe {
273
OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
274
self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
275
))
276
};
277
let f = AssertUnwindSafe(|| {
278
// SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
279
// since they were stored next to each other by `.push()`.
280
// For ZSTs, the type doesn't matter as long as the pointer is non-null.
281
// This also advances the cursor past the command. For ZSTs, the cursor will not move.
282
// At this point, it will either point to the next `CommandMeta`,
283
// or the cursor will be out of bounds and the loop will end.
284
unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
285
});
286
287
#[cfg(feature = "std")]
288
{
289
let result = std::panic::catch_unwind(f);
290
291
if let Err(payload) = result {
292
// local_cursor now points to the location _after_ the panicked command.
293
// Add the remaining commands that _would have_ been applied to the
294
// panic_recovery queue.
295
//
296
// This uses `current_stop` instead of `stop` to account for any commands
297
// that were queued _during_ this panic.
298
//
299
// This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
300
// an applied Command, the correct command order will be retained.
301
let panic_recovery = self.panic_recovery.as_mut();
302
let bytes = self.bytes.as_mut();
303
let current_stop = bytes.len();
304
panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
305
bytes.set_len(start);
306
*self.cursor.as_mut() = start;
307
308
// This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
309
// when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
310
// until we reach the top.
311
if start == 0 {
312
bytes.append(panic_recovery);
313
}
314
std::panic::resume_unwind(payload);
315
}
316
}
317
318
#[cfg(not(feature = "std"))]
319
(f)();
320
}
321
322
// Reset the buffer: all commands past the original `start` cursor have been applied.
323
// SAFETY: we are setting the length of bytes to the original length, minus the length of the original
324
// list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
325
unsafe {
326
self.bytes.as_mut().set_len(start);
327
*self.cursor.as_mut() = start;
328
};
329
}
330
}
331
332
impl Drop for CommandQueue {
333
fn drop(&mut self) {
334
if !self.bytes.is_empty() {
335
#[cfg(feature = "track_location")]
336
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{:?}",self.caller.into_option());
337
#[cfg(not(feature = "track_location"))]
338
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
339
}
340
// SAFETY: A reference is always a valid pointer
341
unsafe { self.get_raw().apply_or_drop_queued(None) };
342
}
343
}
344
345
impl SystemBuffer for CommandQueue {
346
#[inline]
347
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
348
#[cfg(feature = "trace")]
349
let _span_guard = _system_meta.commands_span.enter();
350
self.apply(world);
351
}
352
353
#[inline]
354
fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
355
world.commands().append(self);
356
}
357
}
358
359
#[cfg(test)]
mod test {
    use super::*;
    use crate::{component::Component, resource::Resource};
    use alloc::{borrow::ToOwned, string::String, sync::Arc};
    use core::{
        panic::AssertUnwindSafe,
        sync::atomic::{AtomicU32, Ordering},
    };

    #[cfg(miri)]
    use alloc::format;

    /// Command that increments a shared counter every time it is dropped.
    struct DropCheck(Arc<AtomicU32>);

    impl DropCheck {
        /// Returns the command together with a handle to its drop counter.
        fn new() -> (Self, Arc<AtomicU32>) {
            let drops = Arc::new(AtomicU32::new(0));
            (Self(drops.clone()), drops)
        }
    }

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl Command for DropCheck {
        fn apply(self, _: &mut World) {}
    }

    /// Asserts that each command is dropped exactly once when the queue is applied.
    #[test]
    fn test_command_queue_inner_drop() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
    #[test]
    fn test_command_queue_inner_drop_early() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        drop(queue);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    #[derive(Component)]
    struct A;

    /// Command that spawns one entity with component `A`.
    struct SpawnCommand;

    impl Command for SpawnCommand {
        fn apply(self, world: &mut World) {
            world.spawn(A);
        }
    }

    /// Asserts that applying runs every queued command once and leaves the queue empty.
    #[test]
    fn test_command_queue_inner() {
        let mut queue = CommandQueue::default();

        queue.push(SpawnCommand);
        queue.push(SpawnCommand);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(world.query::<&A>().query(&world).count(), 2);

        // The previous call to `apply` cleared the queue.
        // This call should do nothing.
        queue.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 2);
    }

    #[expect(
        dead_code,
        reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
    )]
    struct PanicCommand(String);
    impl Command for PanicCommand {
        fn apply(self, _: &mut World) {
            panic!("command is panicking");
        }
    }

    /// Asserts that the queue stays usable after one of its commands panics during `apply`.
    #[test]
    fn test_command_queue_inner_panic_safe() {
        // Silence the panic message that the intentionally panicking command would print.
        std::panic::set_hook(Box::new(|_| {}));

        let mut queue = CommandQueue::default();

        queue.push(PanicCommand("I panic!".to_owned()));
        queue.push(SpawnCommand);

        let mut world = World::new();

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            queue.apply(&mut world);
        }));

        // Even though the first command panicked, it's still ok to push
        // more commands.
        queue.push(SpawnCommand);
        queue.push(SpawnCommand);
        queue.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 3);
    }

    /// Asserts that command order is retained when a panic happens inside a nested flush.
    #[test]
    fn test_command_queue_inner_nested_panic_safe() {
        // Silence the panic message that the intentionally panicking command would print.
        std::panic::set_hook(Box::new(|_| {}));

        #[derive(Resource, Default)]
        struct Order(Vec<usize>);

        let mut world = World::new();
        world.init_resource::<Order>();

        fn add_index(index: usize) -> impl Command {
            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
        }
        world.commands().queue(add_index(1));
        world.commands().queue(|world: &mut World| {
            world.commands().queue(add_index(2));
            world.commands().queue(PanicCommand("I panic!".to_owned()));
            world.commands().queue(add_index(3));
            world.flush_commands();
        });
        world.commands().queue(add_index(4));

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            world.flush_commands();
        }));

        world.commands().queue(add_index(5));
        world.flush_commands();
        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
    }

    // NOTE: `CommandQueue` is `Send` because `Command` is send.
    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
    // should be reworked.
    // This test asserts that Command types are send.
    fn assert_is_send_impl(_: impl Send) {}
    fn assert_is_send(command: impl Command) {
        assert_is_send_impl(command);
    }

    #[test]
    fn test_command_is_send() {
        assert_is_send(SpawnCommand);
    }

    #[expect(
        dead_code,
        reason = "This struct is used to test how the CommandQueue reacts to padding added by rust's compiler."
    )]
    struct CommandWithPadding(u8, u16);
    impl Command for CommandWithPadding {
        fn apply(self, _: &mut World) {}
    }

    /// Under Miri, checks that formatting the raw byte buffer of a queue holding a padded command is accepted.
    #[cfg(miri)]
    #[test]
    fn test_uninit_bytes() {
        let mut queue = CommandQueue::default();
        queue.push(CommandWithPadding(0, 0));
        let _ = format!("{:?}", queue.bytes);
    }
}
557
558