Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/message/messages.rs
6849 views
1
use crate::{
2
change_detection::MaybeLocation,
3
message::{Message, MessageCursor, MessageId, MessageInstance},
4
resource::Resource,
5
};
6
use alloc::vec::Vec;
7
use core::{
8
marker::PhantomData,
9
ops::{Deref, DerefMut},
10
};
11
#[cfg(feature = "bevy_reflect")]
12
use {
13
crate::reflect::ReflectResource,
14
bevy_reflect::{std_traits::ReflectDefault, Reflect},
15
};
16
17
/// A message collection that represents the messages that occurred within the last two
18
/// [`Messages::update`] calls.
19
/// Messages can be written to using a [`MessageWriter`]
20
/// and are typically cheaply read using a [`MessageReader`].
21
///
22
/// Each message can be consumed by multiple systems, in parallel,
23
/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24
///
25
/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26
/// is applied between writing and reading systems, there is a risk of a race condition.
27
/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28
///
29
/// This collection is meant to be paired with a system that calls
30
/// [`Messages::update`] exactly once per update/frame.
31
///
32
/// [`message_update_system`] is a system that does this, typically initialized automatically using
33
/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34
/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35
/// Messages will persist across a single frame boundary and so ordering of message producers and
36
/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
37
/// If messages are not handled by the end of the frame after they are updated, they will be
38
/// dropped silently.
39
///
40
/// # Example
41
///
42
/// ```
43
/// use bevy_ecs::message::{Message, Messages};
44
///
45
/// #[derive(Message)]
46
/// struct MyMessage {
47
/// value: usize
48
/// }
49
///
50
/// // setup
51
/// let mut messages = Messages::<MyMessage>::default();
52
/// let mut cursor = messages.get_cursor();
53
///
54
/// // run this once per update/frame
55
/// messages.update();
56
///
57
/// // somewhere else: write a message
58
/// messages.write(MyMessage { value: 1 });
59
///
60
/// // somewhere else: read the messages
61
/// for message in cursor.read(&messages) {
62
/// assert_eq!(message.value, 1)
63
/// }
64
///
65
/// // messages are only processed once per reader
66
/// assert_eq!(cursor.read(&messages).count(), 0);
67
/// ```
68
///
69
/// # Details
70
///
71
/// [`Messages`] is implemented using a variation of a double buffer strategy.
72
/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73
/// - [`MessageReader`]s will read messages from both buffers.
74
/// - [`MessageReader`]s that read at least once per update will never drop messages.
75
/// - [`MessageReader`]s that read once within two updates might still receive some messages
76
/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
77
/// before those updates.
78
///
79
/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80
///
81
/// An alternative call pattern would be to call [`update`](Messages::update)
82
/// manually across frames to control when messages are cleared.
83
/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84
/// but can be done by adding your message as a resource instead of using
85
/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86
///
87
/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88
/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89
///
90
/// [`MessageReader`]: super::MessageReader
91
/// [`MessageWriter`]: super::MessageWriter
92
/// [`message_update_system`]: super::message_update_system
93
#[derive(Debug, Resource)]
94
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
95
pub struct Messages<E: Message> {
96
/// Holds the oldest still active messages.
97
/// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.
98
pub(crate) messages_a: MessageSequence<E>,
99
/// Holds the newer messages.
100
pub(crate) messages_b: MessageSequence<E>,
101
pub(crate) message_count: usize,
102
}
103
104
// Derived Default impl would incorrectly require E: Default
105
impl<E: Message> Default for Messages<E> {
106
fn default() -> Self {
107
Self {
108
messages_a: Default::default(),
109
messages_b: Default::default(),
110
message_count: Default::default(),
111
}
112
}
113
}
114
115
impl<M: Message> Messages<M> {
116
/// Returns the index of the oldest message stored in the message buffer.
117
pub fn oldest_message_count(&self) -> usize {
118
self.messages_a.start_message_count
119
}
120
121
/// Writes an `message` to the current message buffer.
122
/// [`MessageReader`](super::MessageReader)s can then read the message.
123
/// This method returns the [ID](`MessageId`) of the written `message`.
124
#[track_caller]
125
pub fn write(&mut self, message: M) -> MessageId<M> {
126
self.write_with_caller(message, MaybeLocation::caller())
127
}
128
129
pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130
let message_id = MessageId {
131
id: self.message_count,
132
caller,
133
_marker: PhantomData,
134
};
135
#[cfg(feature = "detailed_trace")]
136
tracing::trace!("Messages::write() -> id: {}", message_id);
137
138
let message_instance = MessageInstance {
139
message_id,
140
message,
141
};
142
143
self.messages_b.push(message_instance);
144
self.message_count += 1;
145
146
message_id
147
}
148
149
/// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150
/// This is more efficient than writing each message individually.
151
/// This method returns the [IDs](`MessageId`) of the written `messages`.
152
#[track_caller]
153
pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154
let last_count = self.message_count;
155
156
self.extend(messages);
157
158
WriteBatchIds {
159
last_count,
160
message_count: self.message_count,
161
_marker: PhantomData,
162
}
163
}
164
165
/// Writes the default value of the message. Useful when the message is an empty struct.
166
/// This method returns the [ID](`MessageId`) of the written `message`.
167
#[track_caller]
168
pub fn write_default(&mut self) -> MessageId<M>
169
where
170
M: Default,
171
{
172
self.write(Default::default())
173
}
174
175
/// "Sends" an `message` by writing it to the current message buffer.
176
/// [`MessageReader`](super::MessageReader)s can then read the message.
177
/// This method returns the [ID](`MessageId`) of the sent `message`.
178
#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write` instead.")]
179
#[track_caller]
180
pub fn send(&mut self, message: M) -> MessageId<M> {
181
self.write(message)
182
}
183
184
/// Sends a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
185
/// This is more efficient than sending each message individually.
186
/// This method returns the [IDs](`MessageId`) of the sent `messages`.
187
#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_batch` instead.")]
188
#[track_caller]
189
pub fn send_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
190
self.write_batch(messages)
191
}
192
193
/// Sends the default value of the message. Useful when the message is an empty struct.
194
/// This method returns the [ID](`MessageId`) of the sent `message`.
195
#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_default` instead.")]
196
#[track_caller]
197
pub fn send_default(&mut self) -> MessageId<M>
198
where
199
M: Default,
200
{
201
self.write_default()
202
}
203
204
/// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
205
pub fn get_cursor(&self) -> MessageCursor<M> {
206
MessageCursor::default()
207
}
208
209
/// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
210
/// It will read all future messages.
211
pub fn get_cursor_current(&self) -> MessageCursor<M> {
212
MessageCursor {
213
last_message_count: self.message_count,
214
..Default::default()
215
}
216
}
217
218
/// Swaps the message buffers and clears the oldest message buffer. In general, this should be
219
/// called once per frame/update.
220
///
221
/// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
222
pub fn update(&mut self) {
223
core::mem::swap(&mut self.messages_a, &mut self.messages_b);
224
self.messages_b.clear();
225
self.messages_b.start_message_count = self.message_count;
226
debug_assert_eq!(
227
self.messages_a.start_message_count + self.messages_a.len(),
228
self.messages_b.start_message_count
229
);
230
}
231
232
/// Swaps the message buffers and drains the oldest message buffer, returning an iterator
233
/// of all messages that were removed. In general, this should be called once per frame/update.
234
///
235
/// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
236
#[must_use = "If you do not need the returned messages, call .update() instead."]
237
pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
238
core::mem::swap(&mut self.messages_a, &mut self.messages_b);
239
let iter = self.messages_b.messages.drain(..);
240
self.messages_b.start_message_count = self.message_count;
241
debug_assert_eq!(
242
self.messages_a.start_message_count + self.messages_a.len(),
243
self.messages_b.start_message_count
244
);
245
246
iter.map(|e| e.message)
247
}
248
249
#[inline]
250
fn reset_start_message_count(&mut self) {
251
self.messages_a.start_message_count = self.message_count;
252
self.messages_b.start_message_count = self.message_count;
253
}
254
255
/// Removes all messages.
256
#[inline]
257
pub fn clear(&mut self) {
258
self.reset_start_message_count();
259
self.messages_a.clear();
260
self.messages_b.clear();
261
}
262
263
/// Returns the number of messages currently stored in the message buffer.
264
#[inline]
265
pub fn len(&self) -> usize {
266
self.messages_a.len() + self.messages_b.len()
267
}
268
269
/// Returns true if there are no messages currently stored in the message buffer.
270
#[inline]
271
pub fn is_empty(&self) -> bool {
272
self.len() == 0
273
}
274
275
/// Creates a draining iterator that removes all messages.
276
pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
277
self.reset_start_message_count();
278
279
// Drain the oldest messages first, then the newest
280
self.messages_a
281
.drain(..)
282
.chain(self.messages_b.drain(..))
283
.map(|i| i.message)
284
}
285
286
/// Iterates over messages that happened since the last "update" call.
287
/// WARNING: You probably don't want to use this call. In most cases you should use an
288
/// [`MessageReader`]. You should only use this if you know you only need to consume messages
289
/// between the last `update()` call and your call to `iter_current_update_messages`.
290
/// If messages happen outside that window, they will not be handled. For example, any messages that
291
/// happen after this call and before the next `update()` call will be dropped.
292
///
293
/// [`MessageReader`]: super::MessageReader
294
pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
295
self.messages_b.iter().map(|i| &i.message)
296
}
297
298
/// Get a specific message by id if it still exists in the messages buffer.
299
pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
300
if id < self.oldest_message_count() {
301
return None;
302
}
303
304
let sequence = self.sequence(id);
305
let index = id.saturating_sub(sequence.start_message_count);
306
307
sequence
308
.get(index)
309
.map(|instance| (&instance.message, instance.message_id))
310
}
311
312
/// Which message buffer is this message id a part of.
313
fn sequence(&self, id: usize) -> &MessageSequence<M> {
314
if id < self.messages_b.start_message_count {
315
&self.messages_a
316
} else {
317
&self.messages_b
318
}
319
}
320
}
321
322
impl<E: Message> Extend<E> for Messages<E> {
323
#[track_caller]
324
fn extend<I>(&mut self, iter: I)
325
where
326
I: IntoIterator<Item = E>,
327
{
328
let old_count = self.message_count;
329
let mut message_count = self.message_count;
330
let messages = iter.into_iter().map(|message| {
331
let message_id = MessageId {
332
id: message_count,
333
caller: MaybeLocation::caller(),
334
_marker: PhantomData,
335
};
336
message_count += 1;
337
MessageInstance {
338
message_id,
339
message,
340
}
341
});
342
343
self.messages_b.extend(messages);
344
345
if old_count != message_count {
346
#[cfg(feature = "detailed_trace")]
347
tracing::trace!(
348
"Messages::extend() -> ids: ({}..{})",
349
self.message_count,
350
message_count
351
);
352
}
353
354
self.message_count = message_count;
355
}
356
}
357
358
#[derive(Debug)]
359
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
360
pub(crate) struct MessageSequence<E: Message> {
361
pub(crate) messages: Vec<MessageInstance<E>>,
362
pub(crate) start_message_count: usize,
363
}
364
365
// Derived Default impl would incorrectly require E: Default
366
impl<E: Message> Default for MessageSequence<E> {
367
fn default() -> Self {
368
Self {
369
messages: Default::default(),
370
start_message_count: Default::default(),
371
}
372
}
373
}
374
375
impl<E: Message> Deref for MessageSequence<E> {
376
type Target = Vec<MessageInstance<E>>;
377
378
fn deref(&self) -> &Self::Target {
379
&self.messages
380
}
381
}
382
383
impl<E: Message> DerefMut for MessageSequence<E> {
384
fn deref_mut(&mut self) -> &mut Self::Target {
385
&mut self.messages
386
}
387
}
388
389
/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
390
pub struct WriteBatchIds<E> {
391
last_count: usize,
392
message_count: usize,
393
_marker: PhantomData<E>,
394
}
395
396
/// [`Iterator`] over sent [`MessageIds`](`MessageId`) from a batch.
397
#[deprecated(since = "0.17.0", note = "Use `WriteBatchIds` instead.")]
398
pub type SendBatchIds<E> = WriteBatchIds<E>;
399
400
impl<E: Message> Iterator for WriteBatchIds<E> {
401
type Item = MessageId<E>;
402
403
fn next(&mut self) -> Option<Self::Item> {
404
if self.last_count >= self.message_count {
405
return None;
406
}
407
408
let result = Some(MessageId {
409
id: self.last_count,
410
caller: MaybeLocation::caller(),
411
_marker: PhantomData,
412
});
413
414
self.last_count += 1;
415
416
result
417
}
418
}
419
420
impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {
421
fn len(&self) -> usize {
422
self.message_count.saturating_sub(self.last_count)
423
}
424
}
425
426
#[cfg(test)]
427
mod tests {
428
use crate::message::{Message, Messages};
429
430
#[test]
431
fn iter_current_update_messages_iterates_over_current_messages() {
432
#[derive(Message, Clone)]
433
struct TestMessage;
434
435
let mut test_messages = Messages::<TestMessage>::default();
436
437
// Starting empty
438
assert_eq!(test_messages.len(), 0);
439
assert_eq!(test_messages.iter_current_update_messages().count(), 0);
440
test_messages.update();
441
442
// Writing one message
443
test_messages.write(TestMessage);
444
445
assert_eq!(test_messages.len(), 1);
446
assert_eq!(test_messages.iter_current_update_messages().count(), 1);
447
test_messages.update();
448
449
// Writing two messages on the next frame
450
test_messages.write(TestMessage);
451
test_messages.write(TestMessage);
452
453
assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3
454
assert_eq!(test_messages.iter_current_update_messages().count(), 2);
455
test_messages.update();
456
457
// Writing zero messages
458
assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2
459
assert_eq!(test_messages.iter_current_update_messages().count(), 0);
460
}
461
}
462
463